activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1345202 [1/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/store/ main/java/org/apache/activ...
Date Fri, 01 Jun 2012 14:32:51 GMT
Author: gtully
Date: Fri Jun  1 14:32:50 2012
New Revision: 1345202

URL: http://svn.apache.org/viewvc?rev=1345202&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3872 - Implement "exactly once" delivery with JDBC and XA in the event of a failure post prepare. JDBC XA is not a runner due to the single table and update locks. The implementation adds an xid column to the messages and acks tables. A non-null value indicates a prepared add/ack, so the insert is done on prepare. The result of the transaction outcome requires a row-level update.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleBlobJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Fri Jun  1 14:32:50 2012
@@ -25,6 +25,7 @@ import org.apache.activemq.command.Conne
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
 import org.apache.activemq.security.SecurityContext;
@@ -60,6 +61,7 @@ public class ConnectionContext {
     private boolean dontSendReponse;
     private boolean clientMaster = true;
     private ConnectionState connectionState;
+    private XATransactionId xid;
 
     public ConnectionContext() {
     	this.messageEvaluationContext = new MessageEvaluationContext();
@@ -329,4 +331,12 @@ public class ConnectionContext {
     public ConnectionState getConnectionState() {
         return this.connectionState;
     }
+
+    public void setXid(XATransactionId id) {
+        this.xid = id;
+    }
+
+    public XATransactionId getXid() {
+        return xid;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Fri Jun  1 14:32:50 2012
@@ -137,42 +137,40 @@ public class TransactionBroker extends B
     }
 
     private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
-        if (destination instanceof Queue) {
-            Synchronization sync = new PreparedDestinationCompletion((Queue) destination, command.isMessage());
-            // ensure one per destination in the list
-            transaction.removeSynchronization(sync);
-            transaction.addSynchronization(sync);
-        }
+        Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
+        // ensure one per destination in the list
+        transaction.removeSynchronization(sync);
+        transaction.addSynchronization(sync);
     }
 
     static class PreparedDestinationCompletion extends Synchronization {
-        final Queue queue;
+        final Destination destination;
         final boolean messageSend;
-        public PreparedDestinationCompletion(final Queue queue, boolean messageSend) {
-            this.queue = queue;
+        public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
+            this.destination = destination;
             // rollback relevant to acks, commit to sends
             this.messageSend = messageSend;
         }
 
         @Override
         public int hashCode() {
-            return System.identityHashCode(queue) +
+            return System.identityHashCode(destination) +
                     System.identityHashCode(Boolean.valueOf(messageSend));
         }
 
         @Override
         public boolean equals(Object other) {
             return other instanceof PreparedDestinationCompletion &&
-                    queue.equals(((PreparedDestinationCompletion) other).queue) &&
+                    destination.equals(((PreparedDestinationCompletion) other).destination) &&
                     messageSend == ((PreparedDestinationCompletion) other).messageSend;
         }
 
         @Override
         public void afterRollback() throws Exception {
             if (!messageSend) {
-                queue.clearPendingMessages();
+                destination.clearPendingMessages();
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("cleared pending from afterRollback : " + queue);
+                    LOG.debug("cleared pending from afterRollback : " + destination);
                 }
             }
         }
@@ -180,9 +178,9 @@ public class TransactionBroker extends B
         @Override
         public void afterCommit() throws Exception {
             if (messageSend) {
-                queue.clearPendingMessages();
+                destination.clearPendingMessages();
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("cleared pending from afterCommit : " + queue);
+                    LOG.debug("cleared pending from afterCommit : " + destination);
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Fri Jun  1 14:32:50 2012
@@ -233,4 +233,6 @@ public interface Destination extends Ser
 
     boolean isDoOptimzeMessageStorage();
     void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
+
+    public void clearPendingMessages();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Fri Jun  1 14:32:50 2012
@@ -310,4 +310,9 @@ public class DestinationFilter implement
         next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
     }
 
+    @Override
+    public void clearPendingMessages() {
+        next.clearPendingMessages();
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jun  1 14:32:50 2012
@@ -18,8 +18,10 @@ package org.apache.activemq.broker.regio
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -745,4 +747,47 @@ public class Topic extends BaseDestinati
         }
         return result;
     }
+
+    /**
+     * force a reread of the store - after transaction recovery completion
+     */
+    public void clearPendingMessages() {
+        dispatchLock.readLock().lock();
+        try {
+            for (DurableTopicSubscription durableTopicSubscription : durableSubcribers.values()) {
+                clearPendingAndDispatch(durableTopicSubscription);
+            }
+        } finally {
+            dispatchLock.readLock().unlock();
+        }
+    }
+
+
+    public void clearPendingMessages(SubscriptionKey subscriptionKey) {
+        dispatchLock.readLock().lock();
+        try {
+            DurableTopicSubscription durableTopicSubscription = durableSubcribers.get(subscriptionKey);
+            clearPendingAndDispatch(durableTopicSubscription);
+        } finally {
+            dispatchLock.readLock().unlock();
+        }
+    }
+
+    private void clearPendingAndDispatch(DurableTopicSubscription durableTopicSubscription) {
+        synchronized (durableTopicSubscription.pendingLock) {
+            durableTopicSubscription.pending.clear();
+            try {
+                durableTopicSubscription.dispatchPending();
+            } catch (IOException exception) {
+                LOG.warn("After clear of pending, failed to dispatch to: " +
+                        durableTopicSubscription + ", for :" + destination + ", pending: " +
+                        durableTopicSubscription.pending, exception);
+            }
+        }
+    }
+
+    public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
+        return durableSubcribers;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageId.java Fri Jun  1 14:32:50 2012
@@ -152,7 +152,7 @@ public class MessageId implements DataSt
         MessageId copy = new MessageId(producerId, producerSequenceId);
         copy.key = key;
         copy.brokerSequenceId = brokerSequenceId;
-        copy.dataLocator = dataLocator;
+        copy.dataLocator = new AtomicReference<Object>(dataLocator.get());
         return copy;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/XATransactionId.java Fri Jun  1 14:32:50 2012
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.command;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import javax.transaction.xa.Xid;
-import org.apache.activemq.util.HexSupport;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
 
 /**
  * @openwire:marshaller code="112"
@@ -32,6 +34,8 @@ public class XATransactionId extends Tra
     private int formatId;
     private byte[] branchQualifier;
     private byte[] globalTransactionId;
+    private transient DataByteArrayOutputStream outputStream;
+    private transient byte[] encodedXidBytes;
 
     private transient int hash;
     private transient String transactionKey;
@@ -46,14 +50,58 @@ public class XATransactionId extends Tra
         this.branchQualifier = xid.getBranchQualifier();
     }
 
+    public XATransactionId(byte[] encodedBytes) {
+        encodedXidBytes = encodedBytes;
+        initFromEncodedBytes();
+    }
+
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
+    final int XID_PREFIX_SIZE = 16;
+    //+|-,(long)lastAck,(byte)priority,(int)formatid,(short)globalLength....
+    private void initFromEncodedBytes() {
+        DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXidBytes);
+        inputStream.skipBytes(10);
+        formatId = inputStream.readInt();
+        int globalLength = inputStream.readShort();
+        globalTransactionId = new byte[globalLength];
+        try {
+            inputStream.read(globalTransactionId);
+            branchQualifier = new byte[inputStream.available()];
+            inputStream.read(branchQualifier);
+        } catch (IOException fatal) {
+            throw new RuntimeException(this + ", failed to decode:", fatal);
+        }
+    }
+
+    public synchronized byte[] getEncodedXidBytes() {
+        if (encodedXidBytes == null) {
+            outputStream = new DataByteArrayOutputStream(XID_PREFIX_SIZE + globalTransactionId.length + branchQualifier.length);
+            outputStream.position(10);
+            outputStream.writeInt(formatId);
+            // global length
+            outputStream.writeShort(globalTransactionId.length);
+            try {
+                outputStream.write(globalTransactionId);
+                outputStream.write(branchQualifier);
+            } catch (IOException fatal) {
+                throw new RuntimeException(this + ", failed to encode:", fatal);
+            }
+            encodedXidBytes = outputStream.getData();
+        }
+        return encodedXidBytes;
+    }
+
+    public DataByteArrayOutputStream getOutputStream() {
+        return outputStream;
+    }
+
     public synchronized String getTransactionKey() {
         if (transactionKey == null) {
             StringBuffer s = new StringBuffer();
-            s.append("XID:[globalId=");
+            s.append("XID:[" + formatId + ",globalId=");
             for (int i = 0; i < globalTransactionId.length; i++) {
                 s.append(Integer.toHexString(globalTransactionId[i]));
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Fri Jun  1 14:32:50 2012
@@ -79,7 +79,7 @@ public interface TopicMessageStore exten
     void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception;
 
     /**
-     * A hint to the Store to reset any batching state for a durable subsriber
+     * A hint to the Store to reset any batching state for a durable subscriber
      * 
      * @param clientId
      * @param subscriptionName

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Fri Jun  1 14:32:50 2012
@@ -20,9 +20,11 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Set;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.XATransactionId;
 
 /**
  * 
@@ -35,7 +37,7 @@ public interface JDBCAdapter {
 
     void doDropTables(TransactionContext c) throws SQLException, IOException;
 
-    void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority) throws SQLException, IOException;
+    void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration, byte priority, XATransactionId xid) throws SQLException, IOException;
 
     void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException;
 
@@ -45,11 +47,11 @@ public interface JDBCAdapter {
 
     String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
 
-    void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
+    void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException;
 
     void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) throws Exception;
 
-    void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException;
+    void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, String subscriptionName, long seq, long prio) throws SQLException, IOException;
 
     void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener)
         throws Exception;
@@ -92,11 +94,17 @@ public interface JDBCAdapter {
 
     long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
 
-    void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
+    void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
 
     public int getMaxRows();
 
     public void setMaxRows(int maxRows);
 
     void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
+
+    void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException;
+
+    void doCommitAddOp(TransactionContext c, long storeSequenceIdForMessageId) throws SQLException, IOException;
+
+    void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String subId, String subName) throws SQLException, IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Jun  1 14:32:50 2012
@@ -120,15 +120,19 @@ public class JDBCMessageStore extends Ab
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {      
-            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
-                    this.isPrioritizedMessages() ? message.getPriority() : 0);
+            adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(),
+                    this.isPrioritizedMessages() ? message.getPriority() : 0, context != null ? context.getXid() : null);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } finally {
             c.close();
         }
-        onAdd(messageId, sequenceId, message.getPriority());
+        if (context != null && context.getXid() != null) {
+            message.getMessageId().setDataLocator(sequenceId);
+        } else {
+            onAdd(messageId, sequenceId, message.getPriority());
+        }
     }
 
     protected void onAdd(MessageId messageId, long sequenceId, byte priority) {
@@ -186,19 +190,22 @@ public class JDBCMessageStore extends Ab
     }
 
     public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
-    	
-    	long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
+
+    	long seq = persistenceAdapter.getStoreSequenceIdForMessageId(ack.getLastMessageId(), destination)[0];
 
         // Get a connection and remove the message from the DB
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-            adapter.doRemoveMessage(c, seq);
+            adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
         } finally {
             c.close();
         }
+        if (context != null && context.getXid() != null) {
+            ack.getLastMessageId().setDataLocator(seq);
+        }
     }
 
     public void recover(final MessageRecoveryListener listener) throws Exception {
@@ -315,7 +322,7 @@ public class JDBCMessageStore extends Ab
     @Override
     public void setBatch(MessageId messageId) {
         try {
-            long[] storedValues = getStoreSequenceIdForMessageId(messageId);
+            long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(messageId, destination);
             lastRecoveredSequenceId.set(storedValues[0]);
             lastRecoveredPriority.set(storedValues[1]);
         } catch (IOException ignoredAsAlreadyLogged) {
@@ -328,20 +335,7 @@ public class JDBCMessageStore extends Ab
         }
     }
 
-    private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws IOException {
-        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
-        TransactionContext c = persistenceAdapter.getTransactionContext();
-        try {
-            result = adapter.getStoreSequenceId(c, destination, messageId);
-        } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
-            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
-        } finally {
-            c.close();
-        }
-        return result;
-    }
-    
+
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         super.setPrioritizedMessages(prioritizedMessages);
     }   

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Fri Jun  1 14:32:50 2012
@@ -37,6 +37,7 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.openwire.OpenWireFormat;
@@ -237,7 +238,7 @@ public class JDBCPersistenceAdapter exte
 
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
-            transactionStore = new MemoryTransactionStore(this);
+            transactionStore = new JdbcMemoryTransactionStore(this);
         }
         return this.transactionStore;
     }
@@ -768,4 +769,95 @@ public class JDBCPersistenceAdapter exte
     public void setMaxRows(int maxRows) {
         this.maxRows = maxRows;
     }
+
+    public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
+        TransactionContext c = getTransactionContext();
+        try {
+            getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
+        } finally {
+            c.close();
+        }
+    }
+
+    public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException {
+        TransactionContext c = getTransactionContext(context);
+        try {
+            long sequence = (Long)messageId.getDataLocator();
+            getAdapter().doCommitAddOp(c, sequence);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+    }
+
+    public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
+        TransactionContext c = getTransactionContext(context);
+        try {
+            getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getDataLocator(), null);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
+        } finally {
+            c.close();
+        }
+    }
+
+
+    public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
+        TransactionContext c = getTransactionContext(context);
+        try {
+            getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
+        } finally {
+            c.close();
+        }
+    }
+
+    public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
+        TransactionContext c = getTransactionContext(context);
+        try {
+            byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
+            getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " +  store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
+        } finally {
+            c.close();
+        }
+    }
+
+    // after recovery there is no record of the original messageId for the ack
+    public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
+        TransactionContext c = getTransactionContext(context);
+        try {
+            getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+    }
+
+    long[] getStoreSequenceIdForMessageId(MessageId messageId, ActiveMQDestination destination) throws IOException {
+        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
+        TransactionContext c = getTransactionContext();
+        try {
+            result = adapter.getStoreSequenceId(c, destination, messageId);
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
+        } finally {
+            c.close();
+        }
+        return result;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Jun  1 14:32:50 2012
@@ -19,9 +19,11 @@ package org.apache.activemq.store.jdbc;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -48,6 +50,7 @@ public class JDBCTopicMessageStore exten
 
     private static final Logger LOG = LoggerFactory.getLogger(JDBCTopicMessageStore.class);
     private Map<String, LastRecovered> subscriberLastRecoveredMap = new ConcurrentHashMap<String, LastRecovered>();
+    private Set<String> pendingCompletion = new HashSet<String>();
 
     public static final String PROPERTY_SEQUENCE_ID_CACHE_SIZE = "org.apache.activemq.store.jdbc.SEQUENCE_ID_CACHE_SIZE";
     private static final int SEQUENCE_ID_CACHE_SIZE = Integer.parseInt(System.getProperty(
@@ -75,9 +78,9 @@ public class JDBCTopicMessageStore exten
         try {
             long[] res = getCachedStoreSequenceId(c, destination, messageId);
             if (this.isPrioritizedMessages()) {
-                adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName, res[0], res[1]);
+                adapter.doSetLastAckWithPriority(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
             } else {
-                adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
+                adapter.doSetLastAck(c, destination, context != null ? context.getXid() : null, clientId, subscriptionName, res[0], res[1]);
             }
             if (LOG.isTraceEnabled()) {
                 LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ", priority: " + res[1] + " mid:" + messageId);
@@ -90,7 +93,7 @@ public class JDBCTopicMessageStore exten
         }
     }
 
-    private long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
+    public long[] getCachedStoreSequenceId(TransactionContext transactionContext, ActiveMQDestination destination, MessageId messageId) throws SQLException, IOException {
         long[] val = null;
         sequenceIdCacheSizeLock.readLock().lock();
         try {
@@ -254,7 +257,7 @@ public class JDBCTopicMessageStore exten
         LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
         try {
             if (LOG.isTraceEnabled()) {
-                LOG.trace(key + " existing last recovered: " + lastRecovered);
+                LOG.trace(this + ", " + key + " existing last recovered: " + lastRecovered);
             }
             if (isPrioritizedMessages()) {
                 Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
@@ -291,7 +294,26 @@ public class JDBCTopicMessageStore exten
     }
 
     public void resetBatching(String clientId, String subscriptionName) {
-        subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        if (!pendingCompletion.contains(key))  {
+            subscriberLastRecoveredMap.remove(key);
+        } else {
+            LOG.trace(this +  ", skip resetBatch during pending completion for: " + key);
+        }
+    }
+
+    /**
+     * Registers a subscription as pending completion and seeds its last-recovered position.
+     * A new LastRecovered is stored for the subscription with the given sequence id recorded
+     * under the given priority, or under javax.jms.Message.DEFAULT_PRIORITY when this store is
+     * not prioritized. While registered, resetBatching leaves the cached position in place.
+     *
+     * @param clientId         client id of the durable subscriber
+     * @param subscriptionName name of the durable subscription
+     * @param sequenceId       sequence id to record as recovered
+     * @param priority         priority slot to record it under when the store is prioritized
+     */
+    public void pendingCompletion(String clientId, String subscriptionName, long sequenceId, byte priority) {
+        final String key = getSubscriptionKey(clientId, subscriptionName);
+        LastRecovered recovered = new LastRecovered();
+        recovered.perPriority[isPrioritizedMessages() ? priority : javax.jms.Message.DEFAULT_PRIORITY].recovered = sequenceId;
+        subscriberLastRecoveredMap.put(key, recovered);
+        pendingCompletion.add(key);
+        if (LOG.isTraceEnabled()) {
+            // guarded like the other trace calls in this class so the message is only built when needed
+            LOG.trace(this + ", pending completion: " + key + ", last: " + recovered);
+        }
+    }
+
+    public void complete(String clientId, String subscriptionName) {
+        pendingCompletion.remove(getSubscriptionKey(clientId, subscriptionName));
+        LOG.trace(this + ", completion for: " + getSubscriptionKey(clientId, subscriptionName));
     }
 
     protected void onAdd(MessageId messageId, long sequenceId, byte priority) {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java?rev=1345202&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java Fri Jun  1 14:32:50 2012
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
+import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.SubscriptionKey;
+
+/**
+ * respect 2pc prepare
+ * uses local transactions to maintain prepared state
+ * xid column provides transaction flag for additions and removals
+ * a commit clears that context and completes the work
+ * a rollback clears the flag and removes the additions
+ * Essentially a prepare is an insert &| update transaction
+ *  commit|rollback is an update &| remove
+ */
+public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
+
+
+    private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
+
+    public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) { // only forwards the JDBC adapter to the MemoryTransactionStore constructor
+        super(jdbcPersistenceAdapter);
+    }
+
+    /**
+     * Handles the prepare phase of an in-flight transaction. Every queued add and ack is run
+     * inside one persistence-adapter transaction, with a context that carries the xid so the
+     * work is stored as pending the transaction outcome.
+     * <p>
+     * After that transaction commits, the queued adds are replaced by commands that call
+     * commitAdd for the message and notify its JDBCMessageStore through onAdd, and the
+     * transaction is filed under preparedTransactions. Nothing happens when the id is not
+     * in-flight.
+     *
+     * @param txid transaction to prepare; cast to XATransactionId
+     * @throws IOException if a queued command fails; the persistence-adapter transaction is
+     *                     rolled back before the exception is rethrown
+     */
+    @Override
+    public void prepare(TransactionId txid) throws IOException {
+        final Tx pending = inflightTransactions.remove(txid);
+        if (pending == null) {
+            return;
+        }
+
+        final ConnectionContext prepareContext = new ConnectionContext();
+        // a non-null xid on the context turns each add/remove below into one pending the outcome
+        prepareContext.setXid((XATransactionId) txid);
+        persistenceAdapter.beginTransaction(prepareContext);
+        try {
+            // message adds go first
+            for (AddMessageCommand queuedAdd : pending.messages) {
+                queuedAdd.run(prepareContext);
+            }
+            // followed by the removes
+            for (RemoveMessageCommand queuedAck : pending.acks) {
+                queuedAck.run(prepareContext);
+            }
+        } catch (IOException failure) {
+            persistenceAdapter.rollbackTransaction(prepareContext);
+            throw failure;
+        }
+        persistenceAdapter.commitTransaction(prepareContext);
+
+        prepareContext.setXid(null);
+        // set up the commit outcome: each add becomes an update of its already inserted row
+        final ArrayList<AddMessageCommand> commitTimeAdds = new ArrayList<AddMessageCommand>();
+        for (final AddMessageCommand queuedAdd : pending.messages) {
+            commitTimeAdds.add(new AddMessageCommand() {
+                @Override
+                public Message getMessage() {
+                    return queuedAdd.getMessage();
+                }
+
+                @Override
+                public MessageStore getMessageStore() {
+                    return queuedAdd.getMessageStore();
+                }
+
+                @Override
+                public void run(ConnectionContext context) throws IOException {
+                    final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
+                    final Message preparedMessage = queuedAdd.getMessage();
+                    jdbcAdapter.commitAdd(context, preparedMessage.getMessageId());
+                    final JDBCMessageStore targetStore = (JDBCMessageStore) queuedAdd.getMessageStore();
+                    targetStore.onAdd(
+                            preparedMessage.getMessageId(),
+                            (Long) preparedMessage.getMessageId().getDataLocator(),
+                            preparedMessage.getPriority());
+                }
+            });
+        }
+        pending.messages = commitTimeAdds;
+        preparedTransactions.put(txid, pending);
+    }
+
+
+    /**
+     * Rolls back a transaction. An in-flight transaction is simply removed. A prepared
+     * transaction has its prepare work undone inside one persistence-adapter transaction:
+     * commitRemove is called for every prepared add, LastAckCommand acks are rolled back
+     * through their own rollback method, and commitAdd is called for the last message id of
+     * every other prepared ack. An unknown id is ignored.
+     *
+     * @throws IOException if undoing the prepare work fails; the persistence-adapter
+     *                     transaction is rolled back before the exception is rethrown
+     */
+    @Override
+    public void rollback(TransactionId txid) throws IOException {
+        if (inflightTransactions.remove(txid) != null) {
+            return;
+        }
+        final Tx prepared = preparedTransactions.remove(txid);
+        if (prepared == null) {
+            return;
+        }
+
+        // undo prepare work
+        final ConnectionContext undoContext = new ConnectionContext();
+        persistenceAdapter.beginTransaction(undoContext);
+        try {
+            for (AddMessageCommand preparedAdd : prepared.messages) {
+                final Message message = preparedAdd.getMessage();
+                // need to delete the row
+                ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(undoContext, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
+            }
+
+            for (RemoveMessageCommand preparedAck : prepared.acks) {
+                if (preparedAck instanceof LastAckCommand) {
+                    ((LastAckCommand) preparedAck).rollback(undoContext);
+                    continue;
+                }
+                // need to unset the txid flag on the existing row
+                ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(undoContext,
+                        preparedAck.getMessageAck().getLastMessageId());
+            }
+        } catch (IOException failure) {
+            persistenceAdapter.rollbackTransaction(undoContext);
+            throw failure;
+        }
+        persistenceAdapter.commitTransaction(undoContext);
+    }
+
+    /**
+     * Has the JDBC persistence adapter recover into this store (it is passed this instance),
+     * then runs the inherited recover with the supplied listener.
+     */
+    @Override
+    public void recover(TransactionRecoveryListener listener) throws IOException {
+        final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
+        jdbcAdapter.recover(this);
+        super.recover(listener);
+    }
+
+    /**
+     * Re-registers a prepared message add. The message is unmarshalled from the given bytes,
+     * its message id gets the given id as data locator, and an add command is attached to the
+     * prepared transaction named by the message's transaction id. Running that command calls
+     * commitAdd for the message id with a null context; the command reports no message store.
+     *
+     * @param id           value stored as the message id's data locator
+     * @param messageBytes marshalled message
+     */
+    public void recoverAdd(long id, byte[] messageBytes) throws IOException {
+        final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
+        final Message message = (Message) jdbcAdapter.getWireFormat().unmarshal(new ByteSequence(messageBytes));
+        message.getMessageId().setDataLocator(id);
+
+        final Tx preparedTx = getPreparedTx(message.getTransactionId());
+        final AddMessageCommand recoveredAdd = new AddMessageCommand() {
+            @Override
+            public void run(ConnectionContext context) throws IOException {
+                ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(null, message.getMessageId());
+            }
+
+            @Override
+            public Message getMessage() {
+                return message;
+            }
+
+            @Override
+            public MessageStore getMessageStore() {
+                return null;
+            }
+        };
+        preparedTx.add(recoveredAdd);
+    }
+
+    /**
+     * Re-registers a prepared message removal. The message is unmarshalled, its message id gets
+     * the given id as data locator, and a remove command carrying a standard ack for it is
+     * attached to the prepared transaction built from the xid bytes. Running that command calls
+     * commitRemove with the ack; the command reports no message store.
+     *
+     * @param id      value stored as the message id's data locator
+     * @param xid     encoded transaction id
+     * @param message marshalled message
+     */
+    public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
+        final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
+        final Message recovered = (Message) jdbcAdapter.getWireFormat().unmarshal(new ByteSequence(message));
+        recovered.getMessageId().setDataLocator(id);
+
+        final Tx preparedTx = getPreparedTx(new XATransactionId(xid));
+        final MessageAck ack = new MessageAck(recovered, MessageAck.STANDARD_ACK_TYPE, 1);
+        final RemoveMessageCommand recoveredRemove = new RemoveMessageCommand() {
+            @Override
+            public void run(ConnectionContext context) throws IOException {
+                ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(context, ack);
+            }
+
+            @Override
+            public MessageAck getMessageAck() {
+                return ack;
+            }
+
+            @Override
+            public MessageStore getMessageStore() {
+                return null;
+            }
+        };
+        preparedTx.add(recoveredRemove);
+
+    }
+
+    interface LastAckCommand extends RemoveMessageCommand { // remove command for a durable-subscription ack that can also be rolled back
+        void rollback(ConnectionContext context) throws IOException; // invoked by this store's rollback() for prepared acks of this type
+
+        String getClientId(); // client id of the durable subscriber
+
+        String getSubName(); // durable subscription name
+
+        long getSequence(); // read by onRecovered(); the command built in acknowledge() throws IllegalStateException here
+
+        byte getPriority(); // read by onRecovered(); the command built in acknowledge() throws IllegalStateException here
+
+        void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore); // called by onRecovered() to supply the topic store of a recovered ack
+    }
+
+    /**
+     * Re-registers a prepared durable-subscription ack. The encoded xid bytes are used twice:
+     * wrapped in an XATransactionId to locate the prepared transaction, and read directly,
+     * skipping one byte and then taking a long (the last acked sequence) and a byte (the
+     * priority). The resulting command carries an ack that only has its destination set.
+     * <p>
+     * The command needs a JDBCTopicMessageStore, supplied later through setMessageStore, before
+     * its run or rollback is called; both finish by calling complete on that store.
+     *
+     * @param encodedXid  encoded xid bytes as described above
+     * @param destination destination of the subscription
+     * @param subName     durable subscription name
+     * @param clientId    client id of the durable subscriber
+     */
+    public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
+        final Tx preparedTx = getPreparedTx(new XATransactionId(encodedXid));
+
+        final DataByteArrayInputStream xidReader = new DataByteArrayInputStream(encodedXid);
+        xidReader.skipBytes(1); // +|-
+        final long lastAck = xidReader.readLong();
+        final byte priority = xidReader.readByte();
+
+        final MessageAck ack = new MessageAck();
+        ack.setDestination(destination);
+
+        final LastAckCommand recoveredLastAck = new LastAckCommand() {
+            // stays null until setMessageStore is called
+            JDBCTopicMessageStore store;
+
+            @Override
+            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
+                this.store = jdbcTopicMessageStore;
+            }
+
+            @Override
+            public MessageStore getMessageStore() {
+                return store;
+            }
+
+            @Override
+            public MessageAck getMessageAck() {
+                return ack;
+            }
+
+            @Override
+            public String getClientId() {
+                return clientId;
+            }
+
+            @Override
+            public String getSubName() {
+                return subName;
+            }
+
+            @Override
+            public long getSequence() {
+                return lastAck;
+            }
+
+            @Override
+            public byte getPriority() {
+                return priority;
+            }
+
+            @Override
+            public void run(ConnectionContext context) throws IOException {
+                ((JDBCPersistenceAdapter) persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
+                store.complete(clientId, subName);
+            }
+
+            @Override
+            public void rollback(ConnectionContext context) throws IOException {
+                ((JDBCPersistenceAdapter) persistenceAdapter).rollbackLastAck(context, priority, store.getDestination(), subName, clientId);
+                store.complete(clientId, subName);
+            }
+        };
+        preparedTx.add(recoveredLastAck);
+
+    }
+
+    @Override
+    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
+        topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
+    }
+
+    @Override
+    protected void onRecovered(Tx tx) {
+        for (RemoveMessageCommand removeMessageCommand: tx.acks) {
+            if (removeMessageCommand instanceof LastAckCommand) {
+                LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
+                JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
+                jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
+                lastAckCommand.setMessageStore(jdbcTopicMessageStore);
+            }
+        }
+    }
+
+    /**
+     * Acknowledges a message for a durable subscription. Outside a transaction the ack is
+     * passed straight to the topic store with a null context. Inside a transaction a
+     * LastAckCommand is queued on the transaction instead.
+     * <p>
+     * Running the queued command acknowledges on the topic store. Rolling it back calls
+     * rollbackLastAck on the JDBC adapter and complete on the JDBC topic store, then looks up
+     * the broker Topic for the store's destination, rolls back the ack's last message id on
+     * the durable subscription's pending messages, and clears the pending messages for the
+     * subscription key. The command's getSequence, getPriority and setMessageStore all throw
+     * IllegalStateException.
+     */
+    @Override
+    public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
+                           final MessageId messageId, final MessageAck ack) throws IOException {
+
+        if (!ack.isInTransaction()) {
+            topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
+            return;
+        }
+
+        final Tx tx = getTx(ack.getTransactionId());
+        final LastAckCommand deferredAck = new LastAckCommand() {
+            @Override
+            public MessageAck getMessageAck() {
+                return ack;
+            }
+
+            @Override
+            public MessageStore getMessageStore() {
+                return topicMessageStore;
+            }
+
+            @Override
+            public String getClientId() {
+                return clientId;
+            }
+
+            @Override
+            public String getSubName() {
+                return subscriptionName;
+            }
+
+            @Override
+            public void run(ConnectionContext ctx) throws IOException {
+                topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
+            }
+
+            @Override
+            public void rollback(ConnectionContext context) throws IOException {
+                final JDBCTopicMessageStore jdbcStore = (JDBCTopicMessageStore) topicMessageStore;
+                final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
+                jdbcAdapter.rollbackLastAck(context, jdbcStore, ack, subscriptionName, clientId);
+                jdbcStore.complete(clientId, subscriptionName);
+
+                // bring the broker-side durable subscription state back in line as well
+                final Map<ActiveMQDestination, Destination> brokerDestinations =
+                        jdbcAdapter.getBrokerService().getRegionBroker().getDestinationMap();
+                final Topic topic = (Topic) brokerDestinations.get(topicMessageStore.getDestination());
+                final SubscriptionKey subscriptionKey = new SubscriptionKey(clientId, subscriptionName);
+                topic.getDurableTopicSubs().get(subscriptionKey).getPending().rollback(ack.getLastMessageId());
+                topic.clearPendingMessages(subscriptionKey);
+            }
+
+            @Override
+            public long getSequence() {
+                throw new IllegalStateException("Sequence id must be inferred from ack");
+            }
+
+            @Override
+            public byte getPriority() {
+                throw new IllegalStateException("Priority must be inferred from ack or row");
+            }
+
+            @Override
+            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
+                throw new IllegalStateException("message store already known!");
+            }
+        };
+        tx.add(deferredAck);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Fri Jun  1 14:32:50 2012
@@ -76,6 +76,15 @@ public class Statements {
     private String deleteOldMessagesStatementWithPriority;
     private String durableSubscriberMessageCountStatementWithPriority;
     private String dropAckPKAlterStatementEnd;
+    private String updateXidFlagStatement;
+    private String findOpsPendingOutcomeStatement;
+    private String clearXidFlagStatement;
+    private String updateDurableLastAckInTxStatement;
+    private String findAcksPendingOutcomeStatement;
+    private String clearDurableLastAckInTxStatement;
+    private String updateDurableLastAckWithPriorityStatement;
+    private String updateDurableLastAckWithPriorityInTxStatement;
+    private String findXidByIdStatement;
 
     public String[] getCreateSchemaStatements() {
         if (createSchemaStatements == null) {
@@ -99,7 +108,9 @@ public class Statements {
                 "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", 
                 "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
                 "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
+                "ALTER TABLE " + getFullMessageTableName() + " ADD XID " + binaryDataType,
                 "ALTER TABLE " + getFullAckTableName() + " ADD PRIORITY " + sequenceDataType  + " DEFAULT 5 NOT NULL",
+                "ALTER TABLE " + getFullAckTableName() + " ADD XID " + binaryDataType,
                 "ALTER TABLE " + getFullAckTableName() + " " + getDropAckPKAlterStatementEnd(),
                 "ALTER TABLE " + getFullAckTableName() + " ADD PRIMARY KEY (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)",
             };
@@ -131,7 +142,7 @@ public class Statements {
         if (addMessageStatement == null) {
             addMessageStatement = "INSERT INTO "
                                   + getFullMessageTableName()
-                                  + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)";
+                                  + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
         }
         return addMessageStatement;
     }
@@ -171,7 +182,14 @@ public class Statements {
         }
         return findMessageByIdStatement;
     }
-    
+
+    /**
+     * Returns the SQL that selects the XID column of the message row with a given ID,
+     * building the default text on first use when no statement has been set.
+     */
+    public String getFindXidByIdStatement() {
+        if (findXidByIdStatement != null) {
+            return findXidByIdStatement;
+        }
+        final String messageTable = getFullMessageTableName();
+        findXidByIdStatement = "SELECT XID FROM " + messageTable + " WHERE ID=?";
+        return findXidByIdStatement;
+    }
+
     public String getFindAllMessagesStatement() {
         if (findAllMessagesStatement == null) {
             findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
@@ -271,6 +289,7 @@ public class Statements {
             findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
                                               + getFullAckTableName() + " D "
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+                                              + " AND M.XID IS NULL"
                                               + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
                                               + " AND M.ID > ?"
                                               + " ORDER BY M.ID";
@@ -283,6 +302,7 @@ public class Statements {
             findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M,"
                                               + " " + getFullAckTableName() + " D"
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+                                              + " AND M.XID IS NULL"
                                               + " AND M.CONTAINER=D.CONTAINER"
                                               + " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
                                               + " AND M.ID > ? AND M.PRIORITY = ?"
@@ -414,7 +434,7 @@ public class Statements {
     public String getDestinationMessageCountStatement() {
         if (destinationMessageCountStatement == null) {
             destinationMessageCountStatement = "SELECT COUNT(*) FROM " + getFullMessageTableName()
-                                               + " WHERE CONTAINER=?";
+                                               + " WHERE CONTAINER=? AND XID IS NULL"; // counts only rows of the container whose XID column is NULL
         }
         return destinationMessageCountStatement;
     }
@@ -425,7 +445,7 @@ public class Statements {
     public String getFindNextMessagesStatement() {
         if (findNextMessagesStatement == null) {
             findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
-                                        + " WHERE CONTAINER=? AND ID > ? ORDER BY ID";
+                                        + " WHERE CONTAINER=? AND ID > ? AND XID IS NULL ORDER BY ID"; // rows with a non-NULL XID are left out of the result
         }
         return findNextMessagesStatement;
     }
@@ -437,6 +457,7 @@ public class Statements {
         if (findNextMessagesByPriorityStatement == null) {
             findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                         + " WHERE CONTAINER=?"
+                                        + " AND XID IS NULL"
                                         + " AND ((ID > ? AND PRIORITY = ?) OR PRIORITY < ?)"
                                         + " ORDER BY PRIORITY DESC, ID";
         }
@@ -478,11 +499,76 @@ public class Statements {
     public String getUpdateDurableLastAckStatement() {
         if (updateDurableLastAckStatement == null) {
             updateDurableLastAckStatement  = "UPDATE " + getFullAckTableName()
-                    + " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+                    + " SET LAST_ACKED_ID=?, XID = NULL WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"; // the same update also resets XID to NULL
         }
         return  updateDurableLastAckStatement;
     }
 
+    /**
+     * Returns the SQL that sets the XID column on the ack rows of a subscription
+     * (matched by container, client id and subscription name), building the default text on
+     * first use when no statement has been set.
+     */
+    public String getUpdateDurableLastAckInTxStatement() {
+        if (updateDurableLastAckInTxStatement != null) {
+            return updateDurableLastAckInTxStatement;
+        }
+        final String ackTable = getFullAckTableName();
+        updateDurableLastAckInTxStatement = "UPDATE " + ackTable
+                + " SET XID=? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
+        return updateDurableLastAckInTxStatement;
+    }
+
+    /**
+     * Returns the SQL that sets LAST_ACKED_ID and resets XID to NULL on the ack row matched by
+     * container, client id, subscription name and priority, building the default text on
+     * first use when no statement has been set.
+     */
+    public String getUpdateDurableLastAckWithPriorityStatement() {
+        if (updateDurableLastAckWithPriorityStatement != null) {
+            return updateDurableLastAckWithPriorityStatement;
+        }
+        final String ackTable = getFullAckTableName();
+        updateDurableLastAckWithPriorityStatement = "UPDATE " + ackTable
+                + " SET LAST_ACKED_ID=?, XID = NULL WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
+        return updateDurableLastAckWithPriorityStatement;
+    }
+
+    /**
+     * Returns the SQL that sets the XID column on the ack row matched by container, client id,
+     * subscription name and priority, building the default text on first use when no
+     * statement has been set.
+     */
+    public String getUpdateDurableLastAckWithPriorityInTxStatement() {
+        if (updateDurableLastAckWithPriorityInTxStatement != null) {
+            return updateDurableLastAckWithPriorityInTxStatement;
+        }
+        final String ackTable = getFullAckTableName();
+        updateDurableLastAckWithPriorityInTxStatement = "UPDATE " + ackTable
+                + " SET XID=? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
+        return updateDurableLastAckWithPriorityInTxStatement;
+    }
+
+    /**
+     * Returns the SQL that resets XID to NULL on the ack row matched by container, client id,
+     * subscription name and priority, building the default text on first use when no
+     * statement has been set.
+     */
+    public String getClearDurableLastAckInTxStatement() {
+        if (clearDurableLastAckInTxStatement != null) {
+            return clearDurableLastAckInTxStatement;
+        }
+        final String ackTable = getFullAckTableName();
+        clearDurableLastAckInTxStatement = "UPDATE " + ackTable
+                + " SET XID = NULL WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=? AND PRIORITY=?";
+        return clearDurableLastAckInTxStatement;
+    }
+
+    /**
+     * Returns the SQL that lists ID, XID and MSG of every message row with a non-NULL XID,
+     * ordered by ID, building the default text on first use when no statement has been set.
+     */
+    public String getFindOpsPendingOutcomeStatement() {
+        if (findOpsPendingOutcomeStatement != null) {
+            return findOpsPendingOutcomeStatement;
+        }
+        final String messageTable = getFullMessageTableName();
+        findOpsPendingOutcomeStatement = "SELECT ID, XID, MSG FROM " + messageTable
+                + " WHERE XID IS NOT NULL ORDER BY ID";
+        return findOpsPendingOutcomeStatement;
+    }
+
+    /**
+     * Returns the SQL that lists XID, CONTAINER, CLIENT_ID and SUB_NAME of every ack row with
+     * a non-NULL XID, building the default text on first use when no statement has been set.
+     */
+    public String getFindAcksPendingOutcomeStatement() {
+        if (findAcksPendingOutcomeStatement != null) {
+            return findAcksPendingOutcomeStatement;
+        }
+        final String ackTable = getFullAckTableName();
+        findAcksPendingOutcomeStatement = "SELECT XID, CONTAINER, CLIENT_ID, SUB_NAME FROM " + ackTable
+                + " WHERE XID IS NOT NULL";
+        return findAcksPendingOutcomeStatement;
+    }
+
+    /**
+     * Returns the SQL that sets the XID column of the message row with a given ID,
+     * building the default text on first use when no statement has been set.
+     */
+    public String getUpdateXidFlagStatement() {
+        if (updateXidFlagStatement != null) {
+            return updateXidFlagStatement;
+        }
+        final String messageTable = getFullMessageTableName();
+        updateXidFlagStatement = "UPDATE " + messageTable + " SET XID = ? WHERE ID = ?";
+        return updateXidFlagStatement;
+    }
+
+    /**
+     * Returns the SQL that resets the XID column of the message row with a given ID to NULL,
+     * building the default text on first use when no statement has been set.
+     */
+    public String getClearXidFlagStatement() {
+        if (clearXidFlagStatement != null) {
+            return clearXidFlagStatement;
+        }
+        final String messageTable = getFullMessageTableName();
+        clearXidFlagStatement = "UPDATE " + messageTable + " SET XID = NULL WHERE ID = ?";
+        return clearXidFlagStatement;
+    }
+
     public String getFullMessageTableName() {
         return getTablePrefix() + getMessageTableName();
     }
@@ -788,5 +874,41 @@ public class Statements {
 
     public void setUpdateDurableLastAckStatement(String updateDurableLastAckStatement) {
         this.updateDurableLastAckStatement = updateDurableLastAckStatement;
-    }    
+    }
+
+    public void setUpdateXidFlagStatement(String updateXidFlagStatement) { // a non-null value replaces the default SQL that getUpdateXidFlagStatement() would build
+        this.updateXidFlagStatement = updateXidFlagStatement;
+    }
+
+    public void setFindOpsPendingOutcomeStatement(String findOpsPendingOutcomeStatement) { // a non-null value replaces the default SQL that getFindOpsPendingOutcomeStatement() would build
+        this.findOpsPendingOutcomeStatement = findOpsPendingOutcomeStatement;
+    }
+
+    public void setClearXidFlagStatement(String clearXidFlagStatement) { // a non-null value replaces the default SQL that getClearXidFlagStatement() would build
+        this.clearXidFlagStatement = clearXidFlagStatement;
+    }
+
+    public void setUpdateDurableLastAckInTxStatement(String updateDurableLastAckInTxStatement) { // a non-null value replaces the default SQL that getUpdateDurableLastAckInTxStatement() would build
+        this.updateDurableLastAckInTxStatement = updateDurableLastAckInTxStatement;
+    }
+
+    public void setFindAcksPendingOutcomeStatement(String findAcksPendingOutcomeStatement) { // a non-null value replaces the default SQL that getFindAcksPendingOutcomeStatement() would build
+        this.findAcksPendingOutcomeStatement = findAcksPendingOutcomeStatement;
+    }
+
+    public void setClearDurableLastAckInTxStatement(String clearDurableLastAckInTxStatement) { // a non-null value replaces the default SQL that getClearDurableLastAckInTxStatement() would build
+        this.clearDurableLastAckInTxStatement = clearDurableLastAckInTxStatement;
+    }
+
+    public void setUpdateDurableLastAckWithPriorityStatement(String updateDurableLastAckWithPriorityStatement) { // a non-null value replaces the default SQL that getUpdateDurableLastAckWithPriorityStatement() would build
+        this.updateDurableLastAckWithPriorityStatement = updateDurableLastAckWithPriorityStatement;
+    }
+
+    public void setUpdateDurableLastAckWithPriorityInTxStatement(String updateDurableLastAckWithPriorityInTxStatement) { // a non-null value replaces the default SQL that getUpdateDurableLastAckWithPriorityInTxStatement() would build
+        this.updateDurableLastAckWithPriorityInTxStatement = updateDurableLastAckWithPriorityInTxStatement;
+    }
+
+    public void setFindXidByIdStatement(String findXidByIdStatement) { // a non-null value replaces the default SQL that getFindXidByIdStatement() would build
+        this.findXidByIdStatement = findXidByIdStatement;
+    }
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/BlobJDBCAdapter.java Fri Jun  1 14:32:50 2012
@@ -18,18 +18,16 @@ package org.apache.activemq.store.jdbc.a
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.sql.Blob;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
-import javax.jms.JMSException;
-import javax.sql.rowset.serial.SerialBlob;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.jdbc.Statements;
 import org.apache.activemq.store.jdbc.TransactionContext;
 import org.apache.activemq.util.ByteArrayOutputStream;
 
@@ -53,10 +51,24 @@ import org.apache.activemq.util.ByteArra
 public class BlobJDBCAdapter extends DefaultJDBCAdapter {
 
     @Override
+    public void setStatements(Statements statements) {
+        // Overrides two statements on the supplied Statements object with blob-aware SQL and then
+        // passes the same object on to the superclass.
+
+        // The insert supplies eight values: six bound parameters plus two empty_blob()
+        // placeholders. The column list must therefore name eight columns as well; the second
+        // placeholder belongs to the xid blob that doAddMessage fills in afterwards.
+        // NOTE(review): column name XID assumed from the commit description - confirm against
+        // the table definition.
+        String addMessageStatement = "INSERT INTO "
+            + statements.getFullMessageTableName()
+            + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, empty_blob(), empty_blob())";
+        statements.setAddMessageStatement(addMessageStatement);
+
+        // Selects the MSG blob of one row FOR UPDATE; updateBlob opens this query with an
+        // updatable result set and writes the payload through it.
+        String findMessageByIdStatement = "SELECT MSG FROM "
+            + statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
+        statements.setFindMessageByIdStatement(findMessageByIdStatement);
+
+        super.setStatements(statements);
+    }
+
+    @Override
     public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
-            long expiration, byte priority) throws SQLException, IOException {
+                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
         PreparedStatement s = null;
-        ResultSet rs = null;
         cleanupExclusiveLock.readLock().lock();
         try {
             // Add the Blob record.
@@ -74,12 +86,29 @@ public class BlobJDBCAdapter extends Def
             s.close();
 
             // Select the blob record so that we can update it.
-            s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(),
-            		ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
+            updateBlob(c.getConnection(), statements.getFindMessageByIdStatement(), sequence, data);
+            if (xid != null) {
+                byte[] xidVal = xid.getEncodedXidBytes();
+                xidVal[0] = '+';
+                updateBlob(c.getConnection(), statements.getFindXidByIdStatement(), sequence, xidVal);
+            }
+
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(s);
+        }
+    }
+
+    private void updateBlob(Connection connection, String findMessageByIdStatement, long sequence, byte[] data) throws SQLException, IOException {
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+            s = connection.prepareStatement(statements.getFindMessageByIdStatement(),
+                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
             s.setLong(1, sequence);
             rs = s.executeQuery();
             if (!rs.next()) {
-                throw new IOException("Failed select blob for message: " + messageID + " in container.");
+                throw new IOException("Failed select blob for message: " + sequence + " in container.");
             }
 
             // Update the blob
@@ -88,9 +117,7 @@ public class BlobJDBCAdapter extends Def
             blob.setBytes(1, data);
             rs.updateBlob(1, blob);
             rs.updateRow();             // Update the row with the updated blob
-
         } finally {
-            cleanupExclusiveLock.readLock().unlock();
             close(rs);
             close(s);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Jun  1 14:32:50 2012
@@ -17,11 +17,8 @@
 package org.apache.activemq.store.jdbc.adapter;
 
 import java.io.IOException;
-import java.io.PrintStream;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -35,12 +32,15 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
 import org.apache.activemq.store.jdbc.Statements;
 import org.apache.activemq.store.jdbc.TransactionContext;
+import org.apache.activemq.util.DataByteArrayOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -201,10 +201,13 @@ public class DefaultJDBCAdapter implemen
             close(s);
         }
     }
-    
 
+
+    /**
+     * A non null xid indicates the op is part of 2pc prepare, so ops are flagged pending outcome
+     */
     public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
-            long expiration, byte priority) throws SQLException, IOException {
+                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         cleanupExclusiveLock.readLock().lock();
         try {
@@ -221,6 +224,13 @@ public class DefaultJDBCAdapter implemen
             s.setLong(5, expiration);
             s.setLong(6, priority);
             setBinaryData(s, 7, data);
+            if (xid != null) {
+                byte[] xidVal = xid.getEncodedXidBytes();
+                xidVal[0] = '+';
+                setBinaryData(s, 8, xidVal);
+            } else {
+                setBinaryData(s, 8, null);
+            }
             if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
@@ -326,17 +336,27 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
+    /**
+     * A non null xid indicates the op is part of 2pc prepare, so ops are flagged pending outcome
+     */
+    public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
         PreparedStatement s = c.getRemovedMessageStatement();
         cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(this.statements.getRemoveMessageStatement());
+                s = c.getConnection().prepareStatement(xid == null ?
+                        this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
                 if (this.batchStatments) {
                     c.setRemovedMessageStatement(s);
                 }
             }
-            s.setLong(1, seq);
+            if (xid == null) {
+                s.setLong(1, seq);
+            } else {
+                byte[] xidVal = xid.getEncodedXidBytes();
+                setBinaryData(s, 1, xidVal);
+                s.setLong(2, seq);
+            }
             if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
@@ -406,26 +426,33 @@ public class DefaultJDBCAdapter implemen
         }
     }
     
-    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
-            String subscriptionName, long seq, long prio) throws SQLException, IOException {
+    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
+                                         String subscriptionName, long seq, long priority) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();
         cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
+                s = c.getConnection().prepareStatement(xid == null ?
+                        this.statements.getUpdateDurableLastAckWithPriorityStatement() :
+                        this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
                 if (this.batchStatments) {
                     c.setUpdateLastAckStatement(s);
                 }
             }
-            s.setLong(1, seq);
+            if (xid != null) {
+                byte[] xidVal = encodeXid(xid, seq, priority);
+                setBinaryData(s, 1, xidVal);
+            } else {
+                s.setLong(1, seq);
+            }
             s.setString(2, destination.getQualifiedName());
             s.setString(3, clientId);
             s.setString(4, subscriptionName);
-            s.setLong(5, prio);
+            s.setLong(5, priority);
             if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
-                throw new SQLException("Failed update last ack with priority: " + prio + ", for sub: " + subscriptionName);
+                throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
             }
         } finally {
             cleanupExclusiveLock.readLock().unlock();
@@ -436,18 +463,25 @@ public class DefaultJDBCAdapter implemen
     }
 
 
-    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
-                                        String subscriptionName, long seq, long priority) throws SQLException, IOException {
+    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
+                             String subscriptionName, long seq, long priority) throws SQLException, IOException {
         PreparedStatement s = c.getUpdateLastAckStatement();
         cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
+                s = c.getConnection().prepareStatement(xid == null ?
+                        this.statements.getUpdateDurableLastAckStatement() :
+                        this.statements.getUpdateDurableLastAckInTxStatement());
                 if (this.batchStatments) {
                     c.setUpdateLastAckStatement(s);
                 }
             }
-            s.setLong(1, seq);
+            if (xid != null) {
+                byte[] xidVal = encodeXid(xid, seq, priority);
+                setBinaryData(s, 1, xidVal);
+            } else {
+                s.setLong(1, seq);
+            }
             s.setString(2, destination.getQualifiedName());
             s.setString(3, clientId);
             s.setString(4, subscriptionName);
@@ -466,6 +500,35 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
+    /**
+     * Writes the acked sequence and priority into the transaction's xid output stream and
+     * returns the encoded xid bytes.
+     * <p>
+     * The stream is repositioned to offset 1, then the sequence is written as a long followed by
+     * the priority narrowed to a single byte.
+     * <p>
+     * NOTE(review): the returned array is fetched before the stream is written; this assumes the
+     * array is the stream's live backing buffer so the writes show up in it - confirm against
+     * XATransactionId.
+     *
+     * @param xid      transaction whose encoded bytes carry the update
+     * @param seq      sequence value written at stream offset 1
+     * @param priority priority value; only its low byte is written
+     * @return the array obtained from {@code xid.getEncodedXidBytes()}
+     */
+    private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
+        final byte[] encoded = xid.getEncodedXidBytes();
+        final DataByteArrayOutputStream xidStream = xid.getOutputStream();
+        // offset 0 is left untouched; the update is laid down from offset 1 onwards
+        xidStream.position(1);
+        xidStream.writeLong(seq);
+        xidStream.writeByte((byte) priority);
+        return encoded;
+    }
+
+    /**
+     * Executes the "clear durable last ack in tx" statement for one destination, client,
+     * subscription and priority, holding the cleanup read lock for the duration.
+     *
+     * @param c           transaction context supplying the JDBC connection
+     * @param destination destination whose qualified name is bound as parameter 1
+     * @param priority    priority bound as parameter 4
+     * @param clientId    client id bound as parameter 2
+     * @param subName     subscription name bound as parameter 3
+     * @throws SQLException if preparing or executing the statement fails
+     * @throws IOException  if the update does not affect exactly one row
+     */
+    @Override
+    public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
+        PreparedStatement clearStatement = null;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            clearStatement = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
+            clearStatement.setString(1, destination.getQualifiedName());
+            clearStatement.setString(2, clientId);
+            clearStatement.setString(3, subName);
+            clearStatement.setLong(4, priority);
+            final int rowsUpdated = clearStatement.executeUpdate();
+            if (rowsUpdated != 1) {
+                throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
+            }
+        } finally {
+            // release the lock first, then the statement, as the sibling methods do
+            cleanupExclusiveLock.readLock().unlock();
+            close(clearStatement);
+        }
+    }
+
     public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
             String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
         // dumpTables(c,
