activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/3] activemq-artemis git commit: ARTEMIS-1442 Shutdown server if can't move file on journal
Date Fri, 29 Sep 2017 15:35:56 GMT
ARTEMIS-1442 Shutdown server if can't move file on journal


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/aa3e8941
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/aa3e8941
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/aa3e8941

Branch: refs/heads/master
Commit: aa3e8941d12e6b06c7dc547f48b3e30c1e94606b
Parents: 178d403
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Thu Sep 28 21:06:21 2017 -0400
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Sep 29 11:35:47 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/journal/impl/JournalImpl.java  |  63 +++++++--
 .../impl/journal/JournalStorageManager.java     |  20 ++-
 .../ShutdownOnCriticalIOErrorMoveNext.java      | 141 +++++++++++++++++++
 3 files changed, 209 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aa3e8941/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 1b7ba26..7b6f48d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.journal.impl;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.security.AccessController;
@@ -49,6 +50,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
@@ -175,6 +177,9 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
    // Compacting may replace this structure
    private final ConcurrentLongHashMap<JournalTransaction> transactions = new ConcurrentLongHashMap<>();
 
+   private final IOCriticalErrorListener criticalErrorListener;
+
+
    // This will be set only while the JournalCompactor is being executed
    private volatile JournalCompactor compactor;
 
@@ -265,6 +270,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
       this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage,
5, fileFactory, filePrefix, fileExtension, maxAIO, userVersion);
    }
 
