activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r691621 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/impl/async/ main/java/org/apache/activemq/store/amq/ test/java/org/apache/activemq/bugs/
Date Wed, 03 Sep 2008 14:14:38 GMT
Author: gtully
Date: Wed Sep  3 07:14:38 2008
New Revision: 691621

URL: http://svn.apache.org/viewvc?rev=691621&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1926 with test case

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
Wed Sep  3 07:14:38 2008
@@ -434,6 +434,9 @@
                 purgeList.add(dataFile);
         	}
         }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ")
" + purgeList);
+        }
         for (DataFile dataFile : purgeList) {
             forceRemoveDataFile(dataFile);
         }
@@ -465,17 +468,17 @@
             throws IOException {
         accessorPool.disposeDataFileAccessors(dataFile);
         fileByFileMap.remove(dataFile.getFile());
-        DataFile removed = fileMap.remove(dataFile.getDataFileId());
         storeSize.addAndGet(-dataFile.getLength());
         dataFile.unlink();
         if (archiveDataLogs) {
             dataFile.move(getDirectoryArchive());
-            LOG.info("moved data file " + dataFile + " to "
+            LOG.debug("moved data file " + dataFile + " to "
                     + getDirectoryArchive());
         } else {
             boolean result = dataFile.delete();
-            LOG.info("discarding data file " + dataFile
-                    + (result ? "successful " : "failed"));
+            if (!result) {
+                LOG.info("Failed to discard data file " + dataFile);
+            }
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
Wed Sep  3 07:14:38 2008
@@ -261,6 +261,9 @@
             data = messages.remove(id);
             if (data == null) {
                 messageAcks.add(ack);
+            } else {
+                // message never got written so datafileReference will still exist
+                AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this,
data.getFileId());
             }
         }finally {
             lock.unlock();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
Wed Sep  3 07:14:38 2008
@@ -26,9 +26,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activeio.journal.Journal;
@@ -122,7 +122,7 @@
     private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
     private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
     private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
-    private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>>
();
+    private Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress
= new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
     private String directoryPath = "";
     private RandomAccessFile lockFile;
     private FileLock lock;
@@ -271,14 +271,14 @@
                 checkpoint(false);
             }
         };
-        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
+        Scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
         periodicCleanupTask = new Runnable() {
 
             public void run() {
                 cleanup();
             }
         };
