activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r563982 [20/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...
Date Wed, 08 Aug 2007 18:58:13 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Aug  8 11:56:59 2007
@@ -27,107 +27,110 @@
 import org.apache.activemq.store.MessageStore;
 
 /**
- * An implementation of {@link org.apache.activemq.store.MessageStore} which uses a JPS Container
+ * An implementation of {@link org.apache.activemq.store.MessageStore} which
+ * uses a JPS Container
  * 
  * @version $Revision: 1.7 $
  */
-public class KahaMessageStore implements MessageStore{
+public class KahaMessageStore implements MessageStore {
 
     protected final ActiveMQDestination destination;
-    protected final MapContainer<MessageId,Message> messageContainer;
-    protected StoreEntry batchEntry=null;
+    protected final MapContainer<MessageId, Message> messageContainer;
+    protected StoreEntry batchEntry = null;
 
-    public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination)
-            throws IOException{
-        this.messageContainer=container;
-        this.destination=destination;
+    public KahaMessageStore(MapContainer<MessageId, Message> container, ActiveMQDestination destination)
+        throws IOException {
+        this.messageContainer = container;
+        this.destination = destination;
     }
 
-    protected MessageId getMessageId(Object object){
+    protected MessageId getMessageId(Object object) {
         return ((Message)object).getMessageId();
     }
 
-    public Object getId(){
+    public Object getId() {
         return messageContainer.getId();
     }
 
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        messageContainer.put(message.getMessageId(),message);
-        // TODO: we should do the following but it is not need if the message is being added within a persistence
+    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+        messageContainer.put(message.getMessageId(), message);
+        // TODO: we should do the following but it is not need if the message is
+        // being added within a persistence
         // transaction
-        // but since I can't tell if one is running right now.. I'll leave this out for now.
+        // but since I can't tell if one is running right now.. I'll leave this
+        // out for now.
         // if( message.isResponseRequired() ) {
         // messageContainer.force();
         // }
     }
 
-    public synchronized Message getMessage(MessageId identity) throws IOException{
-        Message result=messageContainer.get(identity);
+    public synchronized Message getMessage(MessageId identity) throws IOException {
+        Message result = messageContainer.get(identity);
         return result;
     }
 
-    protected boolean recoverMessage(MessageRecoveryListener listener,Message msg) throws Exception{
-        if(listener.hasSpace()){
+    protected boolean recoverMessage(MessageRecoveryListener listener, Message msg) throws Exception {
+        if (listener.hasSpace()) {
             listener.recoverMessage(msg);
             return true;
         }
         return false;
     }
 
-    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
+    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
         removeMessage(ack.getLastMessageId());
     }
 
-    
-    
-    public synchronized void removeMessage(MessageId msgId) throws IOException{
-        StoreEntry entry=messageContainer.getEntry(msgId);
-        if(entry!=null){
+    public synchronized void removeMessage(MessageId msgId) throws IOException {
+        StoreEntry entry = messageContainer.getEntry(msgId);
+        if (entry != null) {
             messageContainer.remove(entry);
-            if(messageContainer.isEmpty()||(batchEntry!=null&&batchEntry.equals(entry))){
+            if (messageContainer.isEmpty() || (batchEntry != null && batchEntry.equals(entry))) {
                 resetBatching();
             }
         }
     }
 
-    public synchronized void recover(MessageRecoveryListener listener) throws Exception{
-        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
-            Message msg=(Message)messageContainer.getValue(entry);
-            if(!recoverMessage(listener,msg)) {
+    public synchronized void recover(MessageRecoveryListener listener) throws Exception {
+        for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
+            .getNext(entry)) {
+            Message msg = (Message)messageContainer.getValue(entry);
+            if (!recoverMessage(listener, msg)) {
                 break;
             }
         }
     }
 
-    public void start(){
+    public void start() {
     }
 
-    public void stop(){
+    public void stop() {
     }
 
-    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
         messageContainer.clear();
     }
 
-    public ActiveMQDestination getDestination(){
+    public ActiveMQDestination getDestination() {
         return destination;
     }
 
-    public synchronized void delete(){
+    public synchronized void delete() {
         messageContainer.clear();
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the destination's memory usage.
+     * @param usageManager The UsageManager that is controlling the
+     *                destination's memory usage.
      */
-    public void setUsageManager(UsageManager usageManager){
+    public void setUsageManager(UsageManager usageManager) {
     }
 
     /**
      * @return the number of messages held by this destination
      * @see org.apache.activemq.store.MessageStore#getMessageCount()
      */
-    public int getMessageCount(){
+    public int getMessageCount() {
         return messageContainer.size();
     }
 
@@ -137,7 +140,7 @@
      * @throws Exception
      * @see org.apache.activemq.store.MessageStore#getPreviousMessageIdToDeliver(org.apache.activemq.command.MessageId)
      */
-    public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception{
+    public MessageId getPreviousMessageIdToDeliver(MessageId id) throws Exception {
         return null;
     }
 
@@ -146,31 +149,32 @@
      * @param maxReturned
      * @param listener
      * @throws Exception
-     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int,
-     *      org.apache.activemq.store.MessageRecoveryListener)
+     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId,
+     *      int, org.apache.activemq.store.MessageRecoveryListener)
      */
-    public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
-        StoreEntry entry=batchEntry;
-        if(entry==null){
-            entry=messageContainer.getFirst();
-        }else{
-            entry=messageContainer.refresh(entry);
-            entry=messageContainer.getNext(entry);
-            if(entry==null){
-                batchEntry=null;
+    public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
+        throws Exception {
+        StoreEntry entry = batchEntry;
+        if (entry == null) {
+            entry = messageContainer.getFirst();
+        } else {
+            entry = messageContainer.refresh(entry);
+            entry = messageContainer.getNext(entry);
+            if (entry == null) {
+                batchEntry = null;
             }
         }
-        if(entry!=null){
-            int count=0;
-            do{
-                Message msg=messageContainer.getValue(entry);
-                if(msg!=null){
-                    recoverMessage(listener,msg);
+        if (entry != null) {
+            int count = 0;
+            do {
+                Message msg = messageContainer.getValue(entry);
+                if (msg != null) {
+                    recoverMessage(listener, msg);
                     count++;
                 }
-                batchEntry=entry;
-                entry=messageContainer.getNext(entry);
-            }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+                batchEntry = entry;
+                entry = messageContainer.getNext(entry);
+            } while (entry != null && count < maxReturned && listener.hasSpace());
         }
     }
 
@@ -178,14 +182,14 @@
      * @param nextToDispatch
      * @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
      */
-    public synchronized void resetBatching(){
-        batchEntry=null;
+    public synchronized void resetBatching() {
+        batchEntry = null;
     }
 
     /**
      * @return true if the store supports cursors
      */
-    public boolean isSupportForCursors(){
+    public boolean isSupportForCursors() {
         return true;
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Wed Aug  8 11:56:59 2007
@@ -51,92 +51,96 @@
  * 
  * @version $Revision: 1.4 $
  */
-public class KahaPersistenceAdapter implements PersistenceAdapter{
+public class KahaPersistenceAdapter implements PersistenceAdapter {
 
-    private static final int STORE_LOCKED_WAIT_DELAY=10*1000;
-    private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
-    static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
+    private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
+    private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
+    static final String PREPARED_TRANSACTIONS_NAME = "PreparedTransactions";
     KahaTransactionStore transactionStore;
-    ConcurrentHashMap<ActiveMQTopic,TopicMessageStore> topics=new ConcurrentHashMap<ActiveMQTopic,TopicMessageStore>();
-    ConcurrentHashMap<ActiveMQQueue,MessageStore> queues=new ConcurrentHashMap<ActiveMQQueue,MessageStore>();
-    ConcurrentHashMap<ActiveMQDestination,MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination,MessageStore>();
-    protected OpenWireFormat wireFormat=new OpenWireFormat();
-    private long maxDataFileLength=32*1024*1024;
+    ConcurrentHashMap<ActiveMQTopic, TopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, TopicMessageStore>();
+    ConcurrentHashMap<ActiveMQQueue, MessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, MessageStore>();
+    ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
+    protected OpenWireFormat wireFormat = new OpenWireFormat();
+    private long maxDataFileLength = 32 * 1024 * 1024;
     private File directory;
     private String brokerName;
     private Store theStore;
     private boolean initialized;
 
-    public Set<ActiveMQDestination> getDestinations(){
-        Set<ActiveMQDestination> rc=new HashSet<ActiveMQDestination>();
-        try{
-            Store store=getStore();
-            for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
-                ContainerId id=(ContainerId) i.next();
+    public Set<ActiveMQDestination> getDestinations() {
+        Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
+        try {
+            Store store = getStore();
+            for (Iterator i = store.getMapContainerIds().iterator(); i.hasNext();) {
+                ContainerId id = (ContainerId)i.next();
                 Object obj = id.getKey();
-                if(obj instanceof ActiveMQDestination){
+                if (obj instanceof ActiveMQDestination) {
                     rc.add((ActiveMQDestination)obj);
                 }
             }
-        }catch(IOException e){
-            log.error("Failed to get destinations ",e);
+        } catch (IOException e) {
+            log.error("Failed to get destinations ", e);
         }
         return rc;
     }
 
-    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
-        MessageStore rc=queues.get(destination);
-        if(rc==null){
-            rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
-            messageStores.put(destination,rc);
-            if(transactionStore!=null){
-                rc=transactionStore.proxy(rc);
+    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+        MessageStore rc = queues.get(destination);
+        if (rc == null) {
+            rc = new KahaMessageStore(getMapContainer(destination, "queue-data"), destination);
+            messageStores.put(destination, rc);
+            if (transactionStore != null) {
+                rc = transactionStore.proxy(rc);
             }
-            queues.put(destination,rc);
+            queues.put(destination, rc);
         }
         return rc;
     }
 
-    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
-        TopicMessageStore rc=topics.get(destination);
-        if(rc==null){
-            Store store=getStore();
-            MapContainer messageContainer=getMapContainer(destination,"topic-data");
-            MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
-            ListContainer<TopicSubAck> ackContainer=store.getListContainer(destination.toString(),"topic-acks");
+    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
+        throws IOException {
+        TopicMessageStore rc = topics.get(destination);
+        if (rc == null) {
+            Store store = getStore();
+            MapContainer messageContainer = getMapContainer(destination, "topic-data");
+            MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
+                                                             "topic-subs");
+            ListContainer<TopicSubAck> ackContainer = store.getListContainer(destination.toString(),
+                                                                             "topic-acks");
             ackContainer.setMarshaller(new TopicSubAckMarshaller());
-            rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
-            messageStores.put(destination,rc);
-            if(transactionStore!=null){
-                rc=transactionStore.proxy(rc);
+            rc = new KahaTopicMessageStore(store, messageContainer, ackContainer, subsContainer, destination);
+            messageStores.put(destination, rc);
+            if (transactionStore != null) {
+                rc = transactionStore.proxy(rc);
             }
-            topics.put(destination,rc);
+            topics.put(destination, rc);
         }
         return rc;
     }
 
-    protected MessageStore retrieveMessageStore(Object id){
-        MessageStore result=messageStores.get(id);
+    protected MessageStore retrieveMessageStore(Object id) {
+        MessageStore result = messageStores.get(id);
         return result;
     }
 
-    public TransactionStore createTransactionStore() throws IOException{
-        if(transactionStore==null){
-            while(true){
-                try{
-                    Store store=getStore();
-                    MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
+    public TransactionStore createTransactionStore() throws IOException {
+        if (transactionStore == null) {
+            while (true) {
+                try {
+                    Store store = getStore();
+                    MapContainer container = store
+                        .getMapContainer(PREPARED_TRANSACTIONS_NAME, "transactions");
                     container.setKeyMarshaller(new CommandMarshaller(wireFormat));
                     container.setValueMarshaller(new TransactionMarshaller(wireFormat));
                     container.load();
-                    transactionStore=new KahaTransactionStore(this,container);
+                    transactionStore = new KahaTransactionStore(this, container);
                     break;
-                }catch(StoreLockedExcpetion e){
-                    log.info("Store is locked... waiting "+(STORE_LOCKED_WAIT_DELAY/1000)
-                            +" seconds for the Store to be unlocked.");
-                    try{
+                } catch (StoreLockedExcpetion e) {
+                    log.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
+                             + " seconds for the Store to be unlocked.");
+                    try {
                         Thread.sleep(STORE_LOCKED_WAIT_DELAY);
-                    }catch(InterruptedException e1){
+                    } catch (InterruptedException e1) {
                     }
                 }
             }
@@ -144,84 +148,87 @@
         return transactionStore;
     }
 
-    public void beginTransaction(ConnectionContext context){
+    public void beginTransaction(ConnectionContext context) {
     }
 
-    public void commitTransaction(ConnectionContext context) throws IOException{
-        if(theStore!=null){
+    public void commitTransaction(ConnectionContext context) throws IOException {
+        if (theStore != null) {
             theStore.force();
         }
     }
 
-    public void rollbackTransaction(ConnectionContext context){
+    public void rollbackTransaction(ConnectionContext context) {
     }
 
-    public void start() throws Exception{
+    public void start() throws Exception {
         initialize();
     }
 
-    public void stop() throws Exception{
-        if(theStore!=null){
+    public void stop() throws Exception {
+        if (theStore != null) {
             theStore.close();
         }
     }
 
-    public long getLastMessageBrokerSequenceId() throws IOException{
+    public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
 
-    public void deleteAllMessages() throws IOException{
-        if(theStore!=null){
-            if(theStore.isInitialized()){
+    public void deleteAllMessages() throws IOException {
+        if (theStore != null) {
+            if (theStore.isInitialized()) {
                 theStore.clear();
-            }else{
+            } else {
                 theStore.delete();
             }
-        }else{
+        } else {
             StoreFactory.delete(getStoreName());
         }
     }
 
-    protected MapContainer<MessageId,Message> getMapContainer(Object id,String containerName) throws IOException{
-        Store store=getStore();
-        MapContainer<MessageId,Message> container=store.getMapContainer(id,containerName);
+    protected MapContainer<MessageId, Message> getMapContainer(Object id, String containerName)
+        throws IOException {
+        Store store = getStore();
+        MapContainer<MessageId, Message> container = store.getMapContainer(id, containerName);
         container.setKeyMarshaller(new MessageIdMarshaller());
         container.setValueMarshaller(new MessageMarshaller(wireFormat));
         container.load();
         return container;
     }
 
-    protected MapContainer<String,Object> getSubsMapContainer(Object id,String containerName) throws IOException{
-        Store store=getStore();
-        MapContainer<String,Object> container=store.getMapContainer(id,containerName);
+    protected MapContainer<String, Object> getSubsMapContainer(Object id, String containerName)
+        throws IOException {
+        Store store = getStore();
+        MapContainer<String, Object> container = store.getMapContainer(id, containerName);
         container.setKeyMarshaller(Store.StringMarshaller);
         container.setValueMarshaller(createMessageMarshaller());
         container.load();
         return container;
     }
 
-    protected Marshaller<Object> createMessageMarshaller(){
+    protected Marshaller<Object> createMessageMarshaller() {
         return new CommandMarshaller(wireFormat);
     }
 
-    protected ListContainer getListContainer(Object id,String containerName) throws IOException{
-        Store store=getStore();
-        ListContainer container=store.getListContainer(id,containerName);
+    protected ListContainer getListContainer(Object id, String containerName) throws IOException {
+        Store store = getStore();
+        ListContainer container = store.getListContainer(id, containerName);
         container.setMarshaller(createMessageMarshaller());
         container.load();
         return container;
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     * @param usageManager The UsageManager that is controlling the broker's
+     *                memory usage.
      */
-    public void setUsageManager(UsageManager usageManager){
+    public void setUsageManager(UsageManager usageManager) {
     }
 
     /**
      * @return the maxDataFileLength
      */
-    public long getMaxDataFileLength(){
+    public long getMaxDataFileLength() {
         return maxDataFileLength;
     }
 
@@ -230,62 +237,61 @@
      * 
      * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
      */
-    public void setMaxDataFileLength(long maxDataFileLength){
-        this.maxDataFileLength=maxDataFileLength;
+    public void setMaxDataFileLength(long maxDataFileLength) {
+        this.maxDataFileLength = maxDataFileLength;
     }
 
-    protected synchronized Store getStore() throws IOException{
-        if(theStore==null){
-            theStore=StoreFactory.open(getStoreName(),"rw");
+    protected synchronized Store getStore() throws IOException {
+        if (theStore == null) {
+            theStore = StoreFactory.open(getStoreName(), "rw");
             theStore.setMaxDataFileLength(maxDataFileLength);
         }
         return theStore;
     }
 
-    private String getStoreName(){
+    private String getStoreName() {
         initialize();
         return directory.getAbsolutePath();
     }
 
-    public String toString(){
-        return "KahaPersistenceAdapter("+getStoreName()+")";
+    public String toString() {
+        return "KahaPersistenceAdapter(" + getStoreName() + ")";
     }
 
-    public void setBrokerName(String brokerName){
-        this.brokerName=brokerName;
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
     }
-    
-    public String getBrokerName(){
+
+    public String getBrokerName() {
         return brokerName;
     }
 
-    public File getDirectory(){
+    public File getDirectory() {
         return this.directory;
     }
 
-    public void setDirectory(File directory){
-        this.directory=directory;
+    public void setDirectory(File directory) {
+        this.directory = directory;
     }
-  
-    public void checkpoint(boolean sync) throws IOException{
-        if(sync){
+
+    public void checkpoint(boolean sync) throws IOException {
+        if (sync) {
             getStore().force();
         }
     }
 
-    private void initialize(){
-        if(!initialized){
-            initialized=true;
-            if(this.directory==null){
-                File file =new File(IOHelper.getDefaultDataDirectory());
-                file=new File(file,brokerName+"-kahastore");
+    private void initialize() {
+        if (!initialized) {
+            initialized = true;
+            if (this.directory == null) {
+                File file = new File(IOHelper.getDefaultDataDirectory());
+                file = new File(file, brokerName + "-kahastore");
                 setDirectory(file);
-            }         
+            }
             this.directory.mkdirs();
             wireFormat.setCacheEnabled(false);
             wireFormat.setTightEncodingEnabled(true);
         }
     }
 
-   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Aug  8 11:56:59 2007
@@ -26,39 +26,41 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
 
-public class KahaReferenceStore implements ReferenceStore{
+public class KahaReferenceStore implements ReferenceStore {
 
     protected final ActiveMQDestination destination;
-    protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
+    protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
     protected KahaReferenceStoreAdapter adapter;
-    private StoreEntry batchEntry=null;
-    private String lastBatchId=null;
+    private StoreEntry batchEntry = null;
+    private String lastBatchId = null;
 
-    public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{
+    public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer container,
+                              ActiveMQDestination destination) throws IOException {
         this.adapter = adapter;
-        this.messageContainer=container;
-        this.destination=destination;
+        this.messageContainer = container;
+        this.destination = destination;
     }
 
-    public void start(){
+    public void start() {
     }
 
-    public void stop(){
+    public void stop() {
     }
 
-    protected MessageId getMessageId(Object object){
+    protected MessageId getMessageId(Object object) {
         return new MessageId(((ReferenceRecord)object).getMessageId());
     }
 
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
+    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    public synchronized Message getMessage(MessageId identity) throws IOException{
+    public synchronized Message getMessage(MessageId identity) throws IOException {
         throw new RuntimeException("Use addMessageReference instead");
     }
 
-    protected final boolean recoverReference(MessageRecoveryListener listener,ReferenceRecord record) throws Exception{
+    protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record)
+        throws Exception {
         if (listener.hasSpace()) {
             listener.recoverMessageReference(new MessageId(record.getMessageId()));
             return true;
@@ -66,118 +68,121 @@
         return false;
     }
 
-    public synchronized void recover(MessageRecoveryListener listener) throws Exception{
-        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
-            ReferenceRecord record=messageContainer.getValue(entry);
-            if (!recoverReference(listener,record)) {
+    public synchronized void recover(MessageRecoveryListener listener) throws Exception {
+        for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
+            .getNext(entry)) {
+            ReferenceRecord record = messageContainer.getValue(entry);
+            if (!recoverReference(listener, record)) {
                 break;
             }
         }
     }
 
-    public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
-        StoreEntry entry=batchEntry;
-        if(entry==null){
-            entry=messageContainer.getFirst();
-        }else{
-            entry=messageContainer.refresh(entry);
-            if(entry!=null){
-                entry=messageContainer.getNext(entry);
+    public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
+        throws Exception {
+        StoreEntry entry = batchEntry;
+        if (entry == null) {
+            entry = messageContainer.getFirst();
+        } else {
+            entry = messageContainer.refresh(entry);
+            if (entry != null) {
+                entry = messageContainer.getNext(entry);
             }
         }
-        if(entry!=null){
-            int count=0;
-            do{
-                ReferenceRecord msg=messageContainer.getValue(entry);
-                if(msg!=null){
-                    recoverReference(listener,msg);
+        if (entry != null) {
+            int count = 0;
+            do {
+                ReferenceRecord msg = messageContainer.getValue(entry);
+                if (msg != null) {
+                    recoverReference(listener, msg);
                     count++;
-                    lastBatchId=msg.getMessageId();
-                }else{
-                    lastBatchId=null;
+                    lastBatchId = msg.getMessageId();
+                } else {
+                    lastBatchId = null;
                 }
-                batchEntry=entry;
-                entry=messageContainer.getNext(entry);
-            }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+                batchEntry = entry;
+                entry = messageContainer.getNext(entry);
+            } while (entry != null && count < maxReturned && listener.hasSpace());
         }
     }
 
-    public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
-            throws IOException{
-        ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
-        messageContainer.put(messageId,record);
+    public synchronized void addMessageReference(ConnectionContext context, MessageId messageId,
+                                                 ReferenceData data) throws IOException {
+        ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
+        messageContainer.put(messageId, record);
         addInterest(record);
     }
 
-    public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException{
-        ReferenceRecord result=messageContainer.get(identity);
-        if(result==null)
+    public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException {
+        ReferenceRecord result = messageContainer.get(identity);
+        if (result == null)
             return null;
         return result.getData();
     }
 
-    public void addReferenceFileIdsInUse(){
-        for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
-            ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
+    public void addReferenceFileIdsInUse() {
+        for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
+            .getNext(entry)) {
+            ReferenceRecord msg = (ReferenceRecord)messageContainer.getValue(entry);
             addInterest(msg);
         }
     }
 
-    public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
+    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
         removeMessage(ack.getLastMessageId());
     }
 
-    public synchronized void removeMessage(MessageId msgId) throws IOException{
-        StoreEntry entry=messageContainer.getEntry(msgId);
-        if(entry!=null){
-            ReferenceRecord rr=messageContainer.remove(msgId);
-            if(rr!=null){
+    public synchronized void removeMessage(MessageId msgId) throws IOException {
+        StoreEntry entry = messageContainer.getEntry(msgId);
+        if (entry != null) {
+            ReferenceRecord rr = messageContainer.remove(msgId);
+            if (rr != null) {
                 removeInterest(rr);
-                if(messageContainer.isEmpty()||(lastBatchId!=null&&lastBatchId.equals(msgId.toString()))
-                        ||(batchEntry!=null&&batchEntry.equals(entry))){
+                if (messageContainer.isEmpty()
+                    || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
+                    || (batchEntry != null && batchEntry.equals(entry))) {
                     resetBatching();
                 }
             }
         }
     }
 
-    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
         messageContainer.clear();
     }
 
-    public ActiveMQDestination getDestination(){
+    public ActiveMQDestination getDestination() {
         return destination;
     }
 
-    public synchronized void delete(){
+    public synchronized void delete() {
         messageContainer.clear();
     }
 
-    public synchronized void resetBatching(){
-        batchEntry=null;
-        lastBatchId=null;
+    public synchronized void resetBatching() {
+        batchEntry = null;
+        lastBatchId = null;
     }
 
-    public int getMessageCount(){
+    public int getMessageCount() {
         return messageContainer.size();
     }
 
-    public void setUsageManager(UsageManager usageManager){
+    public void setUsageManager(UsageManager usageManager) {
     }
 
-    public boolean isSupportForCursors(){
+    public boolean isSupportForCursors() {
         return true;
     }
 
-    
-    public boolean supportsExternalBatchControl(){
+    public boolean supportsExternalBatchControl() {
         return true;
     }
-    
+
     void removeInterest(ReferenceRecord rr) {
         adapter.removeInterestInRecordFile(rr.getData().getFileId());
     }
-    
+
     void addInterest(ReferenceRecord rr) {
         adapter.addInterestInRecordFile(rr.getData().getFileId());
     }
@@ -186,6 +191,6 @@
      * @param startAfter
      * @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
      */
-    public void setBatch(MessageId startAfter){        
+    public void setBatch(MessageId startAfter) {
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Aug  8 11:56:59 2007
@@ -48,137 +48,144 @@
 import org.apache.commons.logging.LogFactory;
 import org.codehaus.groovy.antlr.treewalker.PreOrderTraversal;
 
-public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter{
+public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
 
-    private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
-    private static final String STORE_STATE="store-state";
-    private static final String RECORD_REFERENCES="record-references";
-    private static final String TRANSACTIONS="transactions-state";
+    private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
+    private static final String STORE_STATE = "store-state";
+    private static final String RECORD_REFERENCES = "record-references";
+    private static final String TRANSACTIONS = "transactions-state";
     private MapContainer stateMap;
     private MapContainer preparedTransactions;
-    private Map<Integer,AtomicInteger> recordReferences=new HashMap<Integer,AtomicInteger>();
+    private Map<Integer, AtomicInteger> recordReferences = new HashMap<Integer, AtomicInteger>();
     private ListContainer durableSubscribers;
     private boolean storeValid;
     private Store stateStore;
 
-    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
+    public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         throw new RuntimeException("Use createQueueReferenceStore instead");
     }
 
-    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
+    public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
+        throws IOException {
         throw new RuntimeException("Use createTopicReferenceStore instead");
     }
 
-    @Override public synchronized void start() throws Exception{
+    @Override
+    public synchronized void start() throws Exception {
         super.start();
-        Store store=getStateStore();
-        boolean empty=store.getMapContainerIds().isEmpty();
-        stateMap=store.getMapContainer("state",STORE_STATE);
+        Store store = getStateStore();
+        boolean empty = store.getMapContainerIds().isEmpty();
+        stateMap = store.getMapContainer("state", STORE_STATE);
         stateMap.load();
-        if(!empty){
-            AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
-            if(status!=null){
-                storeValid=status.get();
+        if (!empty) {
+            AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
+            if (status != null) {
+                storeValid = status.get();
             }
-            if(storeValid){
-                if(stateMap.containsKey(RECORD_REFERENCES)){
-                    recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
+            if (storeValid) {
+                if (stateMap.containsKey(RECORD_REFERENCES)) {
+                    recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
                 }
             }
         }
-        stateMap.put(STORE_STATE,new AtomicBoolean());
-        durableSubscribers=store.getListContainer("durableSubscribers");
+        stateMap.put(STORE_STATE, new AtomicBoolean());
+        durableSubscribers = store.getListContainer("durableSubscribers");
         durableSubscribers.setMarshaller(new CommandMarshaller());
-        preparedTransactions=store.getMapContainer("transactions",TRANSACTIONS,false);
-        //need to set the Marshallers here
+        preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
+        // need to set the Marshallers here
         preparedTransactions.setKeyMarshaller(Store.CommandMarshaller);
         preparedTransactions.setValueMarshaller(new AMQTxMarshaller(wireFormat));
     }
 
-    @Override public synchronized void stop() throws Exception{
-        stateMap.put(RECORD_REFERENCES,recordReferences);
-        stateMap.put(STORE_STATE,new AtomicBoolean(true));
-        if(this.stateStore!=null){
+    @Override
+    public synchronized void stop() throws Exception {
+        stateMap.put(RECORD_REFERENCES, recordReferences);
+        stateMap.put(STORE_STATE, new AtomicBoolean(true));
+        if (this.stateStore != null) {
             this.stateStore.close();
-            this.stateStore=null;
-            this.stateMap=null;
+            this.stateStore = null;
+            this.stateMap = null;
         }
         super.stop();
     }
 
-    public boolean isStoreValid(){
+    public boolean isStoreValid() {
         return storeValid;
     }
 
-    public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException{
-        ReferenceStore rc=(ReferenceStore)queues.get(destination);
-        if(rc==null){
-            rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
-            messageStores.put(destination,rc);
-            //            if(transactionStore!=null){
-            //                rc=transactionStore.proxy(rc);
-            //            }
-            queues.put(destination,rc);
+    public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
+        ReferenceStore rc = (ReferenceStore)queues.get(destination);
+        if (rc == null) {
+            rc = new KahaReferenceStore(this, getMapReferenceContainer(destination, "queue-data"),
+                                        destination);
+            messageStores.put(destination, rc);
+            // if(transactionStore!=null){
+            // rc=transactionStore.proxy(rc);
+            // }
+            queues.put(destination, rc);
         }
         return rc;
     }
 
-    public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException{
-        TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
-        if(rc==null){
-            Store store=getStore();
-            MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
-            MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob");
-            ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
+    public TopicReferenceStore createTopicReferenceStore(ActiveMQTopic destination) throws IOException {
+        TopicReferenceStore rc = (TopicReferenceStore)topics.get(destination);
+        if (rc == null) {
+            Store store = getStore();
+            MapContainer messageContainer = getMapReferenceContainer(destination, "topic-data");
+            MapContainer subsContainer = getSubsMapContainer(destination.toString() + "-Subscriptions",
+                                                             "blob");
+            ListContainer ackContainer = store.getListContainer(destination.toString(), "topic-acks");
             ackContainer.setMarshaller(new TopicSubAckMarshaller());
-            rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
-            messageStores.put(destination,rc);
-            //            if(transactionStore!=null){
-            //                rc=transactionStore.proxy(rc);
-            //            }
-            topics.put(destination,rc);
+            rc = new KahaTopicReferenceStore(store, this, messageContainer, ackContainer, subsContainer,
+                                             destination);
+            messageStores.put(destination, rc);
+            // if(transactionStore!=null){
+            // rc=transactionStore.proxy(rc);
+            // }
+            topics.put(destination, rc);
         }
         return rc;
     }
 
-    public void buildReferenceFileIdsInUse() throws IOException{
-        recordReferences=new HashMap<Integer,AtomicInteger>();
-        Set<ActiveMQDestination> destinations=getDestinations();
-        for(ActiveMQDestination destination:destinations){
-            if(destination.isQueue()){
-                KahaReferenceStore store=(KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
+    public void buildReferenceFileIdsInUse() throws IOException {
+        recordReferences = new HashMap<Integer, AtomicInteger>();
+        Set<ActiveMQDestination> destinations = getDestinations();
+        for (ActiveMQDestination destination : destinations) {
+            if (destination.isQueue()) {
+                KahaReferenceStore store = (KahaReferenceStore)createQueueReferenceStore((ActiveMQQueue)destination);
                 store.addReferenceFileIdsInUse();
-            }else{
-                KahaTopicReferenceStore store=(KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
+            } else {
+                KahaTopicReferenceStore store = (KahaTopicReferenceStore)createTopicReferenceStore((ActiveMQTopic)destination);
                 store.addReferenceFileIdsInUse();
             }
         }
     }
 
-    protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName)
-            throws IOException{
-        Store store=getStore();
-        MapContainer<MessageId,ReferenceRecord> container=store.getMapContainer(id,containerName);
+    protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id,
+                                                                                String containerName)
+        throws IOException {
+        Store store = getStore();
+        MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName);
         container.setKeyMarshaller(new MessageIdMarshaller());
         container.setValueMarshaller(new ReferenceRecordMarshaller());
         container.load();
         return container;
     }
 
-    synchronized void addInterestInRecordFile(int recordNumber){
-        Integer key=Integer.valueOf(recordNumber);
-        AtomicInteger rr=recordReferences.get(key);
-        if(rr==null){
-            rr=new AtomicInteger();
-            recordReferences.put(key,rr);
+    synchronized void addInterestInRecordFile(int recordNumber) {
+        Integer key = Integer.valueOf(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr == null) {
+            rr = new AtomicInteger();
+            recordReferences.put(key, rr);
         }
         rr.incrementAndGet();
     }
 
-    synchronized void removeInterestInRecordFile(int recordNumber){
-        Integer key=Integer.valueOf(recordNumber);
-        AtomicInteger rr=recordReferences.get(key);
-        if(rr!=null&&rr.decrementAndGet()<=0){
+    synchronized void removeInterestInRecordFile(int recordNumber) {
+        Integer key = Integer.valueOf(recordNumber);
+        AtomicInteger rr = recordReferences.get(key);
+        if (rr != null && rr.decrementAndGet() <= 0) {
             recordReferences.remove(key);
         }
     }
@@ -188,99 +195,97 @@
      * @throws IOException
      * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
      */
-    public Set<Integer> getReferenceFileIdsInUse() throws IOException{
+    public Set<Integer> getReferenceFileIdsInUse() throws IOException {
         return recordReferences.keySet();
     }
 
     /**
      * 
-     * @throws IOException 
+     * @throws IOException
      * @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
      */
-    public void clearMessages() throws IOException{
+    public void clearMessages() throws IOException {
         deleteAllMessages();
     }
 
     /**
      * 
-     * @throws IOException 
+     * @throws IOException
      * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
      */
-    public void recoverState() throws IOException{
-        for(Iterator i=durableSubscribers.iterator();i.hasNext();){
-            SubscriptionInfo info=(SubscriptionInfo)i.next();
-            TopicReferenceStore ts=createTopicReferenceStore((ActiveMQTopic)info.getDestination());
-            ts.addSubsciption(info,false);
+    public void recoverState() throws IOException {
+        for (Iterator i = durableSubscribers.iterator(); i.hasNext();) {
+            SubscriptionInfo info = (SubscriptionInfo)i.next();
+            TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
+            ts.addSubsciption(info, false);
         }
     }
 
-    public Map<TransactionId,AMQTx> retrievePreparedState() throws IOException{
-        Map<TransactionId,AMQTx> result=new HashMap<TransactionId,AMQTx>();
+    public Map<TransactionId, AMQTx> retrievePreparedState() throws IOException {
+        Map<TransactionId, AMQTx> result = new HashMap<TransactionId, AMQTx>();
         preparedTransactions.load();
-        for(Iterator i=preparedTransactions.keySet().iterator();i.hasNext();){
-            TransactionId key=(TransactionId)i.next();
-            AMQTx value=(AMQTx)preparedTransactions.get(key);
-            result.put(key,value);
+        for (Iterator i = preparedTransactions.keySet().iterator(); i.hasNext();) {
+            TransactionId key = (TransactionId)i.next();
+            AMQTx value = (AMQTx)preparedTransactions.get(key);
+            result.put(key, value);
         }
         return result;
     }
 
-    public void savePreparedState(Map<TransactionId,AMQTx> map) throws IOException{
+    public void savePreparedState(Map<TransactionId, AMQTx> map) throws IOException {
         preparedTransactions.clear();
-        for(Iterator<Map.Entry<TransactionId,AMQTx>> iter=map.entrySet().iterator();iter.hasNext();){
-            Map.Entry<TransactionId,AMQTx> entry=iter.next();
-            preparedTransactions.put(entry.getKey(),entry.getValue());
+        for (Iterator<Map.Entry<TransactionId, AMQTx>> iter = map.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry<TransactionId, AMQTx> entry = iter.next();
+            preparedTransactions.put(entry.getKey(), entry.getValue());
         }
     }
 
-    @Override public synchronized void setDirectory(File directory){
-        File file=new File(directory,"data");
+    @Override
+    public synchronized void setDirectory(File directory) {
+        File file = new File(directory, "data");
         super.setDirectory(file);
-        this.stateStore=createStateStore(directory);
+        this.stateStore = createStateStore(directory);
     }
 
-    protected synchronized Store getStateStore() throws IOException{
-        if(this.stateStore==null){
-            File stateDirectory=new File(getDirectory(),"kr-state");
+    protected synchronized Store getStateStore() throws IOException {
+        if (this.stateStore == null) {
+            File stateDirectory = new File(getDirectory(), "kr-state");
             stateDirectory.mkdirs();
-            this.stateStore=createStateStore(getDirectory());
+            this.stateStore = createStateStore(getDirectory());
         }
         return this.stateStore;
     }
 
-    public void deleteAllMessages() throws IOException{
-    	super.deleteAllMessages();
-        if(stateStore!=null){
-            if(stateStore.isInitialized()){
-            	stateStore.clear();
-            }else{
-            	stateStore.delete();
+    public void deleteAllMessages() throws IOException {
+        super.deleteAllMessages();
+        if (stateStore != null) {
+            if (stateStore.isInitialized()) {
+                stateStore.clear();
+            } else {
+                stateStore.delete();
             }
-        }else{
-            File stateDirectory=new File(getDirectory(),"kr-state");
+        } else {
+            File stateDirectory = new File(getDirectory(), "kr-state");
             StoreFactory.delete(stateDirectory.getAbsolutePath());
         }
     }
 
-    private Store createStateStore(File directory){
-        File stateDirectory=new File(directory,"state");
+    private Store createStateStore(File directory) {
+        File stateDirectory = new File(directory, "state");
         stateDirectory.mkdirs();
-        try{
-            return StoreFactory.open(stateDirectory.getAbsolutePath(),"rw");
-        }catch(IOException e){
-            log.error("Failed to create the state store",e);
+        try {
+            return StoreFactory.open(stateDirectory.getAbsolutePath(), "rw");
+        } catch (IOException e) {
+            log.error("Failed to create the state store", e);
         }
         return null;
     }
 
-    protected void addSubscriberState(SubscriptionInfo info) throws IOException{
+    protected void addSubscriberState(SubscriptionInfo info) throws IOException {
         durableSubscribers.add(info);
     }
 
-    protected void removeSubscriberState(SubscriptionInfo info){
+    protected void removeSubscriberState(SubscriptionInfo info) {
         durableSubscribers.remove(info);
     }
 }
-    
-	
-

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Wed Aug  8 11:56:59 2007
@@ -34,39 +34,40 @@
 /**
  * @version $Revision: 1.5 $
  */
-public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
+public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
 
     protected ListContainer<TopicSubAck> ackContainer;
     private Map subscriberContainer;
     private Store store;
-    protected Map subscriberMessages=new ConcurrentHashMap();
+    protected Map subscriberMessages = new ConcurrentHashMap();
 
-    public KahaTopicMessageStore(Store store,MapContainer messageContainer,ListContainer<TopicSubAck> ackContainer,
-            MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
-        super(messageContainer,destination);
-        this.store=store;
-        this.ackContainer=ackContainer;
-        subscriberContainer=subsContainer;
+    public KahaTopicMessageStore(Store store, MapContainer messageContainer,
+                                 ListContainer<TopicSubAck> ackContainer, MapContainer subsContainer,
+                                 ActiveMQDestination destination) throws IOException {
+        super(messageContainer, destination);
+        this.store = store;
+        this.ackContainer = ackContainer;
+        subscriberContainer = subsContainer;
         // load all the Ack containers
-        for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
-            Object key=i.next();
+        for (Iterator i = subscriberContainer.keySet().iterator(); i.hasNext();) {
+            Object key = i.next();
             addSubscriberMessageContainer(key);
         }
     }
 
     @Override
-    public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
-        int subscriberCount=subscriberMessages.size();
-        if(subscriberCount>0){
+    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
+        int subscriberCount = subscriberMessages.size();
+        if (subscriberCount > 0) {
             MessageId id = message.getMessageId();
-            StoreEntry messageEntry=messageContainer.place(id,message);
-            TopicSubAck tsa=new TopicSubAck();
+            StoreEntry messageEntry = messageContainer.place(id, message);
+            TopicSubAck tsa = new TopicSubAck();
             tsa.setCount(subscriberCount);
             tsa.setMessageEntry(messageEntry);
-            StoreEntry ackEntry=ackContainer.placeLast(tsa);
-            for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
-                TopicSubContainer container=(TopicSubContainer)i.next();
-                ConsumerMessageRef ref=new ConsumerMessageRef();
+            StoreEntry ackEntry = ackContainer.placeLast(tsa);
+            for (Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
+                TopicSubContainer container = (TopicSubContainer)i.next();
+                ConsumerMessageRef ref = new ConsumerMessageRef();
                 ref.setAckEntry(ackEntry);
                 ref.setMessageEntry(messageEntry);
                 ref.setMessageId(id);
@@ -75,75 +76,71 @@
         }
     }
 
-    public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
-            MessageId messageId) throws IOException{
-        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
-        if(container!=null){
-            ConsumerMessageRef ref=container.remove(messageId);
-            if(container.isEmpty()){
+    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
+                                         MessageId messageId) throws IOException {
+        String subcriberId = getSubscriptionKey(clientId, subscriptionName);
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(subcriberId);
+        if (container != null) {
+            ConsumerMessageRef ref = container.remove(messageId);
+            if (container.isEmpty()) {
                 container.reset();
             }
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
+            if (ref != null) {
+                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if (tsa != null) {
+                    if (tsa.decrementCount() <= 0) {
                         StoreEntry entry = ref.getAckEntry();
                         entry = ackContainer.refresh(entry);
                         ackContainer.remove(entry);
                         entry = tsa.getMessageEntry();
-                        entry =messageContainer.refresh(entry);
+                        entry = messageContainer.refresh(entry);
                         messageContainer.remove(entry);
-                    }else{
-                        ackContainer.update(ref.getAckEntry(),tsa);
+                    } else {
+                        ackContainer.update(ref.getAckEntry(), tsa);
                     }
                 }
             }
         }
     }
 
-    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
-        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+        return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
     }
 
-    public synchronized void addSubsciption(SubscriptionInfo info,boolean retroactive)
-            throws IOException{
-        String key=getSubscriptionKey(info.getClientId(),info.getSubscriptionName());
+    public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
+        String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
         // if already exists - won't add it again as it causes data files
         // to hang around
-        if(!subscriberContainer.containsKey(key)){
-            subscriberContainer.put(key,info);
+        if (!subscriberContainer.containsKey(key)) {
+            subscriberContainer.put(key, info);
         }
         // add the subscriber
-        ListContainer container=addSubscriberMessageContainer(key);
+        ListContainer container = addSubscriberMessageContainer(key);
         /*
-        if(retroactive){
-            for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
-                ConsumerMessageRef ref=new ConsumerMessageRef();
-                ref.setAckEntry(entry);
-                ref.setMessageEntry(tsa.getMessageEntry());
-                container.add(ref);
-            }
-        }
-        */
+         * if(retroactive){ for(StoreEntry
+         * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
+         * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
+         * ConsumerMessageRef ref=new ConsumerMessageRef();
+         * ref.setAckEntry(entry); ref.setMessageEntry(tsa.getMessageEntry());
+         * container.add(ref); } }
+         */
     }
 
-    public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriptionName);
+    public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+        String key = getSubscriptionKey(clientId, subscriptionName);
         removeSubscriberMessageContainer(key);
     }
 
-    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-            throws Exception{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            for(Iterator i=container.iterator();i.hasNext();){
-                ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-                Message msg=messageContainer.get(ref.getMessageEntry());
-                if(msg!=null){
-                    if(!recoverMessage(listener,msg)){
+    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
+        throws Exception {
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        if (container != null) {
+            for (Iterator i = container.iterator(); i.hasNext();) {
+                ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
+                Message msg = messageContainer.get(ref.getMessageEntry());
+                if (msg != null) {
+                    if (!recoverMessage(listener, msg)) {
                         break;
                     }
                 }
@@ -151,88 +148,89 @@
         }
     }
 
-	public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
-            MessageRecoveryListener listener) throws Exception{
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        if(container!=null){
-            int count=0;
-            StoreEntry entry=container.getBatchEntry();
-            if(entry==null){
-                entry=container.getEntry();
-            }else{
-                entry=container.refreshEntry(entry);
-                if(entry!=null){
-                    entry=container.getNextEntry(entry);
+    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
+                                    MessageRecoveryListener listener) throws Exception {
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        if (container != null) {
+            int count = 0;
+            StoreEntry entry = container.getBatchEntry();
+            if (entry == null) {
+                entry = container.getEntry();
+            } else {
+                entry = container.refreshEntry(entry);
+                if (entry != null) {
+                    entry = container.getNextEntry(entry);
                 }
             }
-            if(entry!=null){
-                do{
-                    ConsumerMessageRef consumerRef=container.get(entry);
-                    Message msg=messageContainer.getValue(consumerRef.getMessageEntry());
-                    if(msg!=null){
-                    	recoverMessage(listener, msg);
+            if (entry != null) {
+                do {
+                    ConsumerMessageRef consumerRef = container.get(entry);
+                    Message msg = messageContainer.getValue(consumerRef.getMessageEntry());
+                    if (msg != null) {
+                        recoverMessage(listener, msg);
                         count++;
-                        container.setBatchEntry(msg.getMessageId().toString(),entry);
-                    }else {
+                        container.setBatchEntry(msg.getMessageId().toString(), entry);
+                    } else {
                         container.reset();
                     }
-                    
-                    entry=container.getNextEntry(entry);
-                }while(entry!=null&&count<maxReturned&&listener.hasSpace());
+
+                    entry = container.getNextEntry(entry);
+                } while (entry != null && count < maxReturned && listener.hasSpace());
             }
         }
     }
 
-    public void delete(){
+    public void delete() {
         super.delete();
         ackContainer.clear();
         subscriberContainer.clear();
     }
 
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
-        return (SubscriptionInfo[])subscriberContainer.values().toArray(new SubscriptionInfo[subscriberContainer.size()]);
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        return (SubscriptionInfo[])subscriberContainer.values()
+            .toArray(new SubscriptionInfo[subscriberContainer.size()]);
     }
 
-    protected String getSubscriptionKey(String clientId,String subscriberName){
-        String result=clientId+":";
-        result+=subscriberName!=null?subscriberName:"NOT_SET";
+    protected String getSubscriptionKey(String clientId, String subscriberName) {
+        String result = clientId + ":";
+        result += subscriberName != null ? subscriberName : "NOT_SET";
         return result;
     }
 
-    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
-        ListContainer container=store.getListContainer(key,"topic-subs");
-        Marshaller marshaller=new ConsumerMessageRefMarshaller();
+    protected ListContainer addSubscriberMessageContainer(Object key) throws IOException {
+        ListContainer container = store.getListContainer(key, "topic-subs");
+        Marshaller marshaller = new ConsumerMessageRefMarshaller();
         container.setMarshaller(marshaller);
-        TopicSubContainer tsc=new TopicSubContainer(container);
-        subscriberMessages.put(key,tsc);
+        TopicSubContainer tsc = new TopicSubContainer(container);
+        subscriberMessages.put(key, tsc);
         return container;
     }
 
-    protected void removeSubscriberMessageContainer(Object key) throws IOException{
+    protected void removeSubscriberMessageContainer(Object key) throws IOException {
         subscriberContainer.remove(key);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
-        for(Iterator i=container.iterator();i.hasNext();){
-            ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
-            if(ref!=null){
-                TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
-                if(tsa!=null){
-                    if(tsa.decrementCount()<=0){
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.remove(key);
+        for (Iterator i = container.iterator(); i.hasNext();) {
+            ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
+            if (ref != null) {
+                TopicSubAck tsa = (TopicSubAck)ackContainer.get(ref.getAckEntry());
+                if (tsa != null) {
+                    if (tsa.decrementCount() <= 0) {
                         ackContainer.remove(ref.getAckEntry());
                         messageContainer.remove(tsa.getMessageEntry());
-                    }else{
-                        ackContainer.update(ref.getAckEntry(),tsa);
+                    } else {
+                        ackContainer.update(ref.getAckEntry(), tsa);
                     }
                 }
             }
         }
-        store.deleteListContainer(key,"topic-subs");
+        store.deleteListContainer(key, "topic-subs");
     }
 
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
-        String key=getSubscriptionKey(clientId,subscriberName);
-        TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
-        return  container != null ? container.size() : 0;
+    public int getMessageCount(String clientId, String subscriberName) throws IOException {
+        String key = getSubscriptionKey(clientId, subscriberName);
+        TopicSubContainer container = (TopicSubContainer)subscriberMessages.get(key);
+        return container != null ? container.size() : 0;
     }
 
     /**
@@ -240,19 +238,19 @@
      * @throws IOException
      * @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
      */
-    public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
+    public synchronized void removeAllMessages(ConnectionContext context) throws IOException {
         messageContainer.clear();
         ackContainer.clear();
-        for(Iterator i=subscriberMessages.values().iterator();i.hasNext();){
-            TopicSubContainer container=(TopicSubContainer)i.next();
+        for (Iterator i = subscriberMessages.values().iterator(); i.hasNext();) {
+            TopicSubContainer container = (TopicSubContainer)i.next();
             container.clear();
         }
     }
 
-    public synchronized void resetBatching(String clientId,String subscriptionName){
-        String key=getSubscriptionKey(clientId,subscriptionName);
-        TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
-        if(topicSubContainer!=null){
+    public synchronized void resetBatching(String clientId, String subscriptionName) {
+        String key = getSubscriptionKey(clientId, subscriptionName);
+        TopicSubContainer topicSubContainer = (TopicSubContainer)subscriberMessages.get(key);
+        if (topicSubContainer != null) {
             topicSubContainer.reset();
         }
     }



Mime
View raw message