activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r924752 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc: JDBCAdapter.java JDBCPersistenceAdapter.java Statements.java adapter/DefaultJDBCAdapter.java
Date Thu, 18 Mar 2010 13:05:43 GMT
Author: dejanb
Date: Thu Mar 18 13:05:43 2010
New Revision: 924752

URL: http://svn.apache.org/viewvc?rev=924752&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2594 - proper init of broker and store seq

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=924752&r1=924751&r2=924752&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Thu Mar 18 13:05:43 2010
@@ -39,6 +39,8 @@ public interface JDBCAdapter {
     void doAddMessageReference(TransactionContext c, long sequence, MessageId messageId,
ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException,
IOException;
 
     byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException;
+    
+    byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException,
IOException;
 
     String doGetMessageReference(TransactionContext c, long id) throws SQLException, IOException;
 
@@ -66,7 +68,7 @@ public interface JDBCAdapter {
 
     void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
 
-    long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException;
+    long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException;
 
     Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException,
IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=924752&r1=924751&r2=924752&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Thu Mar 18 13:05:43 2010
@@ -35,6 +35,7 @@ import org.apache.activemq.broker.Connec
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
@@ -44,6 +45,7 @@ import org.apache.activemq.store.Transac
 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.activemq.store.memory.MemoryTransactionStore;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -223,7 +225,14 @@ public class JDBCPersistenceAdapter exte
         // Get a connection and insert the message into the DB.
         TransactionContext c = getTransactionContext();
         try {
-            return getAdapter().doGetLastMessageBrokerSequenceId(c);
+            long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
+            sequenceGenerator.setLastSequenceId(seq);
+            long brokerSeq = 0;
+            if (seq != 0) {
+            	Message last = (Message)wireFormat.unmarshal(new ByteSequence(getAdapter().doGetMessageById(c,
seq)));
+            	brokerSeq = last.getMessageId().getBrokerSequenceId();
+            }
+            return brokerSeq;
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get last broker message id: " + e,
e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=924752&r1=924751&r2=924752&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Thu Mar 18 13:05:43 2010
@@ -41,6 +41,7 @@ public class Statements {
     private String removeMessageStatment;
     private String findMessageSequenceIdStatement;
     private String findMessageStatement;
+    private String findMessageByIdStatement;
     private String findAllMessagesStatement;
     private String findLastSequenceIdInMsgsStatement;
     private String findLastSequenceIdInAcksStatement;
@@ -139,6 +140,13 @@ public class Statements {
         return findMessageStatement;
     }
 
+    public String getFindMessageByIdStatement() {
+        if (findMessageStatement == null) {
+            findMessageStatement = "SELECT MSG FROM " + getFullMessageTableName() + " WHERE
ID=?";
+        }
+        return findMessageStatement;
+    }
+    
     public String getFindAllMessagesStatement() {
         if (findAllMessagesStatement == null) {
             findAllMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=924752&r1=924751&r2=924752&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Thu Mar 18 13:05:43 2010
@@ -139,7 +139,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,
IOException {
+    public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException,
IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         try {
@@ -163,6 +163,25 @@ public class DefaultJDBCAdapter implemen
             close(s);
         }
     }
+    
+    public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException,
IOException {
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+            s = c.getConnection().prepareStatement(
+                    this.statements.getFindMessageByIdStatement());
+            s.setLong(1, storeSequenceId);
+            rs = s.executeQuery();
+            if (!rs.next()) {
+                return null;
+            }
+            return getBinaryData(rs, 1);
+        } finally {
+            close(rs);
+            close(s);
+        }
+    }
+    
 
     public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination
destination, byte[] data,
             long expiration) throws SQLException, IOException {
@@ -794,4 +813,5 @@ public class DefaultJDBCAdapter implemen
      * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close();
} catch (Throwable ignore) {}
      * try { s.close(); } catch (Throwable ignore) {} } }
      */
+
 }



Mime
View raw message