activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r453123 [3/3] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/kaha/impl/container/ main/java/org/apache/activemq/kaha/impl/data/ ma...
Date Thu, 05 Oct 2006 07:19:37 GMT
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Thu Oct  5 00:19:35 2006
@@ -211,9 +211,9 @@
     public String getFindDurableSubMessagesStatement(){
         if(findDurableSubMessagesStatement==null){
             findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
-                            +getFullAckTableName()+" D "+" WHERE ? >= ( select count(*) from "
-                            +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-                            +" AND M.CONTAINER=D.CONTAINER AND M.ID > ?"+" ORDER BY M.ID)";
+                            +getFullAckTableName()+" D "+" WHERE ?>= ( SELECT COUNT(*) FROM "
+                            +getFullMessageTableName()+" M, " +  getFullAckTableName() + " D WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+                            +" AND M.CONTAINER=D.CONTAINER AND M.ID > ?)"+" ORDER BY M.ID)";
         }
         return findDurableSubMessagesStatement;
     }
@@ -230,9 +230,9 @@
     public String getNextDurableSubscriberMessageStatement(){
         if (nextDurableSubscriberMessageStatement == null){
             nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
-            +getFullAckTableName()+" D "+" WHERE 1 >= ( select count(*) from "
-            +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
-            +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+" ORDER BY M.ID)";

+            +getFullAckTableName()+" D "+" WHERE 1 >= ( SELECT COUNT(*) FROM "
+            +getFullMessageTableName()+" M, WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+            +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+") ORDER BY M.ID)";

         }
         return nextDurableSubscriberMessageStatement;
     }