-        Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
+        Scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
         
         if (lockAquired && lockLogged) {
             LOG.info("Aquired lock for AMQ Store" + getDirectory());
@@ -426,8 +426,11 @@
     public void cleanup() {
         try {
             Set<Integer>inProgress = new HashSet<Integer>();
-            for (Set<Integer> set: dataFilesInProgress.values()) {
-                inProgress.addAll(set);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size()
+ ") " + dataFilesInProgress.values());
+            }      
+            for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
+                inProgress.addAll(set.keySet());
             }
             Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
             inProgress.add(lastDataFile);
@@ -437,6 +440,7 @@
             if (lastActiveTx != null) {
                 lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
             }
+            LOG.debug("lastDataFile: " + lastDataFile);
             asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
         } catch (IOException e) {
             LOG.error("Could not cleanup data files: " + e, e);
@@ -967,18 +971,32 @@
 
 	
 	protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
-	    Set<Integer>set = dataFilesInProgress.get(store);
-	    if (set == null) {
-	        set = new CopyOnWriteArraySet<Integer>();
-	        dataFilesInProgress.put(store, set);
+	    Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
+	    if (map == null) {
+	        map = new ConcurrentHashMap<Integer, AtomicInteger>();
+	        dataFilesInProgress.put(store, map);
+	    }
+	    AtomicInteger count = map.get(dataFileId);
+	    if (count == null) {
+	        count = new AtomicInteger(0);
+	        map.put(dataFileId, count);
 	    }
-	    set.add(dataFileId);
+	    count.incrementAndGet();
 	}
 	
 	protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
-        Set<Integer>set = dataFilesInProgress.get(store);
-        if (set != null) {
-            set.remove(dataFileId);
+        Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
+        if (map != null) {
+            AtomicInteger count = map.get(dataFileId);
+            if (count != null) {
+                int newCount = count.decrementAndGet(); 
+                if (newCount <=0) {
+                    map.remove(dataFileId);
+                }
+            }
+            if (map.isEmpty()) {
+                dataFilesInProgress.remove(store);
+            }
         }
     }
 	

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java?rev=691621&r1=691620&r2=691621&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapterFactory.java
Wed Sep  3 07:14:38 2008
@@ -72,6 +72,7 @@
         result.setUseNio(isUseNio());
         result.setMaxFileLength(getMaxFileLength());
         result.setCleanupInterval(getCleanupInterval());
+        result.setCheckpointInterval(getCheckpointInterval());
         result.setIndexBinSize(getIndexBinSize());
         result.setIndexKeySize(getIndexKeySize());
         result.setIndexPageSize(getIndexPageSize());

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java?rev=691621&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
Wed Sep  3 07:14:38 2008
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/*
+ * see https://issues.apache.org/activemq/browse/AMQ-1926
+ */
+public class DataFileNotDeletedTest extends TestCase {
+
+    private static final Log LOG = LogFactory.getLog(DataFileNotDeletedTest.class);
+
+    private final CountDownLatch latch = new CountDownLatch(max_messages);
+    private static int max_messages = 600;
+    private static int messageCounter;
+    private String destinationName = getName()+"_Queue";
+    private BrokerService broker;
+    private Connection receiverConnection;
+    private Connection producerConnection;
+    final boolean useTopic = false;
+    
+    AMQPersistenceAdapter persistentAdapter;
+    protected static final String payload = new String(new byte[512]);
+
+    public void setUp() throws Exception {
+        messageCounter = 0;
+        startBroker();
+        receiverConnection = createConnection();
+        receiverConnection.start();
+        producerConnection = createConnection();
+        producerConnection.start();
+    }
+    
+    public void tearDown() throws Exception {
+        receiverConnection.close();
+        producerConnection.close();
+        broker.stop();
+    }
+
+    public void testForDataFileNotDeleted() throws Exception {
+        doTestForDataFileNotDeleted(false);
+    }
+    
+    public void testForDataFileNotDeletedTransacted() throws Exception {
+        doTestForDataFileNotDeleted(true);
+    }
+    
+    private void doTestForDataFileNotDeleted(boolean transacted) throws Exception {
+        
+        Receiver receiver = new Receiver() {
+            public void receive(String s) throws Exception {
+                messageCounter++; 
+                latch.countDown();
+            }
+        };
+        buildReceiver(receiverConnection, destinationName, transacted, receiver, useTopic);
+
+        final MessageSender producer = new MessageSender(destinationName, producerConnection,
transacted, useTopic);
+        for (int i=0; i< max_messages; i++) {
+            producer.send(payload );
+        }
+        latch.await();
+        assertEquals(max_messages, messageCounter);
+        waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 30000, 2);

+    }
+
+    private void waitFordataFilesToBeCleanedUp(
+            AsyncDataManager asyncDataManager, int timeout, int numExpected) throws InterruptedException
{
+        long expiry = System.currentTimeMillis()  + timeout;
+        while(expiry > System.currentTimeMillis()) {
+            if (asyncDataManager.getFiles().size() <= numExpected) {
+                break;
+            } else {
+                Thread.sleep(1000);
+            }
+        }
+        assertEquals("persistence adapter dataManager has correct number of files", 2, asyncDataManager.getFiles().size());
+    }
+
+    private Connection createConnection() throws JMSException {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+        return factory.createConnection();
+    }
+
+    private void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:61616").setName("Default");
+           
+        AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
+        // ensure there are a bunch of data files but multiple entries in each
+        factory.setMaxFileLength(1024 * 20);
+        // speed up the test case, checkpoint an cleanup early and often
+        factory.setCheckpointInterval(500);
+        factory.setCleanupInterval(500);
+        factory.setSyncOnWrite(false);
+        
+        persistentAdapter = (AMQPersistenceAdapter) broker.getPersistenceAdapter();
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    private void buildReceiver(Connection connection, final String queueName, boolean transacted,
final Receiver receiver, boolean isTopic) throws Exception {
+        final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED)
: connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? session.createTopic(queueName)
: session.createQueue(queueName));
+        MessageListener messageListener = new MessageListener() {
+
+            public void onMessage(Message message) {
+                try {
+                    ObjectMessage objectMessage = (ObjectMessage)message;
+                    String s = (String)objectMessage.getObject();
+                    receiver.receive(s);
+                    if (session.getTransacted()) {
+                        session.commit();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        inputMessageConsumer.setMessageListener(messageListener);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message