@@ -879,30 +942,38 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    /**
-     * @param c
-     * @param destination
-     * @param clientId
-     * @param subscriberName
-     * @return
-     * @throws SQLException
-     * @throws IOException
-     */
-    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c, ActiveMQDestination destination,
-            String clientId, String subscriberName) throws SQLException, IOException {
+    @Override
+    public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         cleanupExclusiveLock.readLock().lock();
         try {
-            s = c.getConnection().prepareStatement(this.statements.getNextDurableSubscriberMessageStatement());
-            s.setString(1, destination.getQualifiedName());
-            s.setString(2, clientId);
-            s.setString(3, subscriberName);
+            s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
             rs = s.executeQuery();
-            if (!rs.next()) {
-                return null;
+            while (rs.next()) {
+                long id = rs.getLong(1);
+                byte[] encodedXid = getBinaryData(rs, 2);
+                if (encodedXid[0] == '+') {
+                    jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
+                } else {
+                    jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
+                }
+            }
+
+            close(rs);
+            close(s);
+
+            s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
+            rs = s.executeQuery();
+            while (rs.next()) {
+                byte[] encodedXid = getBinaryData(rs, 1);
+                String destination = rs.getString(2);
+                String subName = rs.getString(3);
+                String subId = rs.getString(4);
+                jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
+                        ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
+                        subName, subId);
             }
-            return getBinaryData(rs, 1);
         } finally {
             close(rs);
             cleanupExclusiveLock.readLock().unlock();
@@ -910,6 +981,23 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
+    /**
+     * Executes the "clear xid flag" statement for the message row with the given sequence id,
+     * holding the cleanup read lock for the duration.
+     *
+     * @param c        transaction context supplying the JDBC connection
+     * @param sequence message sequence id bound as parameter 1
+     * @throws SQLException if preparing or executing the statement fails
+     * @throws IOException  if the update does not affect exactly one row
+     */
+    @Override
+    public void doCommitAddOp(TransactionContext c, long sequence) throws SQLException, IOException {
+        PreparedStatement clearFlag = null;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            clearFlag = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
+            clearFlag.setLong(1, sequence);
+            final int rowsUpdated = clearFlag.executeUpdate();
+            if (rowsUpdated != 1) {
+                throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
+            }
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(clearFlag);
+        }
+    }
+
+
     public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
             IOException {
         PreparedStatement s = null;
@@ -978,7 +1066,28 @@ public class DefaultJDBCAdapter implemen
             close(s);
         }
     }
