activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r365993 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: ./ jdbc/ jdbc/adapter/ journal/ memory/
Date Wed, 04 Jan 2006 21:09:23 GMT
Author: chirino
Date: Wed Jan  4 13:09:16 2006
New Revision: 365993

URL: http://svn.apache.org/viewcvs?rev=365993&view=rev
Log:
Added a SubscriptionInfo[] getAllSubscriptions()  to the TopicMessageStore.  We will need
this if we want to eagerly load the durable subs when the broker starts up.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Wed Jan  4 13:09:16 2006
@@ -90,4 +90,8 @@
     public String getMessageReference(MessageId identity) throws IOException {
         return delegate.getMessageReference(identity);
     }
+
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return delegate.getAllSubscriptions();
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
Wed Jan  4 13:09:16 2006
@@ -70,6 +70,15 @@
     public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName)
throws IOException;
 
     /**
+     * Lists all the durable subscirptions for a given destination.
+     * 
+     * @param clientId TODO
+     * @param subscriptionName TODO
+     * @return
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException;
+
+    /**
      * Inserts the subscriber info due to a subscription change
      * <p/>
      * If this is a new subscription and the retroactive is false, then the last

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Wed Jan  4 13:09:16 2006
@@ -74,5 +74,7 @@
 
     public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
 
+    public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination
destination) throws SQLException, IOException;
+
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Wed Jan  4 13:09:16 2006
@@ -126,4 +126,15 @@
         }
     }
 
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        try {
+            return adapter.doGetAllSubscriptions(c, destination);
+        } catch (SQLException e) {
+            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " +
e, e);
+        } finally {
+            c.close();
+        }
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
Wed Jan  4 13:09:16 2006
@@ -47,5 +47,6 @@
     public boolean isUseExternalMessageReferences();
 
     public String getFullMessageTableName();
+    public String getFindAllDurableSubsStatment();
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
Wed Jan  4 13:09:16 2006
@@ -47,6 +47,7 @@
     private String deleteOldMessagesStatment;
     private String findLastSequenceIdInAcks;
     private String findAllDestinationsStatment;
+    private String findAllDurableSubsStatment;
 
     public CachingStatementProvider(StatementProvider statementProvider) {
         this.statementProvider = statementProvider;
@@ -221,5 +222,12 @@
 
     public String getFullMessageTableName() {
         return statementProvider.getFullMessageTableName();
+    }
+
+    public String getFindAllDurableSubsStatment() {
+        if ( findAllDurableSubsStatment==null ) {
+            findAllDurableSubsStatment = statementProvider.getFindAllDurableSubsStatment();
+        }
+        return findAllDurableSubsStatment;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Wed Jan  4 13:09:16 2006
@@ -21,6 +21,7 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -475,6 +476,32 @@
             subscription.setSelector(rs.getString(1));
             return subscription;
             
+        }
+        finally {
+            close(rs);
+            close(s);
+        }
+    }
+
+    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination
destination) throws SQLException, IOException {
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+
+            s = c.getConnection().prepareStatement(statementProvider.getFindAllDurableSubsStatment());
+            s.setString(1, destination.getQualifiedName());
+            rs = s.executeQuery();
+
+            ArrayList rc = new ArrayList();
+            while(rs.next()) {
+                SubscriptionInfo subscription = new SubscriptionInfo();
+                subscription.setDestination(destination);
+                subscription.setSelector(rs.getString(1));
+                subscription.setSubcriptionName(rs.getString(2));
+                subscription.setClientId(rs.getString(3));
+            }
+
+            return (SubscriptionInfo[]) rc.toArray(new SubscriptionInfo[rc.size()]);    
       
         }
         finally {
             close(rs);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
Wed Jan  4 13:09:16 2006
@@ -108,6 +108,12 @@
                 "FROM "+getTablePrefix()+durableSubAcksTableName+
                 " WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
     }
+    
+    public String getFindAllDurableSubsStatment() {
+        return "SELECT SELECTOR, SUB_NAME, CLIENT_ID" +
+        "FROM "+getTablePrefix()+durableSubAcksTableName+
+        " WHERE CONTAINER=?";
+    }
 
     public String getUpdateLastAckOfDurableSub() {
         return "UPDATE "+getTablePrefix()+durableSubAcksTableName+

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
Wed Jan  4 13:09:16 2006
@@ -179,5 +179,9 @@
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException
{
         longTermStore.deleteSubscription(clientId, subscriptionName);
     }
+    
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return longTermStore.getAllSubscriptions();
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
Wed Jan  4 13:09:16 2006
@@ -190,4 +190,8 @@
         longTermStore.deleteSubscription(clientId, subscriptionName);
     }
 
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return longTermStore.getAllSubscriptions();
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
Wed Jan  4 13:09:16 2006
@@ -112,4 +112,8 @@
         subscriberDatabase.clear();
         lastMessageId=null;
     }
+    
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
+    }
 }



Mime
View raw message