activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1033076 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/store/jdbc/ test/java/org/apache/activemq/usecases/
Date Tue, 09 Nov 2010 16:34:54 GMT
Author: gtully
Date: Tue Nov  9 16:34:54 2010
New Revision: 1033076

URL: http://svn.apache.org/viewvc?rev=1033076&view=rev
Log:
more improvements on https://issues.apache.org/activemq/browse/AMQ-2980 - split prioritised
and regular statements, have ack row per priority and fix up deletion and interleaved durable
subs. Simpler SQL improves throughput

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1033076&r1=1033075&r2=1033076&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Tue Nov  9 16:34:54 2010
@@ -85,9 +85,11 @@ public interface JDBCAdapter {
 
     void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long
nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
 
-    long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination
destination, String clientId, String subscriberName) throws SQLException, IOException;
+    long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination
destination, String clientId, String subscriberName) throws SQLException, IOException;
 
     void doMessageIdScan(TransactionContext c, int limit, JDBCMessageIdScanListener listener)
throws SQLException, IOException;
 
     long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException,
IOException;
+
+    void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination,
String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1033076&r1=1033075&r2=1033076&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Tue Nov  9 16:34:54 2010
@@ -43,8 +43,6 @@ import org.apache.commons.logging.LogFac
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore
{
 
     private static final Log LOG = LogFactory.getLog(JDBCTopicMessageStore.class);
-    private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String,
AtomicLong>();
-    private Map<String, AtomicLong> subscriberLastPriorityMap = new ConcurrentHashMap<String,
AtomicLong>();
 
     public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter,
WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
@@ -57,13 +55,16 @@ public class JDBCTopicMessageStore exten
             }
             return;
         }
