activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/5] activemq-6 git commit: Fixing when the storage is stopped for the storage manager
Date Fri, 27 Feb 2015 10:42:38 GMT
Repository: activemq-6
Updated Branches:
  refs/heads/master ba1e685b6 -> a895bf334


Fixing when the storage is stopped for the storage manager

In some cases the ID Generator will be called after the JournalStorage was stopped.
AS a result you could have cases where the ID generator is called and the journal storage
is stopped.

Also I added some check to IDs and added some code to cleanup old IDS on the BatchIDManager


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/f896a394
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/f896a394
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/f896a394

Branch: refs/heads/master
Commit: f896a394e92dbfdd7859f930468494ef323c0f92
Parents: ba1e685
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Feb 26 15:12:36 2015 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Feb 26 22:52:27 2015 -0500

----------------------------------------------------------------------
 .../core/persistence/StorageManager.java        |   9 +-
 .../impl/journal/BatchingIDGenerator.java       |  65 +++++++++++-
 .../impl/journal/JournalStorageManager.java     |  18 ++++
 .../impl/nullpm/NullStorageManager.java         |   6 ++
 .../tests/logging/AssertionLoggerHandler.java   |  23 +++--
 .../activemq/tests/util/ServiceTestBase.java    |  55 +++++++---
 .../integration/client/HangConsumerTest.java    |  68 +++++++-----
 .../integration/server/SimpleStartStopTest.java | 103 +++++++++++++++++++
 .../impl/BatchIDGeneratorUnitTest.java          |  16 ++-
 9 files changed, 309 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java
b/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java
index c074ce2..e07fe06 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java
@@ -380,7 +380,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent
    void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws
Exception;
 
    /**
-    * Stores the given journalID in the bindingsJournal.
+    * Stores the id from IDManager.
     *
     * @param journalID
     * @param id
@@ -388,6 +388,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent
     */
    void storeID(long journalID, long id) throws Exception;
 
+
+   /*
+       Deletes the ID from IDManager.
+    */
+   void deleteID(long journalD) throws Exception;
+
+
    /**
     * Read lock the StorageManager. USE WITH CARE!
     * <p/>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java
b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java
index c2a9040..9c8bc21 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.core.persistence.impl.journal;
 
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.api.core.ActiveMQBuffer;
@@ -42,6 +45,8 @@ public final class BatchingIDGenerator implements IDGenerator
 
    private final StorageManager storageManager;
 
+   private List<Long> cleanupRecords = null;
+
    public BatchingIDGenerator(final long start, final long checkpointSize, final StorageManager
storageManager)
    {
       counter = new AtomicLong(start);
@@ -60,8 +65,31 @@ public final class BatchingIDGenerator implements IDGenerator
       storeID(recordID, recordID);
    }
 
+   /**
+    * A method to cleanup old records after started
+    */
+   public void cleanup()
+   {
+      if (cleanupRecords != null)
+      {
+         Iterator<Long> iterRecord = cleanupRecords.iterator();
+         while (iterRecord.hasNext())
+         {
+            Long record = iterRecord.next();
+            if (iterRecord.hasNext())
+            {
+               // we don't want to remove the last record
+               deleteID(record.longValue());
+            }
+         }
+         cleanupRecords.clear(); // help GC
+         cleanupRecords = null;
+      }
+   }
+
    public void loadState(final long journalID, final ActiveMQBuffer buffer)
    {
+      addCleanupRecord(journalID);
       IDCounterEncoding encoding = new IDCounterEncoding();
 
       encoding.decode(buffer);
@@ -93,10 +121,33 @@ public final class BatchingIDGenerator implements IDGenerator
       if (id >= nextID)
       {
          nextID += checkpointSize;
-         storeID(counter.incrementAndGet(), nextID);
+
+         if (!storageManager.isStarted())
+         {
+            // This could happen after the server is stopped
+            // while notifications are being sent and ID gerated.
+            // If the ID is intended to the journal you would know soon enough
+            // so we just ignore this for now
+            ActiveMQServerLogger.LOGGER.debug("The journalStorageManager is not loaded. "
+
+                                                 "This is probably ok as long as it's a notification
being sent after shutdown");
+         }
+         else
+         {
+            storeID(counter.getAndIncrement(), nextID);
+         }
       }
    }
 
+   private void addCleanupRecord(long id)
+   {
+      if (cleanupRecords == null)
+      {
+         cleanupRecords = new LinkedList<>();
+      }
+
+      cleanupRecords.add(id);
+   }
+
    private void storeID(final long journalID, final long id)
    {
       try
@@ -109,6 +160,18 @@ public final class BatchingIDGenerator implements IDGenerator
       }
    }
 
+   private void deleteID(final long journalID)
+   {
+      try
+      {
+         storageManager.deleteID(journalID);
+      }
+      catch (Exception e)
+      {
+         ActiveMQServerLogger.LOGGER.batchingIdError(e);
+      }
+   }
+
    public static EncodingSupport createIDEncodingSupport(final long id)
    {
       return new IDCounterEncoding(id);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java
b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java
index 82b135c..78570fe 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java
@@ -1488,6 +1488,21 @@ public class JournalStorageManager implements StorageManager
       }
    }
 