-    
+
+    /**
+     * Runs the "last producer sequence id" query for one producer, holding the cleanup read lock
+     * for the duration.
+     *
+     * @param c  transaction context supplying the JDBC connection
+     * @param id producer whose {@code toString()} form is bound as parameter 1
+     * @return column 1 of the first result row, or {@code -1} when the query returns no row
+     * @throws SQLException if preparing, executing or reading the query fails
+     * @throws IOException  declared by the signature; not thrown directly by this body
+     */
+    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
+            throws SQLException, IOException {
+        PreparedStatement query = null;
+        ResultSet rows = null;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            query = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
+            query.setString(1, id.toString());
+            rows = query.executeQuery();
+            return rows.next() ? rows.getLong(1) : -1;
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(rows);
+            close(query);
+        }
+    }
+
 /*    public void dumpTables(Connection c, String destinationName, String clientId, String
       subscriptionName) throws SQLException { 
         printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
@@ -1034,25 +1143,4 @@ public class DefaultJDBCAdapter implemen
         }
     }  */
 
-    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
-            throws SQLException, IOException {
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
-        try {
-            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
-            s.setString(1, id.toString());
-            rs = s.executeQuery();
-            long seq = -1;
-            if (rs.next()) {
-                seq = rs.getLong(1);
-            }
-            return seq;
-        } finally {
-            cleanupExclusiveLock.readLock().unlock();
-            close(rs);
-            close(s);
-        }
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleBlobJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleBlobJDBCAdapter.java?rev=1345202&r1=1345201&r2=1345202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleBlobJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/OracleBlobJDBCAdapter.java Fri Jun  1 14:32:50 2012
@@ -48,15 +48,6 @@ public class OracleBlobJDBCAdapter exten
         statements.setLongDataType("NUMBER");
         statements.setSequenceDataType("NUMBER");
 
-        String addMessageStatement = "INSERT INTO "
-            + statements.getFullMessageTableName()
-            + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, empty_blob())";
-        statements.setAddMessageStatement(addMessageStatement);
-
-        String findMessageByIdStatement = "SELECT MSG FROM " +
-        	statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
-        statements.setFindMessageByIdStatement(findMessageByIdStatement);
-
         super.setStatements(statements);
     }
 



Mime
View raw message