-        // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
-        	long[] res = adapter.getStoreSequenceId(c, destination, messageId);
-            adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0], res[1]);
+            long[] res = adapter.getStoreSequenceId(c, destination, messageId);
+            if (this.isPrioritizedMessages()) {
+                adapter.doSetLastAckWithPriority(c, destination, clientId, subscriptionName,
res[0], res[1]);
+            } else {
+                adapter.doSetLastAck(c, destination, clientId, subscriptionName, res[0],
res[1]);
+            }
             if (LOG.isTraceEnabled()) {
-                LOG.trace("ack - seq: " + res[0] + ", priority: " + res[1]);
+                LOG.trace(clientId + ":" + subscriptionName + " ack, seq: " + res[0] + ",
priority: " + res[1]);
             }
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -102,31 +103,15 @@ public class JDBCTopicMessageStore exten
     public synchronized void recoverNextMessages(final String clientId, final String subscriptionName,
final int maxReturned, final MessageRecoveryListener listener)
         throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
-        String subcriberId = getSubscriptionKey(clientId, subscriptionName);
-        AtomicLong last = subscriberLastMessageMap.get(subcriberId);
-        AtomicLong priority = subscriberLastPriorityMap.get(subcriberId);
-        if (last == null) {
-            long[] lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination,
clientId, subscriptionName);
-            last = new AtomicLong(lastAcked[0]);
-            subscriberLastMessageMap.put(subcriberId, last);
-            priority = new AtomicLong(lastAcked[1]);
-            subscriberLastMessageMap.put(subcriberId, priority);
-        }
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("recoverNextMessage - last: " + last.get() + ", priority: " + priority);
-        }
-        final AtomicLong finalLast = last;
-        final AtomicLong finalPriority = priority;
         try {
-            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(),
priority.get(), maxReturned, new JDBCMessageRecoveryListener() {
+            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
+                    0, 0, maxReturned, new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception
{
                     if (listener.hasSpace()) {
                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         if (listener.recoverMessage(msg)) {
-                            finalLast.set(sequenceId);
-                            finalPriority.set(msg.getPriority());
                             return true;
                         }
                     }
@@ -142,15 +127,11 @@ public class JDBCTopicMessageStore exten
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
         } finally {
             c.close();
-            subscriberLastMessageMap.put(subcriberId, finalLast);
-            subscriberLastPriorityMap.put(subcriberId, finalPriority);
         }
     }
 
     public void resetBatching(String clientId, String subscriptionName) {
-        String subcriberId = getSubscriptionKey(clientId, subscriptionName);
-        subscriberLastMessageMap.remove(subcriberId);
-        subscriberLastPriorityMap.remove(subcriberId);
+        // DB always recovers from last ack
     }
 
     public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws
IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1033076&r1=1033075&r2=1033076&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Tue Nov  9 16:34:54 2010
@@ -48,7 +48,7 @@ public class Statements {
     private String createDurableSubStatement;
     private String findDurableSubStatement;
     private String findAllDurableSubsStatement;
-    private String updateLastAckOfDurableSubStatement;
+    private String updateLastPriorityAckRowOfDurableSubStatement;
     private String deleteSubscriptionStatement;
     private String findAllDurableSubMessagesStatement;
     private String findDurableSubMessagesStatement;
@@ -74,6 +74,8 @@ public class Statements {
 
     private String insertDurablePriorityAckStatement;
     private String updateDurableLastAckStatement;
+    private String deleteOldMessagesStatementWithPriority;
+    private String durableSubscriberMessageCountStatementWithPriority;
 
     public String[] getCreateSchemaStatements() {
         if (createSchemaStatements == null) {
@@ -213,7 +215,7 @@ public class Statements {
     public String getFindDurableSubStatement() {
         if (findDurableSubStatement == null) {
             findDurableSubStatement = "SELECT SELECTOR, SUB_DEST " + "FROM " + getFullAckTableName()
-                                      + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?
AND SUB_DEST IS NOT NULL";
+                                      + " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
         }
         return findDurableSubStatement;
     }
@@ -221,17 +223,17 @@ public class Statements {
     public String getFindAllDurableSubsStatement() {
         if (findAllDurableSubsStatement == null) {
             findAllDurableSubsStatement = "SELECT SELECTOR, SUB_NAME, CLIENT_ID, SUB_DEST"
+ " FROM "
-                                          + getFullAckTableName() + " WHERE CONTAINER=? AND
SUB_DEST IS NOT NULL";
+                                          + getFullAckTableName() + " WHERE CONTAINER=? AND
PRIORITY=0";
         }
         return findAllDurableSubsStatement;
     }
 
-    public String getUpdateLastAckOfDurableSubStatement() {
-        if (updateLastAckOfDurableSubStatement == null) {
-            updateLastAckOfDurableSubStatement = "UPDATE " + getFullAckTableName() + " SET
LAST_ACKED_ID=?"
+    public String getUpdateLastPriorityAckRowOfDurableSubStatement() {
+        if (updateLastPriorityAckRowOfDurableSubStatement == null) {
+            updateLastPriorityAckRowOfDurableSubStatement = "UPDATE " + getFullAckTableName()
+ " SET LAST_ACKED_ID=?"
                                                  + " WHERE CONTAINER=? AND CLIENT_ID=? AND
SUB_NAME=? AND PRIORITY=?";
         }
-        return updateLastAckOfDurableSubStatement;
+        return updateLastPriorityAckRowOfDurableSubStatement;
     }
 
     public String getDeleteSubscriptionStatement() {
@@ -248,7 +250,7 @@ public class Statements {
                                                  + " M, " + getFullAckTableName() + " D "
                                                  + " WHERE D.CONTAINER=? AND D.CLIENT_ID=?
AND D.SUB_NAME=?"
                                                  + " AND M.CONTAINER=D.CONTAINER AND M.ID
> D.LAST_ACKED_ID"
-                                                 + " ORDER BY M.ID";
+                                                 + " ORDER BY M.PRIORITY DESC, M.ID";
         }
         return findAllDurableSubMessagesStatement;
     }
@@ -258,7 +260,7 @@ public class Statements {
             findDurableSubMessagesStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName()
+ " M, "
                                               + getFullAckTableName() + " D "
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
D.SUB_NAME=?"
-                                              + " AND M.CONTAINER=D.CONTAINER AND M.ID >
?"
+                                              + " AND M.CONTAINER=D.CONTAINER AND M.ID >
D.LAST_ACKED_ID"
                                               + " ORDER BY M.ID";
         }
         return findDurableSubMessagesStatement;
@@ -269,20 +271,8 @@ public class Statements {
             findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName()
+ " M, "
                                               + getFullAckTableName() + " D "
                                               + " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
D.SUB_NAME=?"
-                                              + " AND M.CONTAINER=D.CONTAINER AND "
-                                              + "((M.ID > ? AND M.PRIORITY = ?) "
-                                              + "   OR (M.PRIORITY <> ? "
-                                              + "     AND ( M.ID >"
-                                              + "          ( SELECT LAST_ACKED_ID FROM "
+ getFullAckTableName()
-                                              + "           WHERE CONTAINER=D.CONTAINER AND
CLIENT_ID=D.CLIENT_ID"
-                                              + "           AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY
)"
-                                              + "          OR "
-                                              + "          ( (SELECT COUNT(LAST_ACKED_ID)
FROM " + getFullAckTableName()
-                                              + "           WHERE CONTAINER=D.CONTAINER AND
CLIENT_ID=D.CLIENT_ID"
-                                              + "           AND SUB_NAME=D.SUB_NAME AND PRIORITY=M.PRIORITY)
= 0)"
-                                              + "        )"
-                                              + "   )"
-                                              + ")"
+                                              + " AND M.CONTAINER=D.CONTAINER"
+                                              + " AND M.PRIORITY=D.PRIORITY AND M.ID >
D.LAST_ACKED_ID"
                                               + " ORDER BY M.PRIORITY DESC, M.ID";
         }
         return findDurableSubMessagesByPriorityStatement;
@@ -324,21 +314,33 @@ public class Statements {
                                                      + " M, "
                                                      + getFullAckTableName()
                                                      + " D "
-                                                     + " WHERE D.CONTAINER=? AND D.CLIENT_ID=?
AND D.SUB_NAME=? AND D.SUB_DEST IS NOT NULL"
+                                                     + " WHERE D.CONTAINER=? AND D.CLIENT_ID=?
AND D.SUB_NAME=?"
                                                      + " AND M.CONTAINER=D.CONTAINER "
-                                                     + "     AND ( M.ID >"
+                                                     + "     AND M.ID >"
                                                      + "          ( SELECT LAST_ACKED_ID
FROM " + getFullAckTableName()
                                                      + "           WHERE CONTAINER=D.CONTAINER
AND CLIENT_ID=D.CLIENT_ID"
-                                                     + "           AND SUB_NAME=D.SUB_NAME
AND PRIORITY=M.PRIORITY )"
-                                                     + "          OR "
-                                                     + "          ( (SELECT COUNT(LAST_ACKED_ID)
FROM " + getFullAckTableName()
-                                                     + "           WHERE CONTAINER=D.CONTAINER
AND CLIENT_ID=D.CLIENT_ID"
-                                                     + "           AND SUB_NAME=D.SUB_NAME
AND PRIORITY=M.PRIORITY) = 0)"
-                                                     + "        )";
+                                                     + "           AND SUB_NAME=D.SUB_NAME
)";
+
         }
         return durableSubscriberMessageCountStatement;
     }
 
+    public String getDurableSubscriberMessageCountStatementWithPriority() {
+        if (durableSubscriberMessageCountStatementWithPriority == null) {
+            durableSubscriberMessageCountStatementWithPriority = "SELECT COUNT(*) FROM "
+                                                     + getFullMessageTableName()
+                                                     + " M, "
+                                                     + getFullAckTableName()
+                                                     + " D "
+                                                     + " WHERE D.CONTAINER=? AND D.CLIENT_ID=?
AND D.SUB_NAME=?"
+                                                     + " AND M.CONTAINER=D.CONTAINER "
+                                                     + " AND M.PRIORITY=D.PRIORITY "
+                                                     + " AND M.ID > D.LAST_ACKED_ID";
+        }
+
+        return durableSubscriberMessageCountStatementWithPriority;
+    }
+
     public String getFindAllDestinationsStatement() {
         if (findAllDestinationsStatement == null) {
             findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
@@ -360,29 +362,37 @@ public class Statements {
         return removeAllSubscriptionsStatement;
     }
 
-    public String getDeleteOldMessagesStatement() {
-        if (deleteOldMessagesStatement == null) {
-            deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+    public String getDeleteOldMessagesStatementWithPriority() {
+        if (deleteOldMessagesStatementWithPriority == null) {
+            deleteOldMessagesStatementWithPriority = "DELETE FROM " + getFullMessageTableName()
                                          + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
-                                         + " OR (ID < "
+                                         + " OR (ID <= "
                                          + "     ( SELECT min(" + getFullAckTableName() +
".LAST_ACKED_ID)"
                                          + "       FROM " + getFullAckTableName() + " WHERE
"
                                          +          getFullAckTableName() + ".CONTAINER="
                                          +          getFullMessageTableName() + ".CONTAINER"
-                                         + "        AND " + getFullAckTableName() + ".SUB_DEST
IS NULL"
                                          + "        AND " + getFullAckTableName() + ".PRIORITY="
+ getFullMessageTableName() + ".PRIORITY )"
-                                         + "    AND ID <"
+                                         + "   )";
+        }
+        return deleteOldMessagesStatementWithPriority;
+    }
+
+    public String getDeleteOldMessagesStatement() {
+        if (deleteOldMessagesStatement == null) {
+            deleteOldMessagesStatement = "DELETE FROM " + getFullMessageTableName()
+                                         + " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)"
+                                         + " OR (ID <= "
                                          + "     ( SELECT min(" + getFullAckTableName() +
".LAST_ACKED_ID)"
                                          + "       FROM " + getFullAckTableName() + " WHERE
"
                                          +          getFullAckTableName() + ".CONTAINER="
-                                         +          getFullMessageTableName() + ".CONTAINER"
-                                         + "        AND " + getFullAckTableName() + ".SUB_DEST
IS NOT NULL )"
+                                         +          getFullMessageTableName() + ".CONTAINER
)"
                                          + "   )";
 
         }
         return deleteOldMessagesStatement;
     }
 
+
     public String getLockCreateStatement() {
         if (lockCreateStatement == null) {
             lockCreateStatement = "SELECT * FROM " + getFullLockTableName();
@@ -441,11 +451,9 @@ public class Statements {
      */
     public String getLastAckedDurableSubscriberMessageStatement() {
         if (lastAckedDurableSubscriberMessageStatement == null) {
-            lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID), PRIORITY
FROM "
+            lastAckedDurableSubscriberMessageStatement = "SELECT MAX(LAST_ACKED_ID) FROM
"
                                                          + getFullAckTableName()
-                                                         + " WHERE CONTAINER=? AND CLIENT_ID=?
AND SUB_NAME=?"
-                                                         + " GROUP BY PRIORITY"
-                                                         + " ORDER BY PRIORITY ASC";
+                                                         + " WHERE CONTAINER=? AND CLIENT_ID=?
AND SUB_NAME=?";                                                    
         }
         return lastAckedDurableSubscriberMessageStatement;
     }
@@ -473,8 +481,7 @@ public class Statements {
     public String getUpdateDurableLastAckStatement() {
         if (updateDurableLastAckStatement == null) {
             updateDurableLastAckStatement  = "UPDATE " + getFullAckTableName()
-                    + " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?"
-                    + " AND PRIORITY = " + (Byte.MAX_VALUE - 1);
+                    + " SET LAST_ACKED_ID = ? WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
         }
         return  updateDurableLastAckStatement;
     }
@@ -637,6 +644,10 @@ public class Statements {
         this.deleteOldMessagesStatement = deleteOldMessagesStatment;
     }
 
+    public void setDeleteOldMessagesStatementWithPriority(String deleteOldMessagesStatmentWithPriority)
{
+        this.deleteOldMessagesStatementWithPriority = deleteOldMessagesStatmentWithPriority;
+    }
+
     public void setDeleteSubscriptionStatement(String deleteSubscriptionStatment) {
         this.deleteSubscriptionStatement = deleteSubscriptionStatment;
     }
@@ -697,8 +708,8 @@ public class Statements {
         this.removeMessageStatement = removeMessageStatement;
     }
 
-    public void setUpdateLastAckOfDurableSubStatement(String updateLastAckOfDurableSub) {
-        this.updateLastAckOfDurableSubStatement = updateLastAckOfDurableSub;
+    public void setUpdateLastPriorityAckRowOfDurableSubStatement(String updateLastPriorityAckRowOfDurableSubStatement)
{
+        this.updateLastPriorityAckRowOfDurableSubStatement = updateLastPriorityAckRowOfDurableSubStatement;
     }
 
     public void setUpdateMessageStatement(String updateMessageStatment) {
@@ -743,6 +754,10 @@ public class Statements {
         this.durableSubscriberMessageCountStatement = durableSubscriberMessageCountStatement;
     }
 
+    public void setDurableSubscriberMessageCountStatementWithPriority(String durableSubscriberMessageCountStatementWithPriority)
{
+        this.durableSubscriberMessageCountStatementWithPriority = durableSubscriberMessageCountStatementWithPriority;
+    }
+
     /**
      * @param findNextMessagesStatement the findNextMessagesStatement to set
      */

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1033076&r1=1033075&r2=1033076&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Tue Nov  9 16:34:54 2010
@@ -409,15 +409,13 @@ public class DefaultJDBCAdapter implemen
         }
     }
     
-    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String
clientId,
+    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination,
String clientId,
             String subscriptionName, long seq, long prio) throws SQLException, IOException
{
-        doCreatePriorityAckRow(c, destination, clientId, subscriptionName, prio);
-        doUpdateLatestAckRow(c, destination, clientId, subscriptionName, seq, prio);
         PreparedStatement s = c.getUpdateLastAckStatement();
         cleanupExclusiveLock.readLock().lock();
         try {
             if (s == null) {
-                s = c.getConnection().prepareStatement(this.statements.getUpdateLastAckOfDurableSubStatement());
+                s = c.getConnection().prepareStatement(this.statements.getUpdateLastPriorityAckRowOfDurableSubStatement());
                 if (this.batchStatments) {
                     c.setUpdateLastAckStatement(s);
                 }
@@ -435,72 +433,39 @@ public class DefaultJDBCAdapter implemen
         } finally {
             cleanupExclusiveLock.readLock().unlock();
             if (!this.batchStatments) {
-                s.close();
+                close(s);
             }
         }
     }
 
-    private void doCreatePriorityAckRow(TransactionContext c, ActiveMQDestination destination,
String clientId,
-                                        String subscriptionName,long priority) throws SQLException,
IOException{
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        boolean exists = false;
+
+    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String
clientId,
+                                        String subscriptionName, long seq, long priority)
throws SQLException, IOException {
+        PreparedStatement s = c.getUpdateLastAckStatement();
         cleanupExclusiveLock.readLock().lock();
         try {
-            s = c.getConnection().prepareStatement(this.statements.getSelectDurablePriorityAckStatement());
-            s.setString(1, destination.getQualifiedName());
-            s.setString(2, clientId);
-            s.setString(3, subscriptionName);
-            s.setLong(4, priority);
-
-            rs = s.executeQuery();
-            exists = rs.next();
-        } finally {
-            cleanupExclusiveLock.readLock().unlock();
-            close(rs);
-            close(s);
-        }
-
-        if (!exists) {
-            cleanupExclusiveLock.readLock().lock();
-            try {
-                s = c.getConnection().prepareStatement(this.statements.getInsertDurablePriorityAckStatement());
-                s.setString(1, destination.getQualifiedName());
-                s.setString(2, clientId);
-                s.setString(3, subscriptionName);
-                s.setLong(4, priority);
-                if (s.executeUpdate() != 1) {
-                    throw new IOException("Could not insert initial ack entry for priority:
"
-                            + priority + ", for sub: " + subscriptionName);
+            if (s == null) {
+                s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
+                if (this.batchStatments) {
+                    c.setUpdateLastAckStatement(s);
                 }
-
-            } finally {
-                cleanupExclusiveLock.readLock().unlock();
-                close(s);
             }
-        }
-    }
-
-    private void doUpdateLatestAckRow(TransactionContext c, ActiveMQDestination destination,
String clientId,
-                                        String subscriptionName, long seq, long priority)
throws SQLException, IOException{
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        cleanupExclusiveLock.readLock().lock();
-        try {
-            s = c.getConnection().prepareStatement(this.statements.getUpdateDurableLastAckStatement());
             s.setLong(1, seq);
             s.setString(2, destination.getQualifiedName());
             s.setString(3, clientId);
             s.setString(4, subscriptionName);
 
-           if (s.executeUpdate() != 1) {
+            if (this.batchStatments) {
+                s.addBatch();
+            } else if (s.executeUpdate() != 1) {
                 throw new IOException("Could not update last ack seq : "
                             + seq + ", for sub: " + subscriptionName);
-           }
+            }
         } finally {
             cleanupExclusiveLock.readLock().unlock();
-            close(rs);
-            close(s);
+            if (!this.batchStatments) {
+                close(s);
+            }            
         }
     }
 
@@ -553,11 +518,6 @@ public class DefaultJDBCAdapter implemen
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
-            s.setLong(4, seq);
-            if (isPrioritizedMessages()) {
-                s.setLong(5, priority);
-                s.setLong(6, priority);
-            }
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
@@ -587,7 +547,11 @@ public class DefaultJDBCAdapter implemen
         int result = 0;
         cleanupExclusiveLock.readLock().lock();
         try {
-            s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
+            if (this.isPrioritizedMessages()) {
+                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
+            } else {
+                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
   
+            }
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -618,7 +582,6 @@ public class DefaultJDBCAdapter implemen
         cleanupExclusiveLock.readLock().lock();
         try {
             long lastMessageId = -1;
-            long priority = Byte.MAX_VALUE - 1;
             if (!retroactive) {
                 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
                 ResultSet rs = null;
@@ -633,16 +596,25 @@ public class DefaultJDBCAdapter implemen
                 }
             }
             s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
-            s.setString(1, info.getDestination().getQualifiedName());
-            s.setString(2, info.getClientId());
-            s.setString(3, info.getSubscriptionName());
-            s.setString(4, info.getSelector());
-            s.setLong(5, lastMessageId);
-            s.setString(6, info.getSubscribedDestination().getQualifiedName());
-            s.setLong(7, priority);
-            if (s.executeUpdate() != 1) {
-                throw new IOException("Could not create durable subscription for: " + info.getClientId());
+            int maxPriority = 1;
+            if (this.isPrioritizedMessages()) {
+                maxPriority = 10;
             }
+
+            for (int priority = 0; priority < maxPriority; priority++) {
+                s.setString(1, info.getDestination().getQualifiedName());
+                s.setString(2, info.getClientId());
+                s.setString(3, info.getSubscriptionName());
+                s.setString(4, info.getSelector());
+                s.setLong(5, lastMessageId);
+                s.setString(6, info.getSubscribedDestination().getQualifiedName());
+                s.setLong(7, priority);
+
+                if (s.executeUpdate() != 1) {
+                    throw new IOException("Could not create durable subscription for: " +
info.getClientId());
+                }
+            }
+
         } finally {
             cleanupExclusiveLock.readLock().unlock();
             close(s);
@@ -744,8 +716,13 @@ public class DefaultJDBCAdapter implemen
         PreparedStatement s = null;
         cleanupExclusiveLock.writeLock().lock();
         try {
-            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
-            s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
+            if (this.isPrioritizedMessages()) {
+                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
+                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
+            } else {
+                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatement());
+                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatement());
+            }
             s.setLong(1, System.currentTimeMillis());
             int i = s.executeUpdate();
             LOG.debug("Deleted " + i + " old message(s).");
@@ -755,11 +732,11 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
+    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
             String clientId, String subscriberName) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
-        long[] result = new long[]{-1, Byte.MAX_VALUE - 1};
+        long result = -1;
         cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
@@ -768,8 +745,7 @@ public class DefaultJDBCAdapter implemen
             s.setString(3, subscriberName);
             rs = s.executeQuery();
             if (rs.next()) {
-                result[0] = rs.getLong(1);
-                result[1] = rs.getLong(2);
+                result = rs.getLong(1);
             }
         } finally {
             cleanupExclusiveLock.readLock().unlock();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1033076&r1=1033075&r2=1033076&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java Tue Nov  9 16:34:54 2010
@@ -18,6 +18,7 @@
 package org.apache.activemq.store.jdbc;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Message;
@@ -117,10 +118,11 @@ public class JDBCMessagePriorityTest ext
         }
 
         final int closeFrequency = MSG_NUM/2;
-
+        HashMap dups = new HashMap();
         sub = sess.createDurableSubscriber(topic, subName);
         for (int i=0; i < MSG_NUM * maxPriority; i++) {
             Message msg = sub.receive(10000);
+            assertNull("no duplicate message", dups.put(msg.getJMSMessageID(), subName));
             LOG.info("received i=" + i + ", m=" + (msg!=null?
                     msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority()
                     : null) );

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1033076&r1=1033075&r2=1033076&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java Tue Nov  9 16:34:54 2010
@@ -190,6 +190,14 @@ public class DurableSubscriptionOfflineT
          assertEquals(sent, listener.count);
      }
 
+    
+    public void initCombosForTestVerifyAllConsumedAreAcked() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter",
+               new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+        this.addCombinationValues("usePrioritySupport",
+                new Object[]{ Boolean.TRUE, Boolean.FALSE});
+    }
+
      public void testVerifyAllConsumedAreAcked() throws Exception {
          // create durable subscription
          Connection con = createConnection();
@@ -377,6 +385,83 @@ public class DurableSubscriptionOfflineT
         assertEquals(sent, listener.count);
         assertEquals(sent, listener3.count);
     }
+
+
+    public void initCombosForTestInterleavedOfflineSubscriptionCanConsume() throws Exception {
+        this.addCombinationValues("defaultPersistenceAdapter",
+                new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
+    }
+
+    public void testInterleavedOfflineSubscriptionCanConsume() throws Exception {
+        // create durable subscription 1
+        Connection con = createConnection("cliId1");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        session.close();
+        con.close();
+
+        // send messages
+        con = createConnection();
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+
+        int sent = 0;
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+
+        // create durable subscription 2
+        Connection con2 = createConnection("cliId2");
+        Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        Listener listener2 = new Listener();
+        consumer2.setMessageListener(listener2);
+
+        assertEquals(0, listener2.count);
+        session2.close();
+        con2.close();
+
+        // send some more
+        for (int i = 0; i < 10; i++) {
+            sent++;
+            Message message = session.createMessage();
+            message.setStringProperty("filter", "true");
+            producer.send(topic, message);
+        }
+
+        Thread.sleep(1 * 1000);
+        session.close();
+        con.close();
+
+        con2 = createConnection("cliId2");
+        session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer2 = session2.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        listener2 = new Listener();
+        consumer2.setMessageListener(listener2);
+        // test online subs
+        Thread.sleep(3 * 1000);
+
+        assertEquals(10, listener2.count);
+
+        // consume all messages
+        con = createConnection("cliId1");
+        session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
+        Listener listener = new Listener();
+        consumer.setMessageListener(listener);
+
+        Thread.sleep(3 * 1000);
+
+        session.close();
+        con.close();
+
+        assertEquals("offline consumer got all", sent, listener.count);
+    }    
     
     public static class Listener implements MessageListener {
         int count = 0;



Mime
View raw message