activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r589314 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/impl/async/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/perf/
Date Sun, 28 Oct 2007 09:39:14 GMT
Author: rajdavies
Date: Sun Oct 28 02:39:12 2007
New Revision: 589314

URL: http://svn.apache.org/viewvc?rev=589314&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1479

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUSet.java   (with
props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=589314&r1=589313&r2=589314&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
Sun Oct 28 02:39:12 2007
@@ -352,7 +352,6 @@
     synchronized void addInterestInFile(DataFile dataFile) {
         if (dataFile != null) {
             dataFile.increment();
-            System.err.println("ADD INTEREST: " + dataFile);
         }
     }
 
@@ -370,24 +369,14 @@
             if (dataFile.decrement() <= 0) {
                 removeDataFile(dataFile);
             }
-            System.err.println("REMOVE INTEREST: " + dataFile);
         }
     }
 
-    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer
lastDataFile) throws IOException {
+    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress)
throws IOException {
         Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
         unUsed.removeAll(inUse);
-        
-        // Don't purge any data files past lastDataFile
-        if( lastDataFile!=null ) {
-            for (Iterator<Integer> iterator = unUsed.iterator(); iterator.hasNext();)
{
-                Integer i = iterator.next();
-                if( i >= lastDataFile ) {
-                    iterator.remove();
-                }
-            }
-        }
-        
+        unUsed.removeAll(inProgress);
+                
         List<DataFile> purgeList = new ArrayList<DataFile>();
         for (Integer key : unUsed) {
             DataFile dataFile = (DataFile)fileMap.get(key);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=589314&r1=589313&r2=589314&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Sun Oct 28 02:39:12 2007
@@ -151,6 +151,7 @@
         synchronized (this) {
             lastLocation = location;
             messages.put(message.getMessageId(), data);
+            this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
         }
         try {
             asyncWriteTask.wakeup();
@@ -338,6 +339,7 @@
                     Entry<MessageId, ReferenceData> entry = iterator.next();
                     try {
                         referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
+                        AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this,entry.getValue().getFileId());
                     } catch (Throwable e) {
                         LOG.warn("Message could not be added to long term store: " + e.getMessage(),
e);
                     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=589314&r1=589313&r2=589314&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Sun Oct 28 02:39:12 2007
@@ -21,12 +21,15 @@
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activeio.journal.Journal;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
@@ -55,9 +58,9 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -65,6 +68,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+
 /**
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -102,6 +106,7 @@
     private boolean persistentIndex=true;
     private boolean useNio = true;
     private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
+    private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>>
();
 
 
     public String getBrokerName() {
@@ -353,9 +358,14 @@
         try {
             // Capture the lastDataFile so that we don't delete any data files
             // after this one.
-            Integer lastDataFile = asyncDataManager.getCurrentDataFileId();            
+            Set<Integer>inProgress = new CopyOnWriteArraySet<Integer>();
+            for (Set<Integer> set: dataFilesInProgress.values()) {
+                inProgress.addAll(set);
+            }
+            Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
+            inProgress.add(lastDataFile);
             Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
-            asyncDataManager.consolidateDataFilesNotIn(inUse, lastDataFile);
+            asyncDataManager.consolidateDataFilesNotIn(inUse, inProgress);
         } catch (IOException e) {
             LOG.error("Could not cleanup data files: " + e, e);
         }
@@ -730,4 +740,20 @@
 	public void setMaxFileLength(int maxFileLength) {
 		this.maxFileLength = maxFileLength;
 	}
+	
+	protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
+	    Set<Integer>set = dataFilesInProgress.get(store);
+	    if (set == null) {
+	        set = new CopyOnWriteArraySet<Integer>();
+	        dataFilesInProgress.put(store, set);
+	    }
+	    set.add(dataFileId);
+	}
+	
+	protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
+        Set<Integer>set = dataFilesInProgress.get(store);
+        if (set != null) {
+            set.remove(dataFileId);
+        }
+    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUSet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUSet.java?rev=589314&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUSet.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUSet.java Sun Oct
28 02:39:12 2007
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * A Simple LRU Set
+ * 
+ * @version $Revision$
+ * @param <K>
+ * @param <V>
+ */
+
+public class LRUSet<E>
+extends AbstractSet<E>
+implements Set<E>, Cloneable, java.io.Serializable{
+   
+    private static final Object IGNORE = new Object();
+   
+    private final LRUCache cache;
+
+    /**
+     * Default constructor for an LRU Cache The default capacity is 10000
+     */
+    public LRUSet() {
+        this(0,10000, 0.75f, true);
+    }
+
+    /**
+     * Constructs a LRUCache with a maximum capacity
+     * 
+     * @param maximumCacheSize
+     */
+    public LRUSet(int maximumCacheSize) {
+        this(0, maximumCacheSize, 0.75f, true);
+    }
+
+    /**
+     * Constructs an empty <tt>LRUCache</tt> instance with the specified
+     * initial capacity, maximumCacheSize,load factor and ordering mode.
+     * 
+     * @param initialCapacity
+     *            the initial capacity.
+     * @param maximumCacheSize
+     * @param loadFactor
+     *            the load factor.
+     * @param accessOrder
+     *            the ordering mode - <tt>true</tt> for access-order,
+     *            <tt>false</tt> for insertion-order.
+     * @throws IllegalArgumentException
+     *             if the initial capacity is negative or the load factor is
+     *             non-positive.
+     */
+
+    public LRUSet(int initialCapacity, int maximumCacheSize, float loadFactor, boolean accessOrder)
{
+        this.cache = new LRUCache<E,Object>(initialCapacity,maximumCacheSize,loadFactor,accessOrder);
+    }
+
+   
+    public Iterator<E> iterator() {
+    return cache.keySet().iterator();
+    }
+
+   
+    public int size() {
+    return cache.size();
+    }
+
+   
+    public boolean isEmpty() {
+    return cache.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+    return cache.containsKey(o);
+    }
+
+   
+    public boolean add(E o) {
+    return cache.put(o, IGNORE)==null;
+    }
+
+    public boolean remove(Object o) {
+    return cache.remove(o)==IGNORE;
+    }
+
+    
+    public void clear() {
+    cache.clear();
+    }
+
+    
+
+    
+      
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUSet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LRUSet.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java?rev=589314&r1=589313&r2=589314&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreDurableTopicTest.java
Sun Oct 28 02:39:12 2007
@@ -27,10 +27,10 @@
 
     protected void configureBroker(BrokerService answer) throws Exception {
         File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
+        answer.setDeleteAllMessagesOnStartup(true);
         AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
         adaptor.setDirectory(dataFileDir);
         answer.setPersistenceAdapter(adaptor);
-        answer.setDeleteAllMessagesOnStartup(true);
         answer.addConnector(bindAddress);
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreQueueTest.java?rev=589314&r1=589313&r2=589314&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/AMQStoreQueueTest.java
Sun Oct 28 02:39:12 2007
@@ -35,7 +35,7 @@
 
         answer.setPersistenceAdapter(adaptor);
         answer.addConnector(bindAddress);
-        // answer.setDeleteAllMessagesOnStartup(true);
+        answer.setDeleteAllMessagesOnStartup(true);
 
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java?rev=589314&r1=589313&r2=589314&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
Sun Oct 28 02:39:12 2007
@@ -37,14 +37,14 @@
     protected BrokerService broker;
     // protected String
     // bindAddress="tcp://localhost:61616?wireFormat.cacheEnabled=true&wireFormat.tightEncodingEnabled=true&jms.useAsyncSend=false";
-    //protected String bindAddress="tcp://localhost:61616";
+    protected String bindAddress="tcp://localhost:61616";
     //protected String bindAddress = "tcp://localhost:61616";
     //protected String bindAddress="vm://localhost?marshal=true";
-    protected String bindAddress="vm://localhost";
+    //protected String bindAddress="vm://localhost";
     protected PerfProducer[] producers;
     protected PerfConsumer[] consumers;
     protected String destinationName = getClass().getName();
-    protected int samepleCount = 10;
+    protected int samepleCount = 20;
     protected long sampleInternal = 10000;
     protected int numberOfConsumers = 1;
     protected int numberofProducers = 2;



Mime
View raw message