+   @Override
+   public void deleteID(long journalD) throws Exception
+   {
+      readLock();
+      try
+      {
+         bindingsJournal.appendDeleteRecord(journalD, false);
+      }
+      finally
+      {
+         readUnLock();
+      }
+   }
+
+
    public void deleteAddressSetting(SimpleString addressMatch) throws Exception
    {
       PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
@@ -2200,6 +2215,9 @@ public class JournalStorageManager implements StorageManager
          }
       }
 
+      // This will instruct the IDGenerator to cleanup old records
+      idGenerator.cleanup();
+
       return bindingsInfo;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java
b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java
index 4fc0c02..b9b04c9 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java
@@ -631,4 +631,10 @@ public class NullStorageManager implements StorageManager
    {
       // no-op
    }
+
+   @Override
+   public void deleteID(long journalD) throws Exception
+   {
+
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java
b/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java
index c819f64..8188495 100644
--- a/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java
+++ b/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java
@@ -59,20 +59,23 @@ public class AssertionLoggerHandler extends ExtHandler
       }
    }
 
-   public static void assertMessageWasLogged(String assertionMessage, String expectedMessage)
-   {
-      if (!messages.containsKey(expectedMessage))
-      {
-         throw new AssertionError(assertionMessage);
-      }
-   }
 
-   public static void assertMessageWasLogged(String message)
+   /**
+    * is there any record matching Level?
+     * @param level
+    * @return
+    */
+   public static boolean hasLevel(Level level)
    {
-      if (!messages.containsKey(message))
+      for (ExtLogRecord record : messages.values())
       {
-         throw new AssertionError(Arrays.toString(messages.keySet().toArray()));
+         if (record.getLevel().equals(level))
+         {
+            return true;
+         }
       }
+
+      return false;
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java
b/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java
index dd6b945..f6ffbc7 100644
--- a/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java
+++ b/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java
@@ -1000,25 +1000,56 @@ public abstract class ServiceTestBase extends UnitTestCase
     */
    protected HashMap<Integer, AtomicInteger> countJournalLivingRecords(Configuration
config) throws Exception
    {
+      return internalCountJournalLivingRecords(config, true);
+   }
+
+   /**
+    * This method will load a journal and count the living records
+    *
+    * @param config
+    * @param messageJournal if true -> MessageJournal, false -> BindingsJournal
+    * @return
+    * @throws Exception
+    */
+   protected HashMap<Integer, AtomicInteger> internalCountJournalLivingRecords(Configuration
config, boolean messageJournal) throws Exception
+   {
       final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
-      SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getJournalDir(), null);
+      SequentialFileFactory ff;
 
-      JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(),
-                                                    config.getJournalMinFiles(),
-                                                    0,
-                                                    0,
-                                                    messagesFF,
-                                                    "activemq-data",
-                                                    "amq",
-                                                    1);
-      messagesJournal.start();
+      JournalImpl journal;
+
+      if (messageJournal)
+      {
+         ff = new NIOSequentialFileFactory(getJournalDir(), null);
+         journal = new JournalImpl(config.getJournalFileSize(),
+                                   config.getJournalMinFiles(),
+                                   0,
+                                   0,
+                                   ff,
+                                   "activemq-data",
+                                   "amq",
+                                   1);
+      }
+      else
+      {
+         ff = new NIOSequentialFileFactory(getBindingsDir(), null);
+         journal = new JournalImpl(1024 * 1024,
+                                   2,
+                                   config.getJournalCompactMinFiles(),
+                                   config.getJournalCompactPercentage(),
+                                   ff,
+                                   "activemq-bindings",
+                                   "bindings",
+                                   1);
+      }
+      journal.start();
 
 
       final List<RecordInfo> committedRecords = new LinkedList<RecordInfo>();
       final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
 
 
-      messagesJournal.load(committedRecords, preparedTransactions, null, false);
+      journal.load(committedRecords, preparedTransactions, null, false);
 
       for (RecordInfo info : committedRecords)
       {
@@ -1033,7 +1064,7 @@ public abstract class ServiceTestBase extends UnitTestCase
 
       }
 
-      messagesJournal.stop();
+      journal.stop();
       return recordsType;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
index 071e782..fb56440 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
@@ -477,45 +477,59 @@ public class HangConsumerTest extends ServiceTestBase
    @Test
    public void testDuplicateDestinationsOnTopic() throws Exception
    {
-      for (int i = 0; i < 5; i++)
+      try
       {
-         if (server.locateQueue(SimpleString.toSimpleString("jms.topic.tt")) == null)
+         for (int i = 0; i < 5; i++)
          {
-            server.createQueue(SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString("jms.topic.tt"),
SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false);
-         }
+            if (server.locateQueue(SimpleString.toSimpleString("jms.topic.tt")) == null)
+            {
+               server.createQueue(SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString("jms.topic.tt"),
SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false);
+            }
 
-         server.stop();
+            server.stop();
 
-         SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getBindingsDir(),
null);
+            SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getBindingsDir(),
null);
 
