activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r454368 [2/3] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/kaha/ main...
Date Mon, 09 Oct 2006 13:05:22 GMT
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Oct  9 06:05:20 2006
@@ -1,20 +1,17 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.jdbc.adapter;
 
 import java.io.IOException;
@@ -25,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -38,423 +34,380 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * Implements all the default JDBC operations that are used
- * by the JDBCPersistenceAdapter.
- * <p/>
- * sub-classing is encouraged to override the default
- * implementation of methods to account for differences
- * in JDBC Driver implementations.
- * <p/>
- * The JDBCAdapter inserts and extracts BLOB data using the
- * getBytes()/setBytes() operations.
- * <p/>
+ * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
+ * encouraged to override the default implementation of methods to account for differences in JDBC Driver
+ * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
  * The databases/JDBC drivers that use this adapter are:
  * <ul>
  * <li></li>
  * </ul>
- *
+ * 
  * @org.apache.xbean.XBean element="defaultJDBCAdapter"
  * 
  * @version $Revision: 1.10 $
  */
-public class DefaultJDBCAdapter implements JDBCAdapter {
-
-    private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
+public class DefaultJDBCAdapter implements JDBCAdapter{
 
+    private static final Log log=LogFactory.getLog(DefaultJDBCAdapter.class);
     protected Statements statements;
     protected boolean batchStatments=true;
 
-    protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
-        s.setBytes(index, data);
+    protected void setBinaryData(PreparedStatement s,int index,byte data[]) throws SQLException{
+        s.setBytes(index,data);
     }
 
-    protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
+    protected byte[] getBinaryData(ResultSet rs,int index) throws SQLException{
         return rs.getBytes(index);
     }
 
-    public void doCreateTables(TransactionContext c) throws SQLException, IOException {
-        Statement s = null;
-        try {
-            
-            // Check to see if the table already exists.  If it does, then don't log warnings during startup.
-            // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
-            boolean alreadyExists = false;
+    public void doCreateTables(TransactionContext c) throws SQLException,IOException{
+        Statement s=null;
+        try{
+            // Check to see if the table already exists. If it does, then don't log warnings during startup.
+            // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version
+            // of the table
+            boolean alreadyExists=false;
             ResultSet rs=null;
-            try {
-                rs= c.getConnection().getMetaData().getTables(null,null, statements.getFullMessageTableName(), new String[] {"TABLE"});
-                alreadyExists = rs.next();                
-            } catch (Throwable ignore) {
-            } finally {
+            try{
+                rs=c.getConnection().getMetaData().getTables(null,null,statements.getFullMessageTableName(),
+                        new String[] { "TABLE" });
+                alreadyExists=rs.next();
+            }catch(Throwable ignore){
+            }finally{
                 close(rs);
             }
-            
-            s = c.getConnection().createStatement();
-            String[] createStatments = statements.getCreateSchemaStatements();
-            for (int i = 0; i < createStatments.length; i++) {
+            s=c.getConnection().createStatement();
+            String[] createStatments=statements.getCreateSchemaStatements();
+            for(int i=0;i<createStatments.length;i++){
                 // This will fail usually since the tables will be
                 // created already.
-                try {
+                try{
                     log.debug("Executing SQL: "+createStatments[i]);
-                    boolean rc = s.execute(createStatments[i]);
-                }
-                catch (SQLException e) {
-                    if( alreadyExists )  {
-                        log.debug("Could not create JDBC tables; The message table already existed." +
-                                " Failure was: " + createStatments[i] + " Message: " + e.getMessage() +
-                                " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
-                    } else {
-                        log.warn("Could not create JDBC tables; they could already exist." +
-                            " Failure was: " + createStatments[i] + " Message: " + e.getMessage() +
-                            " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
+                    boolean rc=s.execute(createStatments[i]);
+                }catch(SQLException e){
+                    if(alreadyExists){
+                        log.debug("Could not create JDBC tables; The message table already existed."+" Failure was: "
+                                +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()
+                                +" Vendor code: "+e.getErrorCode());
+                    }else{
+                        log.warn("Could not create JDBC tables; they could already exist."+" Failure was: "
+                                +createStatments[i]+" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()
+                                +" Vendor code: "+e.getErrorCode());
                         JDBCPersistenceAdapter.log("Failure details: ",e);
                     }
                 }
             }
             c.getConnection().commit();
-            
-        }
-        finally {
-            try {
+        }finally{
+            try{
                 s.close();
-            }
-            catch (Throwable e) {
+            }catch(Throwable e){
             }
         }
     }
 
-    public void doDropTables(TransactionContext c) throws SQLException, IOException {
-        Statement s = null;
-        try {
-            s = c.getConnection().createStatement();
-            String[] dropStatments = statements.getDropSchemaStatements();
-            for (int i = 0; i < dropStatments.length; i++) {
+    public void doDropTables(TransactionContext c) throws SQLException,IOException{
+        Statement s=null;
+        try{
+            s=c.getConnection().createStatement();
+            String[] dropStatments=statements.getDropSchemaStatements();
+            for(int i=0;i<dropStatments.length;i++){
                 // This will fail usually since the tables will be
                 // created already.
-                try {
-                    boolean rc = s.execute(dropStatments[i]);
-                }
-                catch (SQLException e) {
-                    log.warn("Could not drop JDBC tables; they may not exist." +
-                        " Failure was: " + dropStatments[i] + " Message: " + e.getMessage() +
-                        " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
+                try{
+                    boolean rc=s.execute(dropStatments[i]);
+                }catch(SQLException e){
+                    log.warn("Could not drop JDBC tables; they may not exist."+" Failure was: "+dropStatments[i]
+                            +" Message: "+e.getMessage()+" SQLState: "+e.getSQLState()+" Vendor code: "
+                            +e.getErrorCode());
                     JDBCPersistenceAdapter.log("Failure details: ",e);
                 }
             }
             c.getConnection().commit();
-        }
-        finally {
-            try {
+        }finally{
+            try{
                 s.close();
-            }
-            catch (Throwable e) {
+            }catch(Throwable e){
             }
         }
     }
-    public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException, IOException {
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-            s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
-            rs = s.executeQuery();
-            long seq1 = 0;
-            if (rs.next()) {
-                seq1 = rs.getLong(1);
+
+    public long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+            rs=s.executeQuery();
+            long seq1=0;
+            if(rs.next()){
+                seq1=rs.getLong(1);
             }
             rs.close();
             s.close();
-            s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
-            rs = s.executeQuery();
-            long seq2 = 0;
-            if (rs.next()) {
-                seq2 = rs.getLong(1);
+            s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInAcksStatement());
+            rs=s.executeQuery();
+            long seq2=0;
+            if(rs.next()){
+                seq2=rs.getLong(1);
             }
-            
-            return Math.max(seq1, seq2);
-        }
-        finally {
+            return Math.max(seq1,seq2);
+        }finally{
             close(rs);
             close(s);
         }
     }
 
-    public void doAddMessage(TransactionContext c, MessageId messageID, ActiveMQDestination destination, byte[] data, long expiration) throws SQLException, IOException {
-        PreparedStatement s = c.getAddMessageStatement();
-        try {
-            if( s == null ) {
-                s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
-                if( batchStatments ) {
+    public void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,byte[] data,
+            long expiration) throws SQLException,IOException{
+        PreparedStatement s=c.getAddMessageStatement();
+        try{
+            if(s==null){
+                s=c.getConnection().prepareStatement(statements.getAddMessageStatement());
+                if(batchStatments){
                     c.setAddMessageStatement(s);
                 }
             }
-            s.setLong(1, messageID.getBrokerSequenceId());
-            s.setString(2, messageID.getProducerId().toString());
-            s.setLong(3, messageID.getProducerSequenceId());
-            s.setString(4, destination.getQualifiedName());
-            s.setLong(5, expiration);
-            setBinaryData(s, 6, data);
-            if( batchStatments ) {
+            s.setLong(1,messageID.getBrokerSequenceId());
+            s.setString(2,messageID.getProducerId().toString());
+            s.setLong(3,messageID.getProducerSequenceId());
+            s.setString(4,destination.getQualifiedName());
+            s.setLong(5,expiration);
+            setBinaryData(s,6,data);
+            if(batchStatments){
                 s.addBatch();
-            } else if ( s.executeUpdate() != 1 ) {
+            }else if(s.executeUpdate()!=1){
                 throw new SQLException("Failed add a message");
             }
-        } finally {
-            if( !batchStatments ) {
+        }finally{
+            if(!batchStatments){
                 s.close();
             }
         }
     }
-    
-    public void doAddMessageReference(TransactionContext c, MessageId messageID, ActiveMQDestination destination, long expirationTime, String messageRef) throws SQLException, IOException {
-        PreparedStatement s = c.getAddMessageStatement();
-        try {
-            if( s == null ) {
-                s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
-                if( batchStatments ) {
+
+    public void doAddMessageReference(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
+            long expirationTime,String messageRef) throws SQLException,IOException{
+        PreparedStatement s=c.getAddMessageStatement();
+        try{
+            if(s==null){
+                s=c.getConnection().prepareStatement(statements.getAddMessageStatement());
+                if(batchStatments){
                     c.setAddMessageStatement(s);
                 }
             }
-            s.setLong(1, messageID.getBrokerSequenceId());
-            s.setString(2, messageID.getProducerId().toString());
-            s.setLong(3, messageID.getProducerSequenceId());
-            s.setString(4, destination.getQualifiedName());
-            s.setLong(5, expirationTime);
-            s.setString(6, messageRef);
-            if( batchStatments ) {
+            s.setLong(1,messageID.getBrokerSequenceId());
+            s.setString(2,messageID.getProducerId().toString());
+            s.setLong(3,messageID.getProducerSequenceId());
+            s.setString(4,destination.getQualifiedName());
+            s.setLong(5,expirationTime);
+            s.setString(6,messageRef);
+            if(batchStatments){
                 s.addBatch();
-            } else if ( s.executeUpdate() != 1 ) {
+            }else if(s.executeUpdate()!=1){
                 throw new SQLException("Failed add a message");
             }
-        } finally {
-            if( !batchStatments ) {
+        }finally{
+            if(!batchStatments){
                 s.close();
             }
         }
     }
 
-    public long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException, IOException {
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-
-            s = c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
-            s.setString(1, messageID.getProducerId().toString());
-            s.setLong(2, messageID.getProducerSequenceId());
-            rs = s.executeQuery();
-
-            if (!rs.next()) {
+    public long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindMessageSequenceIdStatement());
+            s.setString(1,messageID.getProducerId().toString());
+            s.setLong(2,messageID.getProducerSequenceId());
+            rs=s.executeQuery();
+            if(!rs.next()){
                 return 0;
             }
             return rs.getLong(1);
-
-        }
-        finally {
+        }finally{
             close(rs);
             close(s);
         }
-	}
-
-    public byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException {
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-
-            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
-            s.setLong(1, seq);
-            rs = s.executeQuery();
+    }
 
-            if (!rs.next()) {
+    public byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindMessageStatement());
+            s.setLong(1,seq);
+            rs=s.executeQuery();
+            if(!rs.next()){
                 return null;
             }
-            return getBinaryData(rs, 1);
-
-        }
-        finally {
+            return getBinaryData(rs,1);
+        }finally{
             close(rs);
             close(s);
         }
     }
-    
-    public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
 
-            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
-            s.setLong(1, seq);
-            rs = s.executeQuery();
-
-            if (!rs.next()) {
+    public String doGetMessageReference(TransactionContext c,long seq) throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindMessageStatement());
+            s.setLong(1,seq);
+            rs=s.executeQuery();
+            if(!rs.next()){
                 return null;
             }
             return rs.getString(1);
-
-        }
-        finally {
+        }finally{
             close(rs);
             close(s);
         }
     }
 
-    public void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException {
-
-        PreparedStatement s = c.getRemovedMessageStatement();
-        try {
-            if( s == null ) {
-                s = c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
-                if( batchStatments ) {
+    public void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException{
+        PreparedStatement s=c.getRemovedMessageStatement();
+        try{
+            if(s==null){
+                s=c.getConnection().prepareStatement(statements.getRemoveMessageStatment());
+                if(batchStatments){
                     c.setRemovedMessageStatement(s);
                 }
             }
-            s.setLong(1, seq);
-            
-            if( batchStatments ) {
+            s.setLong(1,seq);
+            if(batchStatments){
                 s.addBatch();
-            } else if ( s.executeUpdate() != 1 ) {
+            }else if(s.executeUpdate()!=1){
                 throw new SQLException("Failed to remove message");
             }
-        } finally {
-            if( !batchStatments ) {
+        }finally{
+            if(!batchStatments){
                 s.close();
             }
-        }        
+        }
     }
-    
-    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) throws Exception {
-//        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
-
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-
-            s = c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
-            s.setString(1, destination.getQualifiedName());
-            rs = s.executeQuery();
 
-            if( statements.isUseExternalMessageReferences() ) {
-                while (rs.next()) {
+    public void doRecover(TransactionContext c,ActiveMQDestination destination,JDBCMessageRecoveryListener listener)
+            throws Exception{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindAllMessagesStatement());
+            s.setString(1,destination.getQualifiedName());
+            rs=s.executeQuery();
+            if(statements.isUseExternalMessageReferences()){
+                while(rs.next()){
                     listener.recoverMessageReference(rs.getString(2));
                 }
-            } else {
-                while (rs.next()) {
-                    listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2));
+            }else{
+                while(rs.next()){
+                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
                 }
             }
-        }
-        finally {
+        }finally{
             close(rs);
             close(s);
             listener.finished();
-        }     
+        }
     }
 
-    
-    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq) throws SQLException, IOException {
-        
-        PreparedStatement s = c.getAddMessageStatement();
-        try {
-            if( s == null ) {
-                s = c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
-                if( batchStatments ) {
+    public void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,long seq) throws SQLException,IOException{
+        PreparedStatement s=c.getAddMessageStatement();
+        try{
+            if(s==null){
+                s=c.getConnection().prepareStatement(statements.getUpdateLastAckOfDurableSubStatement());
+                if(batchStatments){
                     c.setUpdateLastAckStatement(s);
                 }
             }
-            
-            s.setLong(1, seq);
-            s.setString(2, destination.getQualifiedName());
-            s.setString(3, clientId);
-            s.setString(4, subscriptionName);
-
-            if( batchStatments ) {
+            s.setLong(1,seq);
+            s.setString(2,destination.getQualifiedName());
+            s.setString(3,clientId);
+            s.setString(4,subscriptionName);
+            if(batchStatments){
                 s.addBatch();
-            } else if ( s.executeUpdate() != 1 ) {
+            }else if(s.executeUpdate()!=1){
                 throw new SQLException("Failed add a message");
             }
-        } finally {
-            if( !batchStatments ) {
+        }finally{
+            if(!batchStatments){
                 s.close();
             }
         }
-
     }
 
-    public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
-//        dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
-
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-
-            s = c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
-            s.setString(1, destination.getQualifiedName());
-            s.setString(2, clientId);
-            s.setString(3, subscriptionName);
-            rs = s.executeQuery();
-
-            if( statements.isUseExternalMessageReferences() ) {
-                while (rs.next()) {
+    public void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception{
+        // dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindAllDurableSubMessagesStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriptionName);
+            rs=s.executeQuery();
+            if(statements.isUseExternalMessageReferences()){
+                while(rs.next()){
                     listener.recoverMessageReference(rs.getString(2));
                 }
-            } else {
-                while (rs.next()) {
-                    listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2));
+            }else{
+                while(rs.next()){
+                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
                 }
             }
-
+        }finally{
+            close(rs);
+            close(s);
+            listener.finished();
         }
-        finally {
+    }
+
+    public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriptionName);
+            s.setLong(4,seq);
+            rs=s.executeQuery();
+            int count=0;
+            if(statements.isUseExternalMessageReferences()){
+                while(rs.next()&&count<maxReturned){
+                    listener.recoverMessageReference(rs.getString(1));
+                    count++;
+                }
+            }else{
+                while(rs.next()&&count<maxReturned){
+                    listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+                    count++;
+                }
+            }
+        }finally{
             close(rs);
             close(s);
             listener.finished();
         }
-        
     }
-    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, int maxReturned,JDBCMessageRecoveryListener listener) throws Exception {
-//      dumpTables(c, destination.getQualifiedName(),clientId,subscriptionName);
 
-      PreparedStatement s = null;
-      ResultSet rs = null;
-      try {
-          System.err.println("VANILLA STATEMENT = " + statements.getFindDurableSubMessagesStatement());
-          s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
-          s.setString(1, destination.getQualifiedName());
-          s.setString(2, clientId);
-          s.setString(3, subscriptionName);
-          s.setLong(4,seq);
-          s.setInt(5,maxReturned);
-          System.err.println("STATEMENT = " + s);
-          rs = s.executeQuery();
-
-          if( statements.isUseExternalMessageReferences() ) {
-              while (rs.next()) {
-                  listener.recoverMessageReference(rs.getString(2));
-              }
-          } else {
-              while (rs.next()) {
-                  listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2));
-              }
-          }
-
-      }
-      finally {
-          close(rs);
-          close(s);
-          listener.finished();
-      }
-      
-  }
-    
     public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
-                    String subscriptionName) throws SQLException, IOException{
+            String subscriptionName) throws SQLException,IOException{
         PreparedStatement s=null;
         ResultSet rs=null;
-        int result = 0;
+        int result=0;
         try{
             s=c.getConnection().prepareStatement(statements.getDurableSubscriberMessageCountStatement());
             s.setString(1,destination.getQualifiedName());
             s.setString(2,clientId);
             s.setString(3,subscriptionName);
             rs=s.executeQuery();
-            result =  rs.getInt(1);
+            if(rs.next()){
+                result=rs.getInt(1);
+            }
         }finally{
             close(rs);
             close(s);
@@ -463,278 +416,288 @@
     }
 
     /**
-     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.apache.activemq.service.SubscriptionInfo)
+     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object,
+     *      org.apache.activemq.service.SubscriptionInfo)
      */
-    
-    public void doSetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, String selector, boolean retroactive) throws SQLException, IOException {
-
-//        dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName);
-        
-        PreparedStatement s = null;
-        try {
-            
-            long lastMessageId = -1;
-            if(!retroactive) {
-                s = c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
+    public void doSetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName,String selector,boolean retroactive) throws SQLException,IOException{
+        // dumpTables(c, destination.getQualifiedName(), clientId, subscriptionName);
+        PreparedStatement s=null;
+        try{
+            long lastMessageId=-1;
+            if(!retroactive){
+                s=c.getConnection().prepareStatement(statements.getFindLastSequenceIdInMsgsStatement());
                 ResultSet rs=null;
-                try {
-                    rs = s.executeQuery();
-                    if (rs.next()) {
-                        lastMessageId = rs.getLong(1);
+                try{
+                    rs=s.executeQuery();
+                    if(rs.next()){
+                        lastMessageId=rs.getLong(1);
                     }
-                } finally {
+                }finally{
                     close(rs);
                     close(s);
                 }
             }
-            
-            s = c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
-            s.setString(1, destination.getQualifiedName());
-            s.setString(2, clientId);
-            s.setString(3, subscriptionName);
-            s.setString(4, selector);
-            s.setLong(5, lastMessageId);
-
-            if (s.executeUpdate() != 1) {
+            s=c.getConnection().prepareStatement(statements.getCreateDurableSubStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriptionName);
+            s.setString(4,selector);
+            s.setLong(5,lastMessageId);
+            if(s.executeUpdate()!=1){
                 throw new IOException("Could not create durable subscription for: "+clientId);
             }
-        }
-        finally {
+        }finally{
             close(s);
         }
     }
 
-    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException {
-
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-
-            s = c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
-            s.setString(1, destination.getQualifiedName());
-            s.setString(2, clientId);
-            s.setString(3, subscriptionName);
-            rs = s.executeQuery();
-
-            if (!rs.next()) {
+    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName) throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindDurableSubStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriptionName);
+            rs=s.executeQuery();
+            if(!rs.next()){
                 return null;
             }
-
-            SubscriptionInfo subscription = new SubscriptionInfo();
+            SubscriptionInfo subscription=new SubscriptionInfo();
             subscription.setDestination(destination);
             subscription.setClientId(clientId);
             subscription.setSubcriptionName(subscriptionName);
             subscription.setSelector(rs.getString(1));
             return subscription;
-            
-        }
-        finally {
+        }finally{
             close(rs);
             close(s);
         }
     }
 
-    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-
-            s = c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
-            s.setString(1, destination.getQualifiedName());
-            rs = s.executeQuery();
-
-            ArrayList rc = new ArrayList();
-            while(rs.next()) {
-                SubscriptionInfo subscription = new SubscriptionInfo();
+    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
+            throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindAllDurableSubsStatement());
+            s.setString(1,destination.getQualifiedName());
+            rs=s.executeQuery();
+            ArrayList rc=new ArrayList();
+            while(rs.next()){
+                SubscriptionInfo subscription=new SubscriptionInfo();
                 subscription.setDestination(destination);
                 subscription.setSelector(rs.getString(1));
                 subscription.setSubcriptionName(rs.getString(2));
                 subscription.setClientId(rs.getString(3));
                 rc.add(subscription);
             }
-
-            return (SubscriptionInfo[]) rc.toArray(new SubscriptionInfo[rc.size()]);            
-        }
-        finally {
+            return (SubscriptionInfo[])rc.toArray(new SubscriptionInfo[rc.size()]);
+        }finally{
             close(rs);
             close(s);
         }
     }
 
-    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, IOException {
-        PreparedStatement s = null;
-        try {
-            s = c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
-            s.setString(1, destinationName.getQualifiedName());
+    public void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName) throws SQLException,
+            IOException{
+        PreparedStatement s=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getRemoveAllMessagesStatement());
+            s.setString(1,destinationName.getQualifiedName());
             s.executeUpdate();
             s.close();
-            
-            s = c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
-            s.setString(1, destinationName.getQualifiedName());
+            s=c.getConnection().prepareStatement(statements.getRemoveAllSubscriptionsStatement());
+            s.setString(1,destinationName.getQualifiedName());
             s.executeUpdate();
-            
-        }
-        finally {
+        }finally{
             close(s);
         }
     }
-    
-    public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException {
-        PreparedStatement s = null;
-        try {
-            s = c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
-            s.setString(1, destination.getQualifiedName());
-            s.setString(2, clientId);
-            s.setString(3, subscriptionName);
+
+    public void doDeleteSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
+            String subscriptionName) throws SQLException,IOException{
+        PreparedStatement s=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getDeleteSubscriptionStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriptionName);
             s.executeUpdate();
-        }
-        finally {
+        }finally{
             close(s);
         }
     }
 
-    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
-        PreparedStatement s = null;
-        try {
+    public void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException{
+        PreparedStatement s=null;
+        try{
             log.debug("Executing SQL: "+statements.getDeleteOldMessagesStatement());
-            s = c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
-            s.setLong(1, System.currentTimeMillis());
-            int i = s.executeUpdate();
+            s=c.getConnection().prepareStatement(statements.getDeleteOldMessagesStatement());
+            s.setLong(1,System.currentTimeMillis());
+            int i=s.executeUpdate();
             log.debug("Deleted "+i+" old message(s).");
-        }
-        finally {
+        }finally{
             close(s);
         }
     }
 
-    static private void close(PreparedStatement s) {
-        try {
+    static private void close(PreparedStatement s){
+        try{
             s.close();
-        }
-        catch (Throwable e) {
+        }catch(Throwable e){
         }
     }
 
-    static private void close(ResultSet rs) {
-        try {
+    static private void close(ResultSet rs){
+        try{
             rs.close();
-        }
-        catch (Throwable e) {
+        }catch(Throwable e){
         }
     }
 
-    public Set doGetDestinations(TransactionContext c) throws SQLException, IOException {
-        HashSet rc = new HashSet();
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-            s = c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
-            rs = s.executeQuery();
-
-            while (rs.next()) {
-                rc.add( ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
+    public Set doGetDestinations(TransactionContext c) throws SQLException,IOException{
+        HashSet rc=new HashSet();
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getFindAllDestinationsStatement());
+            rs=s.executeQuery();
+            while(rs.next()){
+                rc.add(ActiveMQDestination.createDestination(rs.getString(1),ActiveMQDestination.QUEUE_TYPE));
             }
-        }
-        finally {
+        }finally{
             close(rs);
             close(s);
         }
         return rc;
     }
 
-    public boolean isBatchStatments() {
+    public boolean isBatchStatments(){
         return batchStatments;
     }
 
-    public void setBatchStatments(boolean batchStatments) {
-        this.batchStatments = batchStatments;
+    public void setBatchStatments(boolean batchStatments){
+        this.batchStatments=batchStatments;
     }
 
-    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
+    public void setUseExternalMessageReferences(boolean useExternalMessageReferences){
         statements.setUseExternalMessageReferences(useExternalMessageReferences);
     }
 
-    public Statements getStatements() {
+    public Statements getStatements(){
         return statements;
     }
 
-    public void setStatements(Statements statements) {
-        this.statements = statements;
+    public void setStatements(Statements statements){
+        this.statements=statements;
     }
 
-    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination,String clientId,String subscriberName) throws SQLException,IOException{
-        PreparedStatement s = null;
-        ResultSet rs = null;
-        try {
-
-            s = c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
-            s.setString(1, destination.getQualifiedName());
-            s.setString(2, clientId);
-            s.setString(3, subscriberName);
-            rs = s.executeQuery();
-
-            if (!rs.next()) {
+    public byte[] doGetNextDurableSubscriberMessageStatement(TransactionContext c,ActiveMQDestination destination,
+            String clientId,String subscriberName) throws SQLException,IOException{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setString(2,clientId);
+            s.setString(3,subscriberName);
+            rs=s.executeQuery();
+            if(!rs.next()){
                 return null;
             }
-            return getBinaryData(rs, 1);
-
+            return getBinaryData(rs,1);
+        }finally{
+            close(rs);
+            close(s);
         }
-        finally {
+    }
+
+    /**
+     * @param c
+     * @param destination
+     * @param clientId
+     * @param subscriberName
+     * @param id
+     * @return the previous Id
+     * @throws Exception 
+     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetPrevDurableSubscriberMessageStatement(org.apache.activemq.store.jdbc.TransactionContext,
+     *      org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String)
+     */
+    public void doGetPrevDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
+            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getPrevDurableSubscriberMessageIdStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setLong(2,id);
+            rs=s.executeQuery();
+            if (rs.next()) {
+            listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+            }
+            listener.finished();
+           
+        }finally{
             close(rs);
             close(s);
         }
     }
 
-    /*
-     * Useful for debugging.
-    public void dumpTables(Connection c, String destinationName, String clientId, String subscriptionName) throws SQLException {        
-        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
-        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
-        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
-        +"ACTIVEMQ_MSGS M, "
-        +"ACTIVEMQ_ACKS D "
-        +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
-        +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
-        +" ORDER BY M.ID");
-        s.setString(1,destinationName);
-        s.setString(2,clientId);
-        s.setString(3,subscriptionName);
-        printQuery(s,System.out);
-    }
-    
-    public void dumpTables(Connection c) throws SQLException {
-        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
-        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
-    }
-
-    private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
-        printQuery(c.prepareStatement(query), out);
-    }
-    
-    private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
-        
-        ResultSet set=null;
-        try {
-            set = s.executeQuery();
-            ResultSetMetaData metaData = set.getMetaData();
-            for( int i=1; i<= metaData.getColumnCount(); i++ ) {
-                if(i==1)
-                    out.print("||");
-                out.print(metaData.getColumnName(i)+"||");
-            }
-            out.println();
-            while(set.next()) {
-                for( int i=1; i<= metaData.getColumnCount(); i++ ) {
-                    if(i==1)
-                        out.print("|");
-                    out.print(set.getString(i)+"|");
-                }
-                out.println();
-            }
-        } finally {
-            try { set.close(); } catch (Throwable ignore) {}
-            try { s.close(); } catch (Throwable ignore) {}
+    /**
+     * @param c
+     * @param destination
+     * @param clientId
+     * @param subscriberName
+     * @param id
+     * @return the next id
+     * @throws SQLException
+     * @throws IOException
+     * @see org.apache.activemq.store.jdbc.JDBCAdapter#doGetNextDurableSubscriberMessageIdStatement(org.apache.activemq.store.jdbc.TransactionContext,
+     *      org.apache.activemq.command.ActiveMQDestination, java.lang.String, java.lang.String, java.lang.String)
+     */
+    public void doGetNextDurableSubscriberMessageIdStatement(TransactionContext c,ActiveMQDestination destination,
+            String clientId,String subscriberName,long id,JDBCMessageRecoveryListener listener) throws Exception{
+        PreparedStatement s=null;
+        ResultSet rs=null;
+        try{
+            s=c.getConnection().prepareStatement(statements.getNextDurableSubscriberMessageIdStatement());
+            s.setString(1,destination.getQualifiedName());
+            s.setLong(2,id);
+            rs=s.executeQuery();
+            if (rs.next()) {
+            listener.recoverMessage(rs.getLong(1),getBinaryData(rs,2));
+            }
+            listener.finished();
+           
+        }finally{
+            close(rs);
+            close(s);
         }
     }
-    */
+    /*
+     * Useful for debugging. public void dumpTables(Connection c, String destinationName, String clientId, String
+     * subscriptionName) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); printQuery(c,
+     * "Select * from ACTIVEMQ_ACKS", System.out); PreparedStatement s = c.prepareStatement("SELECT M.ID,
+     * D.LAST_ACKED_ID FROM " +"ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " +"WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND
+     * D.SUB_NAME=?" +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" +" ORDER BY M.ID");
+     * s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
+     * printQuery(s,System.out); }
+     * 
+     * public void dumpTables(Connection c) throws SQLException { printQuery(c, "Select * from ACTIVEMQ_MSGS",
+     * System.out); printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); }
+     * 
+     * private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
+     * printQuery(c.prepareStatement(query), out); }
+     * 
+     * private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
+     * 
+     * ResultSet set=null; try { set = s.executeQuery(); ResultSetMetaData metaData = set.getMetaData(); for( int i=1; i<=
+     * metaData.getColumnCount(); i++ ) { if(i==1) out.print("||"); out.print(metaData.getColumnName(i)+"||"); }
+     * out.println(); while(set.next()) { for( int i=1; i<= metaData.getColumnCount(); i++ ) { if(i==1) out.print("|");
+     * out.print(set.getString(i)+"|"); } out.println(); } } finally { try { set.close(); } catch (Throwable ignore) {}
+     * try { s.close(); } catch (Throwable ignore) {} } }
+     */
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -190,14 +190,23 @@
         return longTermStore.getAllSubscriptions();
     }
 
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
         this.peristenceAdapter.checkpoint(true, true);
-        return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
+        return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
+    }
+    
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         this.peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount(clientId,subscriberName);
+    }
+    
+    public void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch) {
+        longTermStore.resetBatching(clientId,subscriptionName,nextToDispatch);
     }
 
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -217,14 +217,24 @@
         return longTermStore.getAllSubscriptions();
     }
     
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
         this.peristenceAdapter.checkpoint(true, true);
-        return longTermStore.getNextMessageToDeliver(clientId,subscriptionName);
+        return longTermStore.getNextMessageIdToDeliver(clientId,subscriptionName,id);
     }
     
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws Exception{
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getPreviousMessageIdToDeliver(clientId,subscriptionName,id);
+    }
+
+    
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         this.peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount(clientId,subscriberName);
+    }
+    
+    public void resetBatching(String clientId,String subscriptionName,MessageId nextId) {
+        longTermStore.resetBatching(clientId,subscriptionName,nextId);
     }
 
    

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java?view=auto&rev=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRef.java Mon Oct  9 06:05:20 2006
@@ -0,0 +1,58 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import org.apache.activemq.kaha.StoreEntry;
+
+/**
+ * Holds information for location of message
+ * 
+ * @version $Revision: 1.10 $
+ */
+public class ConsumerMessageRef{
+
+    private StoreEntry messageEntry;
+    private StoreEntry ackEntry;
+    
+    /**
+     * @return the ackEntry
+     */
+    public StoreEntry getAckEntry(){
+        return this.ackEntry;
+    }
+    
+    /**
+     * @param ackEntry the ackEntry to set
+     */
+    public void setAckEntry(StoreEntry ackEntry){
+        this.ackEntry=ackEntry;
+    }
+    
+    /**
+     * @return the messageEntry
+     */
+    public StoreEntry getMessageEntry(){
+        return this.messageEntry;
+    }
+    
+    /**
+     * @param messageEntry the messageEntry to set
+     */
+    public void setMessageEntry(StoreEntry messageEntry){
+        this.messageEntry=messageEntry;
+    }
+
+       
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java?view=auto&rev=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/ConsumerMessageRefMarshaller.java Mon Oct  9 06:05:20 2006
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadaptor;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.impl.index.IndexItem;
+
+
+/**
+ * Marshall a TopicSubAck
+ * @version $Revision: 1.10 $
+ */
+public class ConsumerMessageRefMarshaller implements Marshaller{
+   
+
+    /**
+     * @param object
+     * @param dataOut
+     * @throws IOException
+     * @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
+     */
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
+       ConsumerMessageRef ref = (ConsumerMessageRef) object;
+       IndexItem item = (IndexItem)ref.getMessageEntry();
+       dataOut.writeLong(item.getOffset());
+       item.write(dataOut);
+       item = (IndexItem)ref.getAckEntry();
+       dataOut.writeLong(item.getOffset());
+       item.write(dataOut);
+       
+    }
+
+    /**
+     * @param dataIn
+     * @return payload
+     * @throws IOException
+     * @see org.apache.activemq.kaha.Marshaller#readPayload(java.io.DataInput)
+     */
+    public Object readPayload(DataInput dataIn) throws IOException{
+        ConsumerMessageRef ref = new ConsumerMessageRef();
+        IndexItem item = new IndexItem();
+        item.setOffset(dataIn.readLong());
+        item.read(dataIn);
+        ref.setMessageEntry(item);
+        item = new IndexItem();
+        item.setOffset(dataIn.readLong());
+        item.read(dataIn);
+        ref.setAckEntry(item);
+        return ref;
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Mon Oct  9 06:05:20 2006
@@ -1,27 +1,23 @@
 /**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+
 package org.apache.activemq.store.kahadaptor;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -32,68 +28,69 @@
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.kaha.StringMarshaller;
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * @version $Revision: 1.5 $
  */
-public class KahaTopicMessageStore  implements TopicMessageStore{
+public class KahaTopicMessageStore implements TopicMessageStore{
+
     private ActiveMQDestination destination;
     private ListContainer ackContainer;
     private ListContainer messageContainer;
     private Map subscriberContainer;
     private Store store;
-    private Map subscriberAcks=new ConcurrentHashMap();
+    private Map subscriberMessages=new ConcurrentHashMap();
 
     public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
-                    MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        this.messageContainer = messageContainer;
-        this.destination = destination;
+            MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
+        this.messageContainer=messageContainer;
+        this.destination=destination;
         this.store=store;
         this.ackContainer=ackContainer;
         subscriberContainer=subsContainer;
         // load all the Ack containers
         for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
             Object key=i.next();
-            addSubscriberAckContainer(key);
+            addSubscriberMessageContainer(key);
         }
     }
 
     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        int subscriberCount=subscriberAcks.size();
+        int subscriberCount=subscriberMessages.size();
         if(subscriberCount>0){
-            StoreEntry entry = messageContainer.placeLast(message);
-            TopicSubAck tsa = new TopicSubAck();
+            StoreEntry messageEntry=messageContainer.placeLast(message);
+            TopicSubAck tsa=new TopicSubAck();
             tsa.setCount(subscriberCount);
-            tsa.setStoreEntry(entry);
-            StoreEntry ackEntry = ackContainer.placeLast(tsa);
-            for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
-                Object key=i.next();
-                ListContainer container=store.getListContainer(key,"durable-subs");
-                container.add(ackEntry);
+            tsa.setMessageEntry(messageEntry);
+            StoreEntry ackEntry=ackContainer.placeLast(tsa);
+            for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+                TopicSubContainer container=(TopicSubContainer)i.next();
+                ConsumerMessageRef ref=new ConsumerMessageRef();
+                ref.setAckEntry(ackEntry);
+                ref.setMessageEntry(messageEntry);
+                container.getListContainer().add(ref);
             }
-            
         }
     }
 
     public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
             MessageId messageId) throws IOException{
         String subcriberId=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
         if(container!=null){
-            StoreEntry ackEntry=(StoreEntry)container.removeFirst();
-            if(ackEntry!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
+            ConsumerMessageRef ref=(ConsumerMessageRef)container.getListContainer().removeFirst();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
                     if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ackEntry);
-                        messageContainer.remove(tsa.getStoreEntry());
-                    }else {
-                       ackContainer.update(ackEntry,tsa);
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
+                    }else{
+                        ackContainer.update(ref.getAckEntry(),tsa);
                     }
                 }
             }
@@ -101,11 +98,11 @@
     }
 
     public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
-        return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
     }
 
     public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
-                    throws IOException{
+            throws IOException{
         SubscriptionInfo info=new SubscriptionInfo();
         info.setDestination(destination);
         info.setClientId(clientId);
@@ -117,23 +114,23 @@
         if(!subscriberContainer.containsKey(key)){
             subscriberContainer.put(key,info);
         }
-        addSubscriberAckContainer(key);
+        addSubscriberMessageContainer(key);
     }
 
     public synchronized void deleteSubscription(String clientId,String subscriptionName){
         String key=getSubscriptionKey(clientId,subscriptionName);
         subscriberContainer.remove(key);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
-        for(Iterator i=list.iterator();i.hasNext();){
-            StoreEntry ackEntry=(StoreEntry)i.next();
-            if(ackEntry!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+            if(ref!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
                 if(tsa!=null){
                     if(tsa.decrementCount()<=0){
-                        ackContainer.remove(ackEntry);
-                        messageContainer.remove(tsa.getStoreEntry());
-                    }else {
-                       ackContainer.update(ackEntry,tsa);
+                        ackContainer.remove(ref.getAckEntry());
+                        messageContainer.remove(tsa.getMessageEntry());
+                    }else{
+                        ackContainer.update(ref.getAckEntry(),tsa);
                     }
                 }
             }
@@ -141,18 +138,18 @@
     }
 
     public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-                    throws Exception{
+            throws Exception{
         String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
-        if(list!=null){
-            for(Iterator i=list.iterator();i.hasNext();){
-                StoreEntry entry = (StoreEntry)i.next();
-                Object msg=messageContainer.get(entry);
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        if(container!=null){
+            for(Iterator i=container.getListContainer().iterator();i.hasNext();){
+                ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
+                Object msg=messageContainer.get(ref.getMessageEntry());
                 if(msg!=null){
                     if(msg.getClass()==String.class){
-                        listener.recoverMessageReference((String) msg);
+                        listener.recoverMessageReference((String)msg);
                     }else{
-                        listener.recoverMessage((Message) msg);
+                        listener.recoverMessage((Message)msg);
                     }
                 }
                 listener.finished();
@@ -161,42 +158,40 @@
             listener.finished();
         }
     }
-    
+
     public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
-                    MessageRecoveryListener listener) throws Exception{
+            MessageRecoveryListener listener) throws Exception{
         String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
-        if(list!=null){
-            boolean startFound=false;
-            int count = 0;
-            for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
-                StoreEntry entry = (StoreEntry)i.next();
-                Object msg=messageContainer.get(entry);
-                if(msg!=null){
-                    if(msg.getClass()==String.class){
-                        String ref=msg.toString();
-                        if (startFound || lastMessageId == null){
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        if(container!=null){
+            int count=0;
+            StoreEntry entry=container.getBatchEntry();
+            if(entry==null){
+                entry=container.getListContainer().getFirst();
+            }else{
+                entry=container.getListContainer().refresh(entry);
+                entry=container.getListContainer().getNext(entry);
+            }
+            if(entry!=null){
+                do{
+                    ConsumerMessageRef consumerRef=(ConsumerMessageRef)container.getListContainer().get(entry);
+                    Object msg=messageContainer.get(consumerRef.getMessageEntry());
+                    if(msg!=null){
+                        if(msg.getClass()==String.class){
+                            String ref=msg.toString();
                             listener.recoverMessageReference(ref);
-                            count++;
-                        }
-                        else if(startFound||ref.equals(lastMessageId.toString())){
-                            startFound=true;
-                        }
-                    }else{
-                        Message message=(Message) msg;
-                        if(startFound||message.getMessageId().equals(lastMessageId)){
-                            startFound=true;
                         }else{
+                            Message message=(Message)msg;
                             listener.recoverMessage(message);
-                            count++;
                         }
+                        count++;
                     }
-                }
-                listener.finished();
+                    container.setBatchEntry(entry);
+                    entry=container.getListContainer().getNext(entry);
+                }while(entry!=null&&count<maxReturned);
             }
-        }else{
-            listener.finished();
         }
+        listener.finished();
     }
 
     public void delete(){
@@ -206,8 +201,8 @@
     }
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException{
-        return (SubscriptionInfo[]) subscriberContainer.values().toArray(
-                        new SubscriptionInfo[subscriberContainer.size()]);
+        return (SubscriptionInfo[])subscriberContainer.values().toArray(
+                new SubscriptionInfo[subscriberContainer.size()]);
     }
 
     protected String getSubscriptionKey(String clientId,String subscriberName){
@@ -216,25 +211,18 @@
         return result;
     }
 
-    protected void addSubscriberAckContainer(Object key) throws IOException{
+    protected void addSubscriberMessageContainer(Object key) throws IOException{
         ListContainer container=store.getListContainer(key,"topic-subs");
-        Marshaller marshaller=new StoreEntryMarshaller();
+        Marshaller marshaller=new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
-        subscriberAcks.put(key,container);
-    }
-
-    public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
-        StoreEntry entry = (StoreEntry)list.get(0);
-        Message msg=(Message)messageContainer.get(entry);
-        return msg;
+        TopicSubContainer tsc=new TopicSubContainer(container);
+        subscriberMessages.put(key,tsc);
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriberName);
-        ListContainer list=(ListContainer) subscriberAcks.get(key);
-        return list.size();
+        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
+        return container.getListContainer().size();
     }
 
     /**
@@ -243,11 +231,12 @@
      * @param expirationTime
      * @param messageRef
      * @throws IOException
-     * @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageId, long, java.lang.String)
+     * @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext,
+     *      org.apache.activemq.command.MessageId, long, java.lang.String)
      */
-    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) throws IOException{
+    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
+            throws IOException{
         messageContainer.add(messageRef);
-        
     }
 
     /**
@@ -255,7 +244,7 @@
      * @see org.apache.activemq.store.MessageStore#getDestination()
      */
     public ActiveMQDestination getDestination(){
-       return destination;
+        return destination;
     }
 
     /**
@@ -265,11 +254,11 @@
      * @see org.apache.activemq.store.MessageStore#getMessage(org.apache.activemq.command.MessageId)
      */
     public Message getMessage(MessageId identity) throws IOException{
-        Message result = null;
-        for (Iterator i = messageContainer.iterator(); i.hasNext();){
-            Message msg = (Message)i.next();
-            if (msg.getMessageId().equals(identity)) {
-                result = msg;
+        Message result=null;
+        for(Iterator i=messageContainer.iterator();i.hasNext();){
+            Message msg=(Message)i.next();
+            if(msg.getMessageId().equals(identity)){
+                result=msg;
                 break;
             }
         }
@@ -294,13 +283,12 @@
         for(Iterator iter=messageContainer.iterator();iter.hasNext();){
             Object msg=iter.next();
             if(msg.getClass()==String.class){
-                listener.recoverMessageReference((String) msg);
+                listener.recoverMessageReference((String)msg);
             }else{
-                listener.recoverMessage((Message) msg);
+                listener.recoverMessage((Message)msg);
             }
         }
         listener.finished();
-        
     }
 
     /**
@@ -308,26 +296,30 @@
      * @throws IOException
      * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
      */
-    public void removeAllMessages(ConnectionContext context) throws IOException{
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
         messageContainer.clear();
-        
+        ackContainer.clear();
+        for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
+            TopicSubContainer container=(TopicSubContainer)i.next();
+            container.getListContainer().clear();
+        }
     }
 
     /**
      * @param context
      * @param ack
      * @throws IOException
-     * @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageAck)
+     * @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext,
+     *      org.apache.activemq.command.MessageAck)
      */
     public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
-        for (Iterator i = messageContainer.iterator(); i.hasNext();){
-            Message msg = (Message)i.next();
-            if (msg.getMessageId().equals(ack.getLastMessageId())) {
-               i.remove();
+        for(Iterator i=messageContainer.iterator();i.hasNext();){
+            Message msg=(Message)i.next();
+            if(msg.getMessageId().equals(ack.getLastMessageId())){
+                i.remove();
                 break;
             }
         }
-        
     }
 
     /**
@@ -336,7 +328,6 @@
      */
     public void setUsageManager(UsageManager usageManager){
         // TODO Auto-generated method stub
-        
     }
 
     /**
@@ -345,7 +336,6 @@
      */
     public void start() throws Exception{
         // TODO Auto-generated method stub
-        
     }
 
     /**
@@ -354,8 +344,76 @@
      */
     public void stop() throws Exception{
         // TODO Auto-generated method stub
-        
     }
 
-    
+    /**
+     * @param clientId
+     * @param subscriptionName
+     * @see org.apache.activemq.store.TopicMessageStore#resetBatching(java.lang.String, java.lang.String)
+     */
+    public synchronized void resetBatching(String clientId,String subscriptionName,MessageId nextToDispatch){
+        String key=getSubscriptionKey(clientId,subscriptionName);
+        TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
+        if(topicSubContainer!=null){
+            topicSubContainer.reset();
+            if(nextToDispatch!=null){
+                StoreEntry entry=topicSubContainer.getListContainer().getFirst();
+                do{
+                    ConsumerMessageRef consumerRef=(ConsumerMessageRef)topicSubContainer.getListContainer().get(entry);
+                    Object msg=messageContainer.get(consumerRef.getMessageEntry());
+                    if(msg!=null){
+                        if(msg.getClass()==String.class){
+                            String ref=msg.toString();
+                            if(msg.toString().equals(nextToDispatch.toString())){
+                                // need to set the entry to the previous one
+                                // to ensure we start in the right place
+                                topicSubContainer
+                                        .setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
+                                break;
+                            }
+                        }else{
+                            Message message=(Message)msg;
+                            if(message!=null&&message.getMessageId().equals(nextToDispatch)){
+                                // need to set the entry to the previous one
+                                // to ensure we start in the right place
+                                topicSubContainer
+                                        .setBatchEntry(topicSubContainer.getListContainer().getPrevious(entry));
+                                break;
+                            }
+                        }
+                    }
+                    entry=topicSubContainer.getListContainer().getNext(entry);
+                }while(entry!=null);
+            }
+        }
+    }
+
+    /**
+     * @param clientId
+     * @param subscriptionName
+     * @param id
+     * @return next messageId
+     * @throws IOException
+     * @see org.apache.activemq.store.TopicMessageStore#getNextMessageIdToDeliver(java.lang.String, java.lang.String,
+     *      org.apache.activemq.command.MessageId)
+     */
+    public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * @param clientId
+     * @param subscriptionName
+     * @param id
+     * @return previous messageId
+     * @throws IOException
+     * @see org.apache.activemq.store.TopicMessageStore#getPreviousMessageIdToDeliver(java.lang.String,
+     *      java.lang.String, org.apache.activemq.command.MessageId)
+     */
+    public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id)
+            throws IOException{
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAck.java Mon Oct  9 06:05:20 2006
@@ -24,7 +24,7 @@
 public class TopicSubAck{
 
     private int count =0;
-    private StoreEntry storeEntry;
+    private StoreEntry messageEntry;
 
     /**
      * @return the count
@@ -56,18 +56,18 @@
 
     
     /**
-     * @return the storeEntry
+     * @return the messageEntry
      */
-    public StoreEntry getStoreEntry(){
-        return this.storeEntry;
+    public StoreEntry getMessageEntry(){
+        return this.messageEntry;
     }
 
     
     /**
-     * @param storeEntry the storeEntry to set
+     * @param messageEntry the messageEntry to set
      */
-    public void setStoreEntry(StoreEntry storeEntry){
-        this.storeEntry=storeEntry;
+    public void setMessageEntry(StoreEntry storeEntry){
+        this.messageEntry=storeEntry;
     }
 
    

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubAckMarshaller.java Mon Oct  9 06:05:20 2006
@@ -34,7 +34,7 @@
     public void writePayload(Object object,DataOutput dataOut) throws IOException{
        TopicSubAck tsa = (TopicSubAck) object;
        dataOut.writeInt(tsa.getCount());
-       IndexItem item = (IndexItem)tsa.getStoreEntry();
+       IndexItem item = (IndexItem)tsa.getMessageEntry();
        dataOut.writeLong(item.getOffset());
        item.write(dataOut);
        
@@ -47,7 +47,7 @@
         IndexItem item = new IndexItem();
         item.setOffset(dataIn.readLong());
         item.read(dataIn);
-        tsa.setStoreEntry(item);
+        tsa.setMessageEntry(item);
         return tsa;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?view=auto&rev=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Mon Oct  9 06:05:20 2006
@@ -0,0 +1,65 @@
+/**
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadaptor;
+
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.StoreEntry;
+
+/**
+ * Holds information for the subscriber
+ * 
+ * @version $Revision: 1.10 $
+ */
+ class TopicSubContainer{
+
+    private ListContainer listContainer;
+    private StoreEntry batchEntry;
+    
+    TopicSubContainer(ListContainer container){
+        this.listContainer = container;
+    }
+    /**
+     * @return the batchEntry
+     */
+     StoreEntry getBatchEntry(){
+        return this.batchEntry;
+    }
+    
+    /**
+     * @param batchEntry the batchEntry to set
+     */
+     void setBatchEntry(StoreEntry batchEntry){
+        this.batchEntry=batchEntry;
+    }
+    
+    /**
+     * @return the listContainer
+     */
+     ListContainer getListContainer(){
+        return this.listContainer;
+    }
+    
+    /**
+     * @param listContainer the listContainer to set
+     */
+     void setListContainer(ListContainer container){
+        this.listContainer=container;
+    }
+    
+     void reset() {
+        batchEntry = null;
+    }
+   
+}



Mime
View raw message