activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r966291 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/store/
Date Wed, 21 Jul 2010 16:06:39 GMT
Author: dejanb
Date: Wed Jul 21 16:06:39 2010
New Revision: 966291

URL: http://svn.apache.org/viewvc?rev=966291&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2843 - first stab at adding priority for queues
in JDBC store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Wed Jul 21 16:06:39 2010
@@ -30,12 +30,14 @@ import org.apache.activemq.command.Subsc
 public interface JDBCAdapter {
 
     void setStatements(Statements statementProvider);
+    
+    void setPrioritizedMessages(boolean prioritizedMessages);
 
     void doCreateTables(TransactionContext c) throws SQLException, IOException;
 
     void doDropTables(TransactionContext c) throws SQLException, IOException;
 
-    void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data, long expiration) throws SQLException, IOException;
+    void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data, long expiration, byte priority) throws SQLException, IOException;
 
     void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId,
ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException,
IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Wed Jul 21 16:06:39 2010
@@ -82,7 +82,7 @@ public class JDBCMessageStore extends Ab
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {      
-            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration());
+            adapter.doAddMessage(c,sequenceId, messageId, destination, data, message.getExpiration(),
message.getPriority());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + "
in container: " + e, e);
@@ -224,7 +224,6 @@ public class JDBCMessageStore extends Ab
      */
     public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener)
throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
-
         try {
             adapter.doRecoverNextMessages(c, destination, lastStoreSequenceId.get(), maxReturned,
new JDBCMessageRecoveryListener() {
 
@@ -294,4 +293,9 @@ public class JDBCMessageStore extends Ab
         }
         return result;
     }
+    
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        super.setPrioritizedMessages(prioritizedMessages);
+        adapter.setPrioritizedMessages(prioritizedMessages);
+    }   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Wed Jul 21 16:06:39 2010
@@ -65,6 +65,7 @@ public class Statements {
     private String lastAckedDurableSubscriberMessageStatement;
     private String destinationMessageCountStatement;
     private String findNextMessagesStatement;
+    private String findNextMessagesByPriorityStatement;
     private boolean useLockCreateWhereClause;
     private String findAllMessageIdsStatement;
     private String lastProducerSequenceIdStatement;
@@ -74,12 +75,13 @@ public class Statements {
             createSchemaStatements = new String[] {
                 "CREATE TABLE " + getFullMessageTableName() + "(" + "ID " + sequenceDataType
+ " NOT NULL"
                     + ", CONTAINER " + containerNameDataType + ", MSGID_PROD " + msgIdDataType
+ ", MSGID_SEQ "
-                    + sequenceDataType + ", EXPIRATION " + longDataType + ", MSG "
+                    + sequenceDataType + ", EXPIRATION " + longDataType + ", PRIORITY " +
sequenceDataType + ", MSG "
                     + (useExternalMessageReferences ? stringIdDataType : binaryDataType)
                     + ", PRIMARY KEY ( ID ) )",
                 "CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " + getFullMessageTableName()
+ " (MSGID_PROD,MSGID_SEQ)",
                 "CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " + getFullMessageTableName()
+ " (CONTAINER)",
                 "CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " + getFullMessageTableName()
+ " (EXPIRATION)",
+                "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName()
+ " (PRIORITY)",
                 "CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " + containerNameDataType
+ " NOT NULL"
                     + ", SUB_DEST " + stringIdDataType 
                     + ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ", SUB_NAME " + stringIdDataType
@@ -107,7 +109,7 @@ public class Statements {
         if (addMessageStatement == null) {
             addMessageStatement = "INSERT INTO "
                                   + getFullMessageTableName()
-                                  + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, MSG)
VALUES (?, ?, ?, ?, ?, ?)";
+                                  + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY,
MSG) VALUES (?, ?, ?, ?, ?, ?, ?)";
         }
         return addMessageStatement;
     }
@@ -369,6 +371,17 @@ public class Statements {
     }
 
     /**
+     * @return the findNextMessagesStatement
+     */
+    public String getFindNextMessagesByPriorityStatement() {
+        if (findNextMessagesByPriorityStatement == null) {
+            findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+                                        + " WHERE CONTAINER=? ORDER BY PRIORITY DESC, ID";
+        }
+        return findNextMessagesByPriorityStatement;
+    }    
+    
+    /**
      * @return the lastAckedDurableSubscriberMessageStatement
      */
     public String getLastAckedDurableSubscriberMessageStatement() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Wed Jul 21 16:06:39 2010
@@ -56,6 +56,7 @@ public class DefaultJDBCAdapter implemen
     private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
     protected Statements statements;
     protected boolean batchStatments = true;
+    protected boolean prioritizedMessages;
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException
{
         s.setBytes(index, data);
@@ -190,7 +191,7 @@ public class DefaultJDBCAdapter implemen
     
 
     public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data,
-            long expiration) throws SQLException, IOException {
+            long expiration, byte priority) throws SQLException, IOException {
         PreparedStatement s = c.getAddMessageStatement();
         try {
             if (s == null) {
@@ -204,7 +205,8 @@ public class DefaultJDBCAdapter implemen
             s.setLong(3, messageID.getProducerSequenceId());
             s.setString(4, destination.getQualifiedName());
             s.setLong(5, expiration);
-            setBinaryData(s, 6, data);
+            s.setLong(6, priority);
+            setBinaryData(s, 7, data);
             if (this.batchStatments) {
                 s.addBatch();
             } else if (s.executeUpdate() != 1) {
@@ -710,6 +712,14 @@ public class DefaultJDBCAdapter implemen
 
     public void setStatements(Statements statements) {
         this.statements = statements;
+    }    
+
+    public boolean isPrioritizedMessages() {
+        return prioritizedMessages;
+    }
+
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        this.prioritizedMessages = prioritizedMessages;
     }
 
     /**
@@ -765,10 +775,16 @@ public class DefaultJDBCAdapter implemen
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
-            s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
+            if (isPrioritizedMessages()) {
+                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
+            } else {
+                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
+            }
             s.setMaxRows(maxReturned * 2);
             s.setString(1, destination.getQualifiedName());
-            s.setLong(2, nextSeq);
+            if (!isPrioritizedMessages()) {
+                s.setLong(2, nextSeq);
+            }
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Wed Jul 21 16:06:39 2010
@@ -148,7 +148,7 @@ abstract public class MessagePriorityTes
         MessageConsumer queueConsumer = sess.createConsumer(queue);
         for (int i = 0; i < MSG_NUM * 2; i++) {
             Message msg = queueConsumer.receive(1000);
-            assertNotNull(msg);
+            assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI
: LOW_PRI, msg.getJMSPriority());
         }
     }



Mime
View raw message