@@ -242,7 +242,7 @@
      */
     public String getDurableSubscriberMessageCountStatement(){
         if (durableSubscriberMessageCountStatement==null){
-            durableSubscriberMessageCountStatement = "select count(*) from "
+            durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
             +getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
             +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Oct  5 00:19:35 2006
@@ -414,13 +414,14 @@
       PreparedStatement s = null;
       ResultSet rs = null;
       try {
-
+          System.err.println("VANILLA STATEMENT = " + statements.getFindDurableSubMessagesStatement());
           s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
           s.setString(1, destination.getQualifiedName());
           s.setString(2, clientId);
           s.setString(3, subscriptionName);
           s.setLong(4,seq);
           s.setInt(5,maxReturned);
+          System.err.println("STATEMENT = " + s);
           rs = s.executeQuery();
 
           if( statements.isUseExternalMessageReferences() ) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Thu Oct  5 00:19:35 2006
@@ -1,20 +1,21 @@
 /**
- *
+ * 
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
+
 package org.apache.activemq.store.kahadaptor;
 
 import java.io.File;
@@ -26,6 +27,8 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.kaha.IndexTypes;
+import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
@@ -39,12 +42,14 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
 /**
  * @org.apache.xbean.XBean
  * 
  * @version $Revision: 1.4 $
  */
 public class KahaPersistenceAdapter implements PersistenceAdapter{
+
     private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
     static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
     KahaTransactionStore transactionStore;
@@ -53,39 +58,39 @@
     ConcurrentHashMap messageStores=new ConcurrentHashMap();
     private boolean useExternalMessageReferences;
     private OpenWireFormat wireFormat=new OpenWireFormat();
-    private long maxDataFileLength = 32 * 1024 * 1024;
-    Store store;
+    private long maxDataFileLength=32*1024*1024;
+    private String indexType=IndexTypes.DISK_INDEX;
+    private File dir;
+    private Store theStore;
 
     public KahaPersistenceAdapter(File dir) throws IOException{
         if(!dir.exists()){
             dir.mkdirs();
         }
-        String name=dir.getAbsolutePath()+File.separator+"kaha.db";
-        store=StoreFactory.open(name,"rw");
-        store.setMaxDataFileLength(maxDataFileLength);
+        this.dir=dir;
     }
 
     public Set getDestinations(){
-        
         Set rc=new HashSet();
-        try {
-        for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
-            Object obj=i.next();
-            if(obj instanceof ActiveMQDestination){
-                rc.add(obj);
+        try{
+            Store store=getStore();
+            for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
+                Object obj=i.next();
+                if(obj instanceof ActiveMQDestination){
+                    rc.add(obj);
+                }
             }
-        }
         }catch(IOException e){
-            log.error("Failed to get destinations " ,e);
+            log.error("Failed to get destinations ",e);
         }
         return rc;
     }
 
     public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
-        MessageStore rc=(MessageStore) queues.get(destination);
+        MessageStore rc=(MessageStore)queues.get(destination);
         if(rc==null){
             rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
-            messageStores.put(destination, rc);
+            messageStores.put(destination,rc);
             if(transactionStore!=null){
                 rc=transactionStore.proxy(rc);
             }
@@ -95,31 +100,31 @@
     }
 
     public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
-        TopicMessageStore rc=(TopicMessageStore) topics.get(destination);
+        TopicMessageStore rc=(TopicMessageStore)topics.get(destination);
         if(rc==null){
-            MapContainer messageContainer=getMapContainer(destination,"topic-data");
+            Store store=getStore();
+            ListContainer messageContainer=getListContainer(destination,"topic-data");
             MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
-            MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks");
-            ackContainer.setKeyMarshaller(new StringMarshaller());
-            ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
+            ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
+            ackContainer.setMarshaller(new TopicSubAckMarshaller());
             rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
-            messageStores.put(destination, rc);
+            messageStores.put(destination,rc);
             if(transactionStore!=null){
                 rc=transactionStore.proxy(rc);
             }
             topics.put(destination,rc);
-            
         }
         return rc;
     }
 
     protected MessageStore retrieveMessageStore(Object id){
-        MessageStore result =  (MessageStore) messageStores.get(id);
+        MessageStore result=(MessageStore)messageStores.get(id);
         return result;
     }
 
     public TransactionStore createTransactionStore() throws IOException{
         if(transactionStore==null){
+            Store store=getStore();
             MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
             container.setKeyMarshaller(new CommandMarshaller(wireFormat));
             container.setValueMarshaller(new TransactionMarshaller(wireFormat));
@@ -129,18 +134,25 @@
         return transactionStore;
     }
 
-    public void beginTransaction(ConnectionContext context){}
+    public void beginTransaction(ConnectionContext context){
+    }
 
     public void commitTransaction(ConnectionContext context) throws IOException{
-        store.force();
+        if(theStore!=null){
+            theStore.force();
+        }
     }
 
-    public void rollbackTransaction(ConnectionContext context){}
+    public void rollbackTransaction(ConnectionContext context){
+    }
 
-    public void start() throws Exception{}
+    public void start() throws Exception{
+    }
 
     public void stop() throws Exception{
-        store.close();
+        if(theStore!=null){
+            theStore.close();
+        }
     }
 
     public long getLastMessageBrokerSequenceId() throws IOException{
@@ -148,8 +160,8 @@
     }
 
     public void deleteAllMessages() throws IOException{
-        if(store!=null){
-            store.delete();
+        if(theStore!=null){
+            theStore.clear();
         }
     }
 
@@ -162,6 +174,7 @@
     }
 
     protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
+        Store store=getStore();
         MapContainer container=store.getMapContainer(id,containerName);
         container.setKeyMarshaller(new StringMarshaller());
         if(useExternalMessageReferences){
@@ -172,12 +185,26 @@
         container.load();
         return container;
     }
+    
+    protected ListContainer getListContainer(Object id,String containerName) throws IOException{
+        Store store=getStore();
+        ListContainer container=store.getListContainer(id,containerName);
+        if(useExternalMessageReferences){
+            container.setMarshaller(new StringMarshaller());
+        }else{
+            container.setMarshaller(new CommandMarshaller(wireFormat));
+        }
+        container.load();
+        return container;
+    }
 
     /**
      * @param usageManager
-     *            The UsageManager that is controlling the broker's memory usage.
+     *            The UsageManager that is controlling the broker's memory
+     *            usage.
      */
-    public void setUsageManager(UsageManager usageManager){}
+    public void setUsageManager(UsageManager usageManager){
+    }
 
     /**
      * @return the maxDataFileLength
@@ -187,11 +214,36 @@
     }
 
     /**
-     * @param maxDataFileLength the maxDataFileLength to set
+     * @param maxDataFileLength
+     *            the maxDataFileLength to set
      * 
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
     public void setMaxDataFileLength(long maxDataFileLength){
         this.maxDataFileLength=maxDataFileLength;
+    }
+
+    /**
+     * @return the indexType
+     */
+    public String getIndexType(){
+        return this.indexType;
+    }
+
+    /**
+     * @param indexType the indexTypes to set
+     */
+    public void setIndexType(String indexType){
+        this.indexType=indexType;
+    }
+
+    protected synchronized Store getStore() throws IOException{
+        if(theStore==null){
+            String name=dir.getAbsolutePath()+File.separator+"kaha.db";
+            theStore=StoreFactory.open(name,"rw");
+            theStore.setMaxDataFileLength(maxDataFileLength);
+            theStore.setIndexType(indexType);
+        }
+        return theStore;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Thu Oct  5 00:19:35 2006
@@ -21,15 +21,19 @@
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.TopicMessageStore;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@@ -37,15 +41,18 @@
 /**
  * @version $Revision: 1.5 $
  */
-public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
-    private Map ackContainer;
+public class KahaTopicMessageStore  implements TopicMessageStore{
+    private ActiveMQDestination destination;
+    private ListContainer ackContainer;
+    private ListContainer messageContainer;
     private Map subscriberContainer;
     private Store store;
     private Map subscriberAcks=new ConcurrentHashMap();
 
-    public KahaTopicMessageStore(Store store,MapContainer messageContainer,MapContainer ackContainer,
+    public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
                     MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        super(messageContainer,destination);
+        this.messageContainer = messageContainer;
+        this.destination = destination;
         this.store=store;
         this.ackContainer=ackContainer;
         subscriberContainer=subsContainer;
@@ -59,32 +66,35 @@
     public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
         int subscriberCount=subscriberAcks.size();
         if(subscriberCount>0){
-            String id=message.getMessageId().toString();
-            ackContainer.put(id,new AtomicInteger(subscriberCount));
+            StoreEntry entry = messageContainer.placeLast(message);
+            TopicSubAck tsa = new TopicSubAck();
+            tsa.setCount(subscriberCount);
+            tsa.setStoreEntry(entry);
+            StoreEntry ackEntry = ackContainer.placeLast(tsa);
             for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
                 Object key=i.next();
                 ListContainer container=store.getListContainer(key,"durable-subs");
-                container.add(id);
+                container.add(ackEntry);
             }
-            super.addMessage(context,message);
+            
         }
     }
 
     public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
-                    MessageId messageId) throws IOException{
+            MessageId messageId) throws IOException{
         String subcriberId=getSubscriptionKey(clientId,subscriptionName);
-        String id=messageId.toString();
-        ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
+        ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
         if(container!=null){
-            //container.remove(id);
-            container.removeFirst();
-            AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
-            if(count!=null){
-                if(count.decrementAndGet()>0){
-                    ackContainer.put(id,count);
-                }else{
-                    // no more references to message messageContainer so remove it
-                    super.removeMessage(messageId);
+            StoreEntry ackEntry=(StoreEntry)container.removeFirst();
+            if(ackEntry!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ackEntry);
+                        messageContainer.remove(tsa.getStoreEntry());
+                    }else {
+                       ackContainer.update(ackEntry,tsa);
+                    }
                 }
             }
         }
@@ -115,14 +125,16 @@
         subscriberContainer.remove(key);
         ListContainer list=(ListContainer) subscriberAcks.get(key);
         for(Iterator i=list.iterator();i.hasNext();){
-            String id=i.next().toString();
-            AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
-            if(count!=null){
-                if(count.decrementAndGet()>0){
-                    ackContainer.put(id,count);
-                }else{
-                    // no more references to message messageContainer so remove it
-                    messageContainer.remove(id);
+            StoreEntry ackEntry=(StoreEntry)i.next();
+            if(ackEntry!=null){
+                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
+                if(tsa!=null){
+                    if(tsa.decrementCount()<=0){
+                        ackContainer.remove(ackEntry);
+                        messageContainer.remove(tsa.getStoreEntry());
+                    }else {
+                       ackContainer.update(ackEntry,tsa);
+                    }
                 }
             }
         }
@@ -134,7 +146,8 @@
         ListContainer list=(ListContainer) subscriberAcks.get(key);
         if(list!=null){
             for(Iterator i=list.iterator();i.hasNext();){
-                Object msg=messageContainer.get(i.next());
+                TopicSubAck tsa = (TopicSubAck)i.next();
+                Object msg=messageContainer.get(tsa.getStoreEntry());
                 if(msg!=null){
                     if(msg.getClass()==String.class){
                         listener.recoverMessageReference((String) msg);
@@ -157,7 +170,8 @@
             boolean startFound=false;
             int count = 0;
             for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
-                Object msg=messageContainer.get(i.next());
+                TopicSubAck tsa = (TopicSubAck)i.next();
+                Object msg=messageContainer.get(tsa.getStoreEntry());
                 if(msg!=null){
                     if(msg.getClass()==String.class){
                         String ref=msg.toString();
@@ -186,7 +200,7 @@
     }
 
     public void delete(){
-        super.delete();
+        messageContainer.clear();
         ackContainer.clear();
         subscriberContainer.clear();
     }
@@ -204,7 +218,7 @@
 
     protected void addSubscriberAckContainer(Object key) throws IOException{
         ListContainer container=store.getListContainer(key,"topic-subs");
-        Marshaller marshaller=new StringMarshaller();
+        Marshaller marshaller=new StoreEntryMarshaller();
         container.setMarshaller(marshaller);
         subscriberAcks.put(key,container);
     }
@@ -213,14 +227,135 @@
         String key=getSubscriptionKey(clientId,subscriptionName);
         ListContainer list=(ListContainer) subscriberAcks.get(key);
         Iterator iter = list.iterator();
-        return (Message) (iter.hasNext() ? iter.next() : null);
-        
+        TopicSubAck tsa = (TopicSubAck)list.get(0);
+        Message msg=(Message)messageContainer.get(tsa.getStoreEntry());
+        return msg;
     }
 
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
         String key=getSubscriptionKey(clientId,subscriberName);
         ListContainer list=(ListContainer) subscriberAcks.get(key);
         return list.size();
+    }
+
+    /**
+     * @param context
+     * @param messageId
+     * @param expirationTime
+     * @param messageRef
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageId, long, java.lang.String)
+     */
+    public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) throws IOException{
+        messageContainer.add(messageRef);
+        
+    }
+
+    /**
+     * @return the destination
+     * @see org.apache.activemq.store.MessageStore#getDestination()
+     */
+    public ActiveMQDestination getDestination(){
+       return destination;
+    }
+
+    /**
+     * @param identity
+     * @return the Message
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#getMessage(org.apache.activemq.command.MessageId)
+     */
+    public Message getMessage(MessageId identity) throws IOException{
+        Message result = null;
+        for (Iterator i = messageContainer.iterator(); i.hasNext();){
+            Message msg = (Message)i.next();
+            if (msg.getMessageId().equals(identity)) {
+                result = msg;
+                break;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @param identity
+     * @return String
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#getMessageReference(org.apache.activemq.command.MessageId)
+     */
+    public String getMessageReference(MessageId identity) throws IOException{
+        return null;
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activemq.store.MessageStore#recover(org.apache.activemq.store.MessageRecoveryListener)
+     */
+    public void recover(MessageRecoveryListener listener) throws Exception{
+        for(Iterator iter=messageContainer.iterator();iter.hasNext();){
+            Object msg=iter.next();
+            if(msg.getClass()==String.class){
+                listener.recoverMessageReference((String) msg);
+            }else{
+                listener.recoverMessage((Message) msg);
+            }
+        }
+        listener.finished();
+        
+    }
+
+    /**
+     * @param context
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
+     */
+    public void removeAllMessages(ConnectionContext context) throws IOException{
+        messageContainer.clear();
+        
+    }
+
+    /**
+     * @param context
+     * @param ack
+     * @throws IOException
+     * @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageAck)
+     */
+    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
+        for (Iterator i = messageContainer.iterator(); i.hasNext();){
+            Message msg = (Message)i.next();
+            if (msg.getMessageId().equals(ack.getLastMessageId())) {
+               i.remove();
+                break;
+            }
+        }
+        
+    }
+
+    /**
+     * @param usageManager
+     * @see org.apache.activemq.store.MessageStore#setUsageManager(org.apache.activemq.memory.UsageManager)
+     */
+    public void setUsageManager(UsageManager usageManager){
+        // TODO Auto-generated method stub
+        
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activemq.Service#start()
+     */
+    public void start() throws Exception{
+        // TODO Auto-generated method stub
+        
+    }
+
+    /**
+     * @throws Exception
+     * @see org.apache.activemq.Service#stop()
+     */
+    public void stop() throws Exception{
+        // TODO Auto-generated method stub
+        
     }
 
     

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java Thu Oct  5 00:19:35 2006
@@ -19,6 +19,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import junit.framework.TestCase;
+import org.apache.activemq.kaha.IndexTypes;
 import org.apache.activemq.kaha.StoreFactory;
 import org.apache.activemq.kaha.impl.KahaStore;
 import org.apache.activemq.kaha.impl.container.ContainerId;
@@ -131,8 +132,8 @@
         ContainerId containerId=new ContainerId();
         containerId.setKey(id);
         containerId.setDataContainerName(containerName);
-        IndexItem root=store.listsContainer.addRoot(containerId);
-        ListContainerImpl result=new ListContainerImpl(containerId,root,store.rootIndexManager,im,dm);
+        IndexItem root=store.getListsContainer().addRoot(im,containerId);
+        ListContainerImpl result=new ListContainerImpl(containerId,root,im,dm,IndexTypes.DISK_INDEX);
         result.expressDataInterest();
         result.setMaximumCacheSize(MAX_CACHE_SIZE);
         return result;

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java Thu Oct  5 00:19:35 2006
@@ -22,6 +22,7 @@
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexLinkedList;
 
@@ -200,7 +201,7 @@
             list.add(i,(IndexItem) testData.get(i));
         }
         for (int i =0; i < testData.size(); i++){
-            assertTrue(list.indexOf((IndexItem) testData.get(i))==i);
+            assertTrue(list.indexOf((StoreEntry) testData.get(i))==i);
         }
     }
 



Mime
View raw message