+
    public JournalImpl(final ExecutorFactory ioExecutors,
                       final int fileSize,
                       final int minFiles,
@@ -277,9 +283,28 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
                       final String fileExtension,
                       final int maxAIO,
                       final int userVersion) {
+      this(ioExecutors, fileSize, minFiles, poolSize, compactMinFiles, compactPercentage,
journalFileOpenTimeout, fileFactory, filePrefix, fileExtension, maxAIO, userVersion, null);
+   }
+
+
+   public JournalImpl(final ExecutorFactory ioExecutors,
+                      final int fileSize,
+                      final int minFiles,
+                      final int poolSize,
+                      final int compactMinFiles,
+                      final int compactPercentage,
+                      final int journalFileOpenTimeout,
+                      final SequentialFileFactory fileFactory,
+                      final String filePrefix,
+                      final String fileExtension,
+                      final int maxAIO,
+                      final int userVersion,
+                      IOCriticalErrorListener criticalErrorListener) {
 
       super(fileFactory.isSupportsCallbacks(), fileSize);
 
+      this.criticalErrorListener = criticalErrorListener;
+
       this.providedIOThreadPool = ioExecutors;
 
       if (fileSize % fileFactory.getAlignment() != 0) {
@@ -2910,23 +2935,45 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
     * @throws Exception
     */
    protected JournalFile switchFileIfNecessary(int size) throws Exception {
+
       // We take into account the fileID used on the Header
       if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
{
          throw new IllegalArgumentException("Record is too large to store " + size);
       }
 
-      if (!currentFile.getFile().fits(size)) {
-         moveNextFile(true);
-
-         // The same check needs to be done at the new file also
+      try {
          if (!currentFile.getFile().fits(size)) {
-            // Sanity check, this should never happen
-            throw new IllegalStateException("Invalid logic on buffer allocation");
+            moveNextFile(true);
+
+            // The same check needs to be done at the new file also
+            if (!currentFile.getFile().fits(size)) {
+               // Sanity check, this should never happen
+               throw new IllegalStateException("Invalid logic on buffer allocation");
+            }
          }
+         return currentFile;
+      } catch (Throwable e) {
+         criticalIO(e);
+         return null; // this will never happen, the method will call throw
       }
-      return currentFile;
    }
 
+   private void criticalIO(Throwable e) throws Exception {
+      if (criticalErrorListener != null) {
+         criticalErrorListener.onIOException(e, e.getMessage(), currentFile == null ? null
: currentFile.getFile());
+      }
+      if (e instanceof Exception) {
+         throw (Exception) e;
+      } else if (e instanceof IllegalStateException) {
+         throw (IllegalStateException) e;
+      } else {
+         IOException ioex = new IOException();
+         ioex.initCause(e);
+         throw ioex;
+      }
+   }
+
+
    private CountDownLatch newLatch(int countDown) {
       if (state == JournalState.STOPPED) {
          throw new RuntimeException("Server is not started");
@@ -2956,7 +3003,7 @@ public class JournalImpl extends JournalBase implements TestableJournal,
Journal
    /**
     * You need to guarantee lock.acquire() before calling this method!
     */
-   private void moveNextFile(final boolean scheduleReclaim) throws Exception {
+   protected void moveNextFile(final boolean scheduleReclaim) throws Exception {
       filesRepository.closeFile(currentFile);
 
       currentFile = filesRepository.openFile();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aa3e8941/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 0bd4df3..b0dc10b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -71,19 +71,19 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
 
    private static final Logger logger = Logger.getLogger(JournalStorageManager.class);
 
-   private SequentialFileFactory journalFF;
+   protected SequentialFileFactory journalFF;
 
-   private SequentialFileFactory bindingsFF;
+   protected SequentialFileFactory bindingsFF;
 
    SequentialFileFactory largeMessagesFactory;
 
-   private Journal originalMessageJournal;
+   protected Journal originalMessageJournal;
 
-   private Journal originalBindingsJournal;
+   protected Journal originalBindingsJournal;
 
    protected String largeMessagesDirectory;
 
-   private ReplicationManager replicator;
+   protected ReplicationManager replicator;
 
    public JournalStorageManager(final Configuration config,
                                 final CriticalAnalyzer analyzer,
@@ -124,7 +124,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
       bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener,
config.getJournalMaxIO_NIO());
       bindingsFF.setDatasync(config.isJournalDatasync());
 
-      Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(),
bindingsFF, "activemq-bindings", "bindings", 1, 0);
+      Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(),
bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
 
       bindingsJournal = localBindings;
       originalBindingsJournal = localBindings;
@@ -160,7 +160,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
          fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
          ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(),
fileSize, journalFF.getAlignment());
       }
-      Journal localMessage = new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(),
journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0);
+      Journal localMessage = createMessageJournal(config, criticalErrorListener, fileSize);
 
       messageJournal = localMessage;
       originalMessageJournal = localMessage;
@@ -176,6 +176,12 @@ public class JournalStorageManager extends AbstractJournalStorageManager
{
       }
    }
 
+   protected Journal createMessageJournal(Configuration config,
+                                        IOCriticalErrorListener criticalErrorListener,
+                                        int fileSize) {
+      return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(),
config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(),
journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
+   }
+
    // Life Cycle Handlers
    @Override
    protected void beforeStart() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/aa3e8941/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNext.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNext.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNext.java
new file mode 100644
index 0000000..eb42856
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNext.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.critical;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ShutdownOnCriticalIOErrorMoveNext extends ActiveMQTestBase {
+
+   @Test
+   public void testSimplyDownAfterError() throws Exception {
+      deleteDirectory(new File("./target/server"));
+      ActiveMQServer server = createServer("./target/server");
+
+      try {
+         server.start();
+
+         ConnectionFactory factory = new ActiveMQConnectionFactory();
+         Connection connection = factory.createConnection();
+
+         Session session = connection.createSession();
+
+         MessageProducer producer = session.createProducer(session.createQueue("queue"));
+
+         try {
+            for (int i = 0; i < 500; i++) {
+               producer.send(session.createTextMessage("text"));
+            }
+         } catch (JMSException expected) {
+         }
+
+         Wait.waitFor(() -> !server.isStarted());
+
+         Assert.assertFalse(server.isStarted());
+
+         System.out.println("Sent messages");
+
+      } finally {
+         server.stop();
+
+      }
+
+   }
+
+   ActiveMQServer createServer(String folder) throws Exception {
+      final AtomicBoolean blocked = new AtomicBoolean(false);
+      Configuration conf = createConfig(folder);
+      ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(),
new SecurityConfiguration());
+
+      conf.setPersistenceEnabled(true);
+
+      ActiveMQServer server = new ActiveMQServerImpl(conf, securityManager) {
+
+         @Override
+         protected StorageManager createStorageManager() {
+
+            JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(),
executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
+
+               @Override
+               protected Journal createMessageJournal(Configuration config,
+                                                      IOCriticalErrorListener criticalErrorListener,
+                                                      int fileSize) {
+                  return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(),
config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(),
config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(),
0, criticalErrorListener) {
+                     @Override
+                     protected void moveNextFile(boolean scheduleReclaim) throws Exception
{
+                        super.moveNextFile(scheduleReclaim);
+                        if (blocked.get()) {
+                           throw new IllegalStateException("forcibly down");
+                        }
+                     }
+                  };
+               }
+
+               @Override
+               public void storeMessage(Message message) throws Exception {
+                  super.storeMessage(message);
+                  blocked.set(true);
+               }
+            };
+
+            this.getCriticalAnalyzer().add(storageManager);
+
+            return storageManager;
+         }
+
+      };
+
+      return server;
+   }
+
+   Configuration createConfig(String folder) throws Exception {
+
+      Configuration configuration = createDefaultConfig(true);
+      configuration.setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100
* 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(folder + "/journal").setBindingsDirectory(folder
+ "/bindings").setPagingDirectory(folder + "/paging").
+         setLargeMessagesDirectory(folder + "/largemessage").setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false);
+      configuration.setSecurityEnabled(false);
+      configuration.setPersistenceEnabled(true);
+
+      return configuration;
+   }
+
+}


Mime
View raw message