-         JournalImpl messagesJournal = new JournalImpl(1024 * 1024,
-            2,
-            0,
-            0,
-            messagesFF,
-            "activemq-bindings",
-            "bindings",
-            1);
+            JournalImpl messagesJournal = new JournalImpl(1024 * 1024,
+                                                          2,
+                                                          0,
+                                                          0,
+                                                          messagesFF,
+                                                          "activemq-bindings",
+                                                          "bindings",
+                                                          1);
 
-         messagesJournal.start();
+            messagesJournal.start();
 
-         LinkedList<RecordInfo> infos = new LinkedList<RecordInfo>();
+            LinkedList<RecordInfo> infos = new LinkedList<RecordInfo>();
 
-         messagesJournal.load(infos, null, null);
+            messagesJournal.load(infos, null, null);
 
-         int bindings = 0;
-         for (RecordInfo info: infos)
-         {
-            if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD)
+            int bindings = 0;
+            for (RecordInfo info : infos)
             {
-               bindings++;
+               System.out.println("info: " + info);
+               if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD)
+               {
+                  bindings++;
+               }
             }
-         }
-         assertEquals(1, bindings);
+            assertEquals(1, bindings);
 
-         System.out.println("Bindings: " + bindings);
-         messagesJournal.stop();
-         if (i < 4) server.start();
+            System.out.println("Bindings: " + bindings);
+            messagesJournal.stop();
+            if (i < 4) server.start();
+         }
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/SimpleStartStopTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/SimpleStartStopTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/SimpleStartStopTest.java
new file mode 100644
index 0000000..265cb16
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/SimpleStartStopTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.tests.integration.server;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.tests.logging.AssertionLoggerHandler;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.logmanager.Level;
+import org.junit.Test;
+
+/**
+ * @author clebertsuconic
+ */
+
+public class SimpleStartStopTest extends ServiceTestBase
+{
+
+   /**
+    * Start / stopping the server shouldn't generate any errors.
+    * Also it shouldn't bloat the journal with lots of IDs (it should do some cleanup when
possible)
+    * <p/>
+    * This is also validating that the same server could be restarted after stopped
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testStartStopAndCleanupIDs() throws Exception
+   {
+      AssertionLoggerHandler.clear();
+      AssertionLoggerHandler.startCapture();
+      try
+      {
+         ActiveMQServer server = null;
+
+         for (int i = 0; i < 50; i++)
+         {
+            server = createServer(true, false);
+            server.start();
+            server.stop(false);
+         }
+
+         // There shouldn't be any error from starting / stopping the server
+         assertFalse("There shouldn't be any error for just starting / stopping the server",
+                     AssertionLoggerHandler.hasLevel(Level.ERROR));
+         assertFalse(AssertionLoggerHandler.findText("AMQ224008"));
+
+
+         HashMap<Integer, AtomicInteger> records = this.internalCountJournalLivingRecords(server.getConfiguration(),
false);
+
+
+         AtomicInteger recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD);
+
+         assertNotNull(recordCount);
+
+         // The server should remove old IDs from the journal
+         assertTrue("The server should cleanup after IDs on the bindings record. It left
" + recordCount +
+                       " ids on the journal", recordCount.intValue() < 5);
+
+         System.out.println("RecordCount::" + recordCount);
+
+
+         server.start();
+
+
+         records = this.internalCountJournalLivingRecords(server.getConfiguration(), false);
+
+
+         recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD);
+
+         assertNotNull(recordCount);
+
+         System.out.println("Record count with server started: " + recordCount);
+
+         assertTrue("If this is zero it means we are removing too many records", recordCount.intValue()
!= 0);
+
+         server.stop();
+
+      }
+      finally
+      {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f896a394/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
index adcc28f..c7fd23c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
@@ -46,7 +46,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
    public void testSequence() throws Exception
    {
       NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDir());
-      Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "test-data", "tst",
1);
+      Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings",
"bindings", 1);
 
       journal.start();
 
@@ -135,7 +135,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
 
       Assert.assertEquals(0, tx.size());
 
-      Assert.assertTrue(records.size() > 0);
+      Assert.assertTrue("Contains " + records.size(),  records.size() > 0);
 
       for (RecordInfo record : records)
       {
@@ -149,7 +149,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
 
    private StorageManager getJournalStorageManager(final Journal bindingsJournal)
    {
-      return new NullStorageManager()
+      NullStorageManager storageManager = new NullStorageManager()
       {
          @Override
          public synchronized void storeID(long journalID, long id) throws Exception
@@ -158,5 +158,15 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
                                             BatchingIDGenerator.createIDEncodingSupport(id),
true);
          }
       };
+
+      try
+      {
+         storageManager.start();
+      }
+      catch (Throwable ignored)
+      {
+      }
+
+      return storageManager;
    }
 }


Mime
View raw message