activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [18/34] activemq-artemis git commit: ARTEMIS-822 Add executor service to JournalImpl for append operations and remove synchronization
Date Tue, 01 Nov 2016 10:21:47 GMT
ARTEMIS-822 Add executor service to JournalImpl for append operations and remove synchronization

https://issues.apache.org/jira/browse/ARTEMIS-822


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4b47461f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4b47461f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4b47461f

Branch: refs/heads/ARTEMIS-780
Commit: 4b47461f03a607b9ef517beb2a1666ffae43a2a7
Parents: bfb9bed
Author: barreiro <lbbbarreiro@gmail.com>
Authored: Fri Jan 22 03:23:26 2016 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Fri Oct 28 16:54:59 2016 -0400

----------------------------------------------------------------------
 .../cli/commands/tools/DecodeJournal.java       |  20 +-
 .../activemq/artemis/utils/ExecutorFactory.java |  24 +
 .../artemis/utils/OrderedExecutorFactory.java   | 127 ++++
 .../activemq/artemis/utils/SimpleFuture.java    |  79 +++
 .../artemis/utils/SimpleFutureTest.java         |  69 ++
 .../activemq/artemis/utils/ExecutorFactory.java |  24 -
 .../artemis/utils/OrderedExecutorFactory.java   | 128 ----
 .../artemis/core/journal/impl/JournalImpl.java  | 662 +++++++++++--------
 .../core/journal/impl/JournalTransaction.java   |  46 +-
 .../artemis/journal/ActiveMQJournalLogger.java  |  12 +-
 .../journal/impl/AlignedJournalImplTest.java    |  39 +-
 .../core/journal/impl/JournalAsyncTest.java     |  15 +-
 12 files changed, 761 insertions(+), 484 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
index b392f6f..f290eba 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DecodeJournal.java
@@ -33,7 +33,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.core.journal.impl.JournalRecord;
 import org.apache.activemq.artemis.utils.Base64;
 
 @Command(name = "decode", description = "Decode a journal's internal format into a new journal set of files")
@@ -125,8 +124,6 @@ public class DecodeJournal extends LockAbstract {
 
       long lineNumber = 0;
 
-      Map<Long, JournalRecord> journalRecords = journal.getRecords();
-
       while ((line = buffReader.readLine()) != null) {
          lineNumber++;
          String[] splitLine = line.split(",");
@@ -150,12 +147,6 @@ public class DecodeJournal extends LockAbstract {
                counter.incrementAndGet();
                RecordInfo info = parseRecord(lineProperties);
                journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
-            } else if (operation.equals("AddRecordTX")) {
-               long txID = parseLong("txID", lineProperties);
-               AtomicInteger counter = getCounter(txID, txCounters);
-               counter.incrementAndGet();
-               RecordInfo info = parseRecord(lineProperties);
-               journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
             } else if (operation.equals("UpdateTX")) {
                long txID = parseLong("txID", lineProperties);
                AtomicInteger counter = getCounter(txID, txCounters);
@@ -168,20 +159,17 @@ public class DecodeJournal extends LockAbstract {
             } else if (operation.equals("DeleteRecord")) {
                long id = parseLong("id", lineProperties);
 
-               // If not found it means the append/update records were reclaimed already
-               if (journalRecords.get(id) != null) {
+               try {
                   journal.appendDeleteRecord(id, false);
+               } catch (IllegalStateException ignored) {
+                  // If not found it means the append/update records were reclaimed already
                }
             } else if (operation.equals("DeleteRecordTX")) {
                long txID = parseLong("txID", lineProperties);
                long id = parseLong("id", lineProperties);
                AtomicInteger counter = getCounter(txID, txCounters);
                counter.incrementAndGet();
-
-               // If not found it means the append/update records were reclaimed already
-               if (journalRecords.get(id) != null) {
-                  journal.appendDeleteRecordTransactional(txID, id);
-               }
+               journal.appendDeleteRecordTransactional(txID, id);
             } else if (operation.equals("Prepare")) {
                long txID = parseLong("txID", lineProperties);
                int numberOfRecords = parseInt("numberOfRecords", lineProperties);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
new file mode 100644
index 0000000..dd0209b
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.Executor;
+
+public interface ExecutorFactory {
+
+   Executor getExecutor();
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
new file mode 100644
index 0000000..c7d5c03
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.jboss.logging.Logger;
+
+/**
+ * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
+ */
+public final class OrderedExecutorFactory implements ExecutorFactory {
+
+   private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class);
+
+   private final Executor parent;
+
+   /**
+    * Construct a new instance delegating to the given parent executor.
+    *
+    * @param parent the parent executor
+    */
+   public OrderedExecutorFactory(final Executor parent) {
+      this.parent = parent;
+   }
+
+   /**
+    * Get an executor that always executes tasks in order.
+    *
+    * @return an ordered executor
+    */
+   @Override
+   public Executor getExecutor() {
+      return new OrderedExecutor(parent);
+   }
+
+   /**
+    * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
+    * <br>
+    * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
+    * same method, will result in B's task running after A's.
+    */
+   private static class OrderedExecutor implements Executor {
+
+      private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
+      private final Executor delegate;
+      private final ExecutorTask task = new ExecutorTask();
+
+      // used by stateUpdater
+      @SuppressWarnings("unused")
+      private volatile int state = 0;
+
+      private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
+
+      private static final int STATE_NOT_RUNNING = 0;
+      private static final int STATE_RUNNING = 1;
+
+      private OrderedExecutor(Executor delegate) {
+         this.delegate = delegate;
+      }
+
+      @Override
+      public void execute(Runnable command) {
+         tasks.add(command);
+         if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+            //note that this can result in multiple tasks being queued
+            //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
+            delegate.execute(task);
+         }
+      }
+
+      private final class ExecutorTask implements Runnable {
+
+         @Override
+         public void run() {
+            do {
+               //if there is no thread active then we run
+               if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
+                  Runnable task = tasks.poll();
+                  //while the queue is not empty we process in order
+                  while (task != null) {
+                     try {
+                        task.run();
+                     } catch (ActiveMQInterruptedException e) {
+                        // This could happen during shutdowns. Nothing to be concerned about here
+                        logger.debug("Interrupted Thread", e);
+                     } catch (Throwable t) {
+                        logger.warn(t.getMessage(), t);
+                     }
+                     task = tasks.poll();
+                  }
+                  //set state back to not running.
+                  stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
+               } else {
+                  return;
+               }
+               //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
+               //but poll() has returned null, so a submitting thread will believe that it does not need to re-execute.
+               //this check fixes the issue
+            } while (!tasks.isEmpty());
+         }
+      }
+
+      @Override
+      public String toString() {
+         return "OrderedExecutor(tasks=" + tasks + ")";
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
new file mode 100644
index 0000000..eedfef4
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SimpleFuture.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class SimpleFuture<V> implements Future<V> {
+
+   public SimpleFuture() {
+   }
+
+   V value;
+   Exception exception;
+
+   private final CountDownLatch latch = new CountDownLatch(1);
+
+   boolean canceled = false;
+
+   @Override
+   public boolean cancel(boolean mayInterruptIfRunning) {
+      canceled = true;
+      latch.countDown();
+      return true;
+   }
+
+   @Override
+   public boolean isCancelled() {
+      return canceled;
+   }
+
+   @Override
+   public boolean isDone() {
+      return latch.getCount() <= 0;
+   }
+
+   public void fail(Exception e) {
+      this.exception = e;
+      latch.countDown();
+   }
+
+   @Override
+   public V get() throws InterruptedException, ExecutionException {
+      latch.await();
+      if (this.exception != null) {
+         throw new ExecutionException(this.exception);
+      }
+      return value;
+   }
+
+   public void set(V v) {
+      this.value = v;
+      latch.countDown();
+   }
+
+   @Override
+   public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      latch.await(timeout, unit);
+      return value;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
new file mode 100644
index 0000000..00fd5d7
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/SimpleFutureTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class SimpleFutureTest {
+
+   @Rule
+   public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
+
+   @Test
+   public void testFuture() throws Exception {
+      final long randomStart = System.currentTimeMillis();
+      final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
+      Thread t = new Thread() {
+         @Override
+         public void run() {
+            simpleFuture.set(randomStart);
+         }
+      };
+      t.start();
+
+      Assert.assertEquals(randomStart, simpleFuture.get().longValue());
+   }
+
+
+   @Test
+   public void testException() throws Exception {
+      final SimpleFuture<Long> simpleFuture = new SimpleFuture<>();
+      Thread t = new Thread() {
+         @Override
+         public void run() {
+            simpleFuture.fail(new Exception("hello"));
+         }
+      };
+      t.start();
+
+      boolean failed = false;
+      try {
+         simpleFuture.get();
+      } catch (Exception e) {
+         failed = true;
+      }
+
+
+      Assert.assertTrue(failed);
+   }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
deleted file mode 100644
index dd0209b..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ExecutorFactory.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.utils;
-
-import java.util.concurrent.Executor;
-
-public interface ExecutorFactory {
-
-   Executor getExecutor();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
deleted file mode 100644
index 609af8e..0000000
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.utils;
-
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.jboss.logging.Logger;
-
-/**
- * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
- */
-public final class OrderedExecutorFactory implements ExecutorFactory {
-
-   private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class);
-
-   private final Executor parent;
-
-   /**
-    * Construct a new instance delegating to the given parent executor.
-    *
-    * @param parent the parent executor
-    */
-   public OrderedExecutorFactory(final Executor parent) {
-      this.parent = parent;
-   }
-
-   /**
-    * Get an executor that always executes tasks in order.
-    *
-    * @return an ordered executor
-    */
-   @Override
-   public Executor getExecutor() {
-      return new OrderedExecutor(parent);
-   }
-
-   /**
-    * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
-    * <br>
-    * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
-    * same method, will result in B's task running after A's.
-    */
-   private static class OrderedExecutor implements Executor {
-
-      private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
-      private final Executor delegate;
-      private final ExecutorTask task = new ExecutorTask();
-
-      // used by stateUpdater
-      @SuppressWarnings("unused")
-      private volatile int state = 0;
-
-      private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
-
-      private static final int STATE_NOT_RUNNING = 0;
-      private static final int STATE_RUNNING = 1;
-
-      private OrderedExecutor(Executor delegate) {
-         this.delegate = delegate;
-      }
-
-      @Override
-      public void execute(Runnable command) {
-         tasks.add(command);
-         if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
-            //note that this can result in multiple tasks being queued
-            //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
-            delegate.execute(task);
-         }
-      }
-
-      private final class ExecutorTask implements Runnable {
-
-         @Override
-         public void run() {
-            do {
-               //if there is no thread active then we run
-               if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
-                  Runnable task = tasks.poll();
-                  //while the queue is not empty we process in order
-                  while (task != null) {
-                     try {
-                        task.run();
-                     } catch (ActiveMQInterruptedException e) {
-                        // This could happen during shutdowns. Nothing to be concerned about here
-                        logger.debug("Interrupted Thread", e);
-                     } catch (Throwable t) {
-                        ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
-                     }
-                     task = tasks.poll();
-                  }
-                  //set state back to not running.
-                  stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
-               } else {
-                  return;
-               }
-               //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
-               //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
-               //this check fixes the issue
-            } while (!tasks.isEmpty());
-         }
-      }
-
-      @Override
-      public String toString() {
-         return "OrderedExecutor(tasks=" + tasks + ")";
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index b6d5e62..43db1f7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -29,11 +29,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,6 +47,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@@ -160,6 +163,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<>();
 
+   private final Set<Long> pendingRecords = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
+
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalTransaction> transactions = new ConcurrentHashMap<>();
 
@@ -172,12 +177,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
    private ExecutorService compactorExecutor = null;
 
-   private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
+   private ExecutorService appendExecutor = null;
 
-   // Lock used during the append of records
-   // This lock doesn't represent a global lock.
-   // After a record is appended, the usedFile can't be changed until the positives and negatives are updated
-   private final Object lockAppend = new Object();
+   private ConcurrentHashSet<CountDownLatch> latches = new ConcurrentHashSet<>();
 
    /**
     * We don't lock the journal during the whole compacting operation. During compacting we only
@@ -688,32 +690,37 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                final boolean sync,
                                final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
+      lineUpContext(callback);
+      pendingRecords.add(id);
 
-      journalLock.readLock().lock();
-
-      try {
-         JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
-
-         if (callback != null) {
-            callback.storeLineUp();
-         }
-
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+      Future<?> result = appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+               JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
+               JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
+               records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendAddRecord::id=" + id +
-                               ", userRecordType=" +
-                               recordType +
-                               ", record = " + record +
-                               ", usedFile = " +
-                               usedFile);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendAddRecord::id=" + id +
+                                             ", userRecordType=" +
+                                             recordType +
+                                             ", record = " + record +
+                                             ", usedFile = " +
+                                             usedFile);
+               }
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+            } finally {
+               pendingRecords.remove(id);
+               journalLock.readLock().unlock();
             }
-
-            records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
          }
-      } finally {
-         journalLock.readLock().unlock();
+      });
+
+      if (sync && callback == null) {
+         result.get();
       }
    }
 
@@ -724,94 +731,86 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                   final boolean sync,
                                   final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
+      lineUpContext(callback);
+      checkKnownRecordID(id);
 
-      journalLock.readLock().lock();
+      Future<?> result = appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+               JournalRecord jrnRecord = records.get(id);
+               JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+               JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
 
-      try {
-         JournalRecord jrnRecord = records.get(id);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendUpdateRecord::id=" + id +
+                                  ", userRecordType=" +
+                                  recordType +
+                                  ", usedFile = " +
+                                  usedFile);
+               }
 
-         if (jrnRecord == null) {
-            if (!(compactor != null && compactor.lookupRecord(id))) {
-               throw new IllegalStateException("Cannot find add info " + id);
+               // jrnRecord == null here can only mean there is a compactor running;
+               // accounting for the update must be deferred until compacting is done
+               if (jrnRecord == null) {
+                  compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
+               } else {
+                  jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
+               }
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+            } finally {
+               journalLock.readLock().unlock();
             }
          }
+      });
 
-         JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
-
-         if (callback != null) {
-            callback.storeLineUp();
-         }
-
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
-
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendUpdateRecord::id=" + id +
-                               ", userRecordType=" +
-                               recordType +
-                               ", record = " + record +
-                               ", usedFile = " +
-                               usedFile);
-            }
-
-            // record== null here could only mean there is a compactor, and computing the delete should be done after
-            // compacting is done
-            if (jrnRecord == null) {
-               compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
-            } else {
-               jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
-            }
-         }
-      } finally {
-         journalLock.readLock().unlock();
+      if (sync && callback == null) {
+         result.get();
       }
    }
 
    @Override
    public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
+      lineUpContext(callback);
+      checkKnownRecordID(id);
 
-      journalLock.readLock().lock();
-      try {
+      Future<?> result = appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+               JournalRecord record = null;
+               if (compactor == null) {
+                  record = records.remove(id);
+               }
 
-         JournalRecord record = null;
+               JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
+               JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
 
-         if (compactor == null) {
-            record = records.remove(id);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
+               }
 
-            if (record == null) {
-               throw new IllegalStateException("Cannot find add info " + id);
-            }
-         } else {
-            if (!records.containsKey(id) && !compactor.lookupRecord(id)) {
-               throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
+               // record == null here can only mean there is a compactor running;
+               // accounting for the delete must be deferred until compacting is done
+               if (record == null) {
+                  compactor.addCommandDelete(id, usedFile);
+               } else {
+                  record.delete(usedFile);
+               }
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+            } finally {
+               journalLock.readLock().unlock();
             }
          }
+      });
 
-         JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
-
-         if (callback != null) {
-            callback.storeLineUp();
-         }
-
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
-
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendDeleteRecord::id=" + id + ", usedFile = " + usedFile);
-            }
-
-            // record== null here could only mean there is a compactor, and computing the delete should be done after
-            // compacting is done
-            if (record == null) {
-               compactor.addCommandDelete(id, usedFile);
-            } else {
-               record.delete(usedFile);
-            }
-
-         }
-      } finally {
-         journalLock.readLock().unlock();
+      if (sync && callback == null) {
+         result.get();
       }
    }
 
@@ -822,31 +821,62 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                             final EncodingSupport record) throws Exception {
       checkJournalIsLoaded();
 
-      journalLock.readLock().lock();
+      final JournalTransaction tx = getTransactionInfo(txID);
+      tx.checkErrorCondition();
 
-      try {
-         JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+      appendExecutor.submit(new Runnable() {
 
-         JournalTransaction tx = getTransactionInfo(txID);
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+               JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
+               JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
 
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendAddRecordTransactional:txID=" + txID +
+                                  ",id=" +
+                                  id +
+                                  ", userRecordType=" +
+                                  recordType +
+                                  ", record = " + record +
+                                  ", usedFile = " +
+                                  usedFile);
+               }
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendAddRecordTransactional:txID=" + txID +
-                               ",id=" +
-                               id +
-                               ", userRecordType=" +
-                               recordType +
-                               ", record = " + record +
-                               ", usedFile = " +
-                               usedFile);
+               tx.addPositive(usedFile, id, addRecord.getEncodeSize());
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               setErrorCondition(tx, e);
+            } finally {
+               journalLock.readLock().unlock();
             }
+         }
+      });
+   }
+
+   private void checkKnownRecordID(final long id) throws Exception {
+      if (records.containsKey(id) || pendingRecords.contains(id) || (compactor != null && compactor.lookupRecord(id))) {
+         return;
+      }
 
-            tx.addPositive(usedFile, id, addRecord.getEncodeSize());
+      // Not found yet: retry on the append thread, since an earlier append for this id may still be queued there.
+      Future<Boolean> known = appendExecutor.submit(new Callable<Boolean>() {
+         @Override
+         public Boolean call() throws Exception {
+            journalLock.readLock().lock();
+            try {
+               return records.containsKey(id)
+                  || pendingRecords.contains(id)
+                  || (compactor != null && compactor.lookupRecord(id));
+            } finally {
+               journalLock.readLock().unlock();
+            }
          }
-      } finally {
-         journalLock.readLock().unlock();
+      });
+
+      if (!known.get()) {
+         throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
       }
    }
 
@@ -867,32 +897,39 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                                final EncodingSupport record) throws Exception {
       checkJournalIsLoaded();
 
-      journalLock.readLock().lock();
+      final JournalTransaction tx = getTransactionInfo(txID);
+      tx.checkErrorCondition();
 
-      try {
-         JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
+      appendExecutor.submit(new Runnable() {
 
-         JournalTransaction tx = getTransactionInfo(txID);
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
 
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
+               JournalInternalRecord updateRecordTX = new JournalAddRecordTX( false, txID, id, recordType, record );
+               JournalFile usedFile = appendRecord( updateRecordTX, false, false, tx, null );
+
+               if ( logger.isTraceEnabled() ) {
+                  logger.trace( "appendUpdateRecordTransactional::txID=" + txID +
+                          ",id=" +
+                          id +
+                          ", userRecordType=" +
+                          recordType +
+                          ", record = " + record +
+                          ", usedFile = " +
+                          usedFile );
+               }
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendUpdateRecordTransactional::txID=" + txID +
-                               ",id=" +
-                               id +
-                               ", userRecordType=" +
-                               recordType +
-                               ", record = " + record +
-                               ", usedFile = " +
-                               usedFile);
+               tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize() );
+            } catch ( Exception e ) {
+               ActiveMQJournalLogger.LOGGER.error( e.getMessage(), e );
+               setErrorCondition( tx, e );
+            } finally {
+               journalLock.readLock().unlock();
             }
-
-            tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
          }
-      } finally {
-         journalLock.readLock().unlock();
-      }
+      });
    }
 
    @Override
@@ -901,29 +938,35 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                                final EncodingSupport record) throws Exception {
       checkJournalIsLoaded();
 
-      journalLock.readLock().lock();
+      final JournalTransaction tx = getTransactionInfo(txID);
+      tx.checkErrorCondition();
 
-      try {
-         JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+      appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
 
-         JournalTransaction tx = getTransactionInfo(txID);
+               JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+               JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
 
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendDeleteRecordTransactional::txID=" + txID +
+                                  ", id=" +
+                                  id +
+                                  ", usedFile = " +
+                                  usedFile);
+               }
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendDeleteRecordTransactional::txID=" + txID +
-                               ", id=" +
-                               id +
-                               ", usedFile = " +
-                               usedFile);
+               tx.addNegative(usedFile, id);
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               setErrorCondition(tx, e);
+            } finally {
+               journalLock.readLock().unlock();
             }
-
-            tx.addNegative(usedFile, id);
          }
-      } finally {
-         journalLock.readLock().unlock();
-      }
+      });
    }
 
    /**
@@ -943,36 +986,53 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                    final IOCompletion callback) throws Exception {
 
       checkJournalIsLoaded();
+      lineUpContext(callback);
 
-      journalLock.readLock().lock();
-
-      try {
-         JournalTransaction tx = getTransactionInfo(txID);
-
-         JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
+      final JournalTransaction tx = getTransactionInfo(txID);
+      tx.checkErrorCondition();
 
-         if (callback != null) {
-            callback.storeLineUp();
-         }
+      Future<?> result = appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+               JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.PREPARE, txID, transactionData);
+               JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
 
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+               }
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendPrepareRecord::txID=" + txID + ", usedFile = " + usedFile);
+               tx.prepare(usedFile);
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               setErrorCondition(tx, e);
+            } finally {
+               journalLock.readLock().unlock();
             }
-
-            tx.prepare(usedFile);
          }
+      });
 
-      } finally {
-         journalLock.readLock().unlock();
+      if (sync && callback == null) {
+         result.get();
+         tx.checkErrorCondition();
       }
    }
 
    @Override
    public void lineUpContext(IOCompletion callback) {
-      callback.storeLineUp();
+      if (callback != null) {
+         callback.storeLineUp();
+      }
+   }
+
+   private void setErrorCondition(JournalTransaction jt, Throwable t) {
+      if (jt != null) {
+         TransactionCallback callback = jt.getCurrentCallback();
+         if (callback != null && callback.getErrorMessage() != null) {
+            callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), t.getMessage());
+         }
+      }
    }
 
    /**
@@ -982,68 +1042,83 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void appendCommitRecord(final long txID,
                                   final boolean sync,
                                   final IOCompletion callback,
-                                  boolean lineUpContext) throws Exception {
+                                  final boolean lineUpContext) throws Exception {
       checkJournalIsLoaded();
+      if (lineUpContext) {
+         lineUpContext(callback);
+      }
 
-      journalLock.readLock().lock();
+      final JournalTransaction tx = transactions.remove(txID);
 
-      try {
-         JournalTransaction tx = transactions.remove(txID);
+      if (tx == null) {
+         throw new IllegalStateException("Cannot find tx with id " + txID);
+      }
 
-         if (tx == null) {
-            throw new IllegalStateException("Cannot find tx with id " + txID);
-         }
+      tx.checkErrorCondition();
 
-         JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
+      Future<?> result = appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+               JournalInternalRecord commitRecord = new JournalCompleteRecordTX(TX_RECORD_TYPE.COMMIT, txID, null);
+               JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
 
-         if (callback != null && lineUpContext) {
-            callback.storeLineUp();
-         }
 
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+               }
 
-            if (logger.isTraceEnabled()) {
-               logger.trace("appendCommitRecord::txID=" + txID + ", usedFile = " + usedFile);
+               tx.commit(usedFile);
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               setErrorCondition(tx, e);
+            } finally {
+               journalLock.readLock().unlock();
             }
-
-            tx.commit(usedFile);
          }
+      });
 
-      } finally {
-         journalLock.readLock().unlock();
+      if (sync && callback == null) {
+         result.get();
+         tx.checkErrorCondition();
       }
    }
 
    @Override
    public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception {
       checkJournalIsLoaded();
+      lineUpContext(callback);
 
-      journalLock.readLock().lock();
-
-      JournalTransaction tx = null;
-
-      try {
-         tx = transactions.remove(txID);
-
-         if (tx == null) {
-            throw new IllegalStateException("Cannot find tx with id " + txID);
-         }
+      final JournalTransaction tx = transactions.remove(txID);
 
-         JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
+      if (tx == null) {
+         throw new IllegalStateException("Cannot find tx with id " + txID);
+      }
 
-         if (callback != null) {
-            callback.storeLineUp();
-         }
+      tx.checkErrorCondition();
 
-         synchronized (lockAppend) {
-            JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
+      Future<?> result = appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+               JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
+               JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
 
-            tx.rollback(usedFile);
+               tx.rollback(usedFile);
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.error(e.getMessage(), e);
+               setErrorCondition(tx, e);
+            }  finally {
+               journalLock.readLock().unlock();
+            }
          }
+      });
 
-      } finally {
-         journalLock.readLock().unlock();
+      if (sync && callback == null) {
+         result.get();
+         tx.checkErrorCondition();
       }
    }
 
@@ -1906,13 +1981,23 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    public void debugWait() throws InterruptedException {
       fileFactory.flush();
 
-      for (JournalTransaction tx : transactions.values()) {
-         tx.waitCallbacks();
+      if (appendExecutor != null && !appendExecutor.isShutdown()) {
+         // Submit a no-op task to the appendExecutor and wait for it, to make sure every previously queued append has run
+         final CountDownLatch latch = newLatch(1);
+
+         appendExecutor.execute(new Runnable() {
+
+            @Override
+            public void run() {
+               latch.countDown();
+            }
+
+         });
+         awaitLatch(latch, -1);
       }
 
       if (filesExecutor != null && !filesExecutor.isShutdown()) {
-         // Send something to the closingExecutor, just to make sure we went
-         // until its end
+         // Submit a no-op task to the filesExecutor and wait for it, just to make sure we went until its end
          final CountDownLatch latch = newLatch(1);
 
          filesExecutor.execute(new Runnable() {
@@ -1985,20 +2070,52 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
    // In some tests we need to force the journal to move to a next file
    @Override
    public void forceMoveNextFile() throws Exception {
-      journalLock.readLock().lock();
+      debugWait();
+      journalLock.writeLock().lock();
       try {
-         synchronized (lockAppend) {
-            moveNextFile(false);
-            debugWait();
-         }
+         moveNextFile(false);
       } finally {
-         journalLock.readLock().unlock();
+         journalLock.writeLock().unlock();
       }
    }
 
    @Override
    public void perfBlast(final int pages) {
-      new PerfBlast(pages).start();
+
+      checkJournalIsLoaded();
+
+      final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
+
+      final JournalInternalRecord blastRecord = new JournalInternalRecord() {
+
+         @Override
+         public int getEncodeSize() {
+            return byteEncoder.getEncodeSize();
+         }
+
+         @Override
+         public void encode(final ActiveMQBuffer buffer) {
+            byteEncoder.encode(buffer);
+         }
+      };
+
+      appendExecutor.submit(new Runnable() {
+         @Override
+         public void run() {
+            journalLock.readLock().lock();
+            try {
+
+               for (int i = 0; i < pages; i++) {
+                  appendRecord(blastRecord, false, false, null, null);
+               }
+
+            } catch (Exception e) {
+               ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
+            } finally {
+               journalLock.readLock().unlock();
+            }
+         }
+      });
    }
 
    // ActiveMQComponent implementation
@@ -2031,6 +2148,14 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       });
 
+      appendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+
+         @Override
+         public Thread newThread(final Runnable r) {
+            return new Thread(r, "JournalImpl::appendExecutor");
+         }
+      });
+
       filesRepository.setExecutor(filesExecutor);
 
       fileFactory.start();
@@ -2044,46 +2169,50 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          throw new IllegalStateException("Journal is already stopped");
       }
 
-      journalLock.writeLock().lock();
-      try {
-         synchronized (lockAppend) {
+      setJournalState(JournalState.STOPPED);
 
-            setJournalState(JournalState.STOPPED);
+      // appendExecutor must be shut down first
+      appendExecutor.shutdown();
 
-            compactorExecutor.shutdown();
+      if (!appendExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+         ActiveMQJournalLogger.LOGGER.couldNotStopJournalAppendExecutor();
+      }
 
-            if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
-               ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
-            }
+      journalLock.writeLock().lock();
+      try {
+         compactorExecutor.shutdown();
 
-            filesExecutor.shutdown();
+         if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
+            ActiveMQJournalLogger.LOGGER.couldNotStopCompactor();
+         }
 
-            filesRepository.setExecutor(null);
+         filesExecutor.shutdown();
 
-            if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-               ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
-            }
+         filesRepository.setExecutor(null);
 
-            try {
-               for (CountDownLatch latch : latches) {
-                  latch.countDown();
-               }
-            } catch (Throwable e) {
-               ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+         if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+            ActiveMQJournalLogger.LOGGER.couldNotStopJournalExecutor();
+         }
+
+         try {
+            for (CountDownLatch latch : latches) {
+               latch.countDown();
             }
+         } catch (Throwable e) {
+            ActiveMQJournalLogger.LOGGER.warn(e.getMessage(), e);
+         }
 
-            fileFactory.deactivateBuffer();
+         fileFactory.deactivateBuffer();
 
-            if (currentFile != null && currentFile.getFile().isOpen()) {
-               currentFile.getFile().close();
-            }
+         if (currentFile != null && currentFile.getFile().isOpen()) {
+            currentFile.getFile().close();
+         }
 
-            filesRepository.clear();
+         filesRepository.clear();
 
-            fileFactory.stop();
+         fileFactory.stop();
 
-            currentFile = null;
-         }
+         currentFile = null;
       } finally {
          journalLock.writeLock().unlock();
       }
@@ -2358,7 +2487,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                     final boolean sync,
                                     final JournalTransaction tx,
                                     final IOCallback parameterCallback) throws Exception {
-      checkJournalIsLoaded();
 
       final IOCallback callback;
 
@@ -2552,46 +2680,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       }
    }
 
-   private final class PerfBlast extends Thread {
-
-      private final int pages;
-
-      private PerfBlast(final int pages) {
-         super("activemq-perfblast-thread");
-
-         this.pages = pages;
-      }
-
-      @Override
-      public void run() {
-         synchronized (lockAppend) {
-            try {
-
-               final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
-
-               JournalInternalRecord blastRecord = new JournalInternalRecord() {
-
-                  @Override
-                  public int getEncodeSize() {
-                     return byteEncoder.getEncodeSize();
-                  }
-
-                  @Override
-                  public void encode(final ActiveMQBuffer buffer) {
-                     byteEncoder.encode(buffer);
-                  }
-               };
-
-               for (int i = 0; i < pages; i++) {
-                  appendRecord(blastRecord, false, false, null, null);
-               }
-            } catch (Exception e) {
-               ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
-            }
-         }
-      }
-   }
-
    @Override
    public final void synchronizationLock() {
       compactorLock.writeLock().lock();
@@ -2624,7 +2712,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          long maxID = -1;
          for (long id : fileIds) {
             maxID = Math.max(maxID, id);
-            map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
+            map.put(id, filesRepository.createRemoteBackupSyncFile(id));
          }
          filesRepository.setNextFileID(maxID);
          return map;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
index 6e41c17..1542bd4 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java
@@ -17,11 +17,13 @@
 package org.apache.activemq.artemis.core.journal.impl;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -45,12 +47,14 @@ public class JournalTransaction {
 
    private boolean compacting = false;
 
-   private Map<JournalFile, TransactionCallback> callbackList;
+   private final Map<JournalFile, TransactionCallback> callbackList = Collections.synchronizedMap(new HashMap<JournalFile, TransactionCallback>());
 
    private JournalFile lastFile = null;
 
    private final AtomicInteger counter = new AtomicInteger();
 
+   private CountDownLatch firstCallbackLatch;
+
    public JournalTransaction(final long id, final JournalRecordProvider journal) {
       this.id = id;
       this.journal = journal;
@@ -139,9 +143,7 @@ public class JournalTransaction {
          pendingFiles.clear();
       }
 
-      if (callbackList != null) {
-         callbackList.clear();
-      }
+      callbackList.clear();
 
       if (pos != null) {
          pos.clear();
@@ -156,6 +158,8 @@ public class JournalTransaction {
       lastFile = null;
 
       currentCallback = null;
+
+      firstCallbackLatch = null;
    }
 
    /**
@@ -166,9 +170,13 @@ public class JournalTransaction {
       data.setNumberOfRecords(getCounter(currentFile));
    }
 
+   public TransactionCallback getCurrentCallback() {
+      return currentCallback;
+   }
+
    public TransactionCallback getCallback(final JournalFile file) throws Exception {
-      if (callbackList == null) {
-         callbackList = new HashMap<>();
+      if (firstCallbackLatch != null && callbackList.isEmpty()) {
+         firstCallbackLatch.countDown();
       }
 
       currentCallback = callbackList.get(file);
@@ -178,15 +186,19 @@ public class JournalTransaction {
          callbackList.put(file, currentCallback);
       }
 
-      if (currentCallback.getErrorMessage() != null) {
-         throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
-      }
-
       currentCallback.countUp();
 
       return currentCallback;
    }
 
+   public void checkErrorCondition() throws Exception {
+      if (currentCallback != null) {
+         if (currentCallback.getErrorMessage() != null) {
+            throw ActiveMQExceptionType.createException(currentCallback.getErrorCode(), currentCallback.getErrorMessage());
+         }
+      }
+   }
+
    public void addPositive(final JournalFile file, final long id, final int size) {
       incCounter(file);
 
@@ -264,7 +276,8 @@ public class JournalTransaction {
    }
 
    public void waitCallbacks() throws InterruptedException {
-      if (callbackList != null) {
+      waitFirstCallback();
+      synchronized (callbackList) {
          for (TransactionCallback callback : callbackList.values()) {
             callback.waitCompletion();
          }
@@ -275,8 +288,15 @@ public class JournalTransaction {
     * Wait completion at the latest file only
     */
    public void waitCompletion() throws Exception {
-      if (currentCallback != null) {
-         currentCallback.waitCompletion();
+      waitFirstCallback();
+      currentCallback.waitCompletion();
+   }
+
+   private void waitFirstCallback() throws InterruptedException {
+      if (currentCallback == null) {
+         firstCallbackLatch = new CountDownLatch(1);
+         firstCallbackLatch.await();
+         firstCallbackLatch = null;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
index 198185c..6758c64 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/journal/ActiveMQJournalLogger.java
@@ -143,7 +143,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
    void compactReadError(JournalFile file);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 142012, value = "Couldn''t find tx={0} to merge after compacting",
+   @Message(id = 142012, value = "Could not find tx={0} to merge after compacting",
       format = Message.Format.MESSAGE_FORMAT)
    void compactMergeError(Long id);
 
@@ -163,12 +163,12 @@ public interface ActiveMQJournalLogger extends BasicLogger {
    void uncomittedTxFound(Long id);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 142016, value = "Couldn''t stop compactor executor after 120 seconds",
+   @Message(id = 142016, value = "Could not stop compactor executor after 120 seconds",
       format = Message.Format.MESSAGE_FORMAT)
    void couldNotStopCompactor();
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 142017, value = "Couldn''t stop journal executor after 60 seconds",
+   @Message(id = 142017, value = "Could not stop journal executor after 60 seconds",
       format = Message.Format.MESSAGE_FORMAT)
    void couldNotStopJournalExecutor();
 
@@ -182,7 +182,7 @@ public interface ActiveMQJournalLogger extends BasicLogger {
    void deletingOrphanedFile(String fileToDelete);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 142020, value = "Couldn''t get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 142020, value = "Could not get lock after 60 seconds on closing Asynchronous File: {0}", format = Message.Format.MESSAGE_FORMAT)
    void errorClosingFile(String fileToDelete);
 
    @LogMessage(level = Logger.Level.WARN)
@@ -241,6 +241,10 @@ public interface ActiveMQJournalLogger extends BasicLogger {
    @Message(id = 142034, value = "Exception on submitting write", format = Message.Format.MESSAGE_FORMAT)
    void errorSubmittingWrite(@Cause Throwable e);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 142035, value = "Could not stop journal append executor after 60 seconds", format = Message.Format.MESSAGE_FORMAT)
+   void couldNotStopJournalAppendExecutor();
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 144000, value = "Failed to delete file {0}", format = Message.Format.MESSAGE_FORMAT)
    void errorDeletingFile(Object e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
index 080db78..5e27b36 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/AlignedJournalImplTest.java
@@ -532,6 +532,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       journalImpl.appendCommitRecord(1L, false);
 
+      journalImpl.debugWait();
+
       System.out.println("Files = " + factory.listFiles("tt"));
 
       SequentialFile file = factory.createSequentialFile("tt-1.tt");
@@ -598,6 +600,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       journalImpl.appendCommitRecord(2L, false);
 
+      journalImpl.debugWait();
+
       SequentialFile file = factory.createSequentialFile("tt-1.tt");
 
       file.open();
@@ -697,6 +701,8 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       journalImpl.appendCommitRecord(1L, false);
 
+      journalImpl.debugWait();
+
       SequentialFile file = factory.createSequentialFile("tt-1.tt");
 
       file.open();
@@ -936,8 +942,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
 
       journalImpl.forceMoveNextFile();
 
-      // Reclaiming should still be able to reclaim a file if a transaction was
-      // ignored
+      // Reclaiming should still be able to reclaim a file if a transaction was ignored
       journalImpl.checkReclaimStatus();
 
       Assert.assertEquals(2, factory.listFiles("tt").size());
@@ -1109,7 +1114,16 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testReclaimingAfterConcurrentAddsAndDeletes() throws Exception {
+   public void testReclaimingAfterConcurrentAddsAndDeletesTx() throws Exception {
+      testReclaimingAfterConcurrentAddsAndDeletes(true);
+   }
+
+   @Test
+   public void testReclaimingAfterConcurrentAddsAndDeletesNonTx() throws Exception {
+      testReclaimingAfterConcurrentAddsAndDeletes(false);
+   }
+
+   public void testReclaimingAfterConcurrentAddsAndDeletes(final boolean transactional) throws Exception {
       final int JOURNAL_SIZE = 10 * 1024;
 
       setupAndLoadJournal(JOURNAL_SIZE, 1);
@@ -1131,8 +1145,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
                latchReady.countDown();
                ActiveMQTestBase.waitForLatch(latchStart);
                for (int i = 0; i < NUMBER_OF_ELEMENTS; i++) {
-                  journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
-                  journalImpl.appendCommitRecord(i, false);
+
+                  if (transactional) {
+                     journalImpl.appendAddRecordTransactional(i, i, (byte) 1, new SimpleEncoding(50, (byte) 1));
+                     journalImpl.appendCommitRecord(i, false);
+                  } else {
+                     journalImpl.appendAddRecord(i, (byte) 1, new SimpleEncoding(50, (byte) 1), false);
+                  }
+
                   queueDelete.offer(i);
                }
                finishedOK.incrementAndGet();
@@ -1153,7 +1173,14 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
                   if (toDelete == null) {
                      break;
                   }
-                  journalImpl.appendDeleteRecord(toDelete, false);
+
+                  if (transactional) {
+                     journalImpl.appendDeleteRecordTransactional(toDelete, toDelete, new SimpleEncoding(50, (byte) 1));
+                     journalImpl.appendCommitRecord(i, false);
+                  } else {
+                     journalImpl.appendDeleteRecord(toDelete, false);
+                  }
+
                }
                finishedOK.incrementAndGet();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b47461f/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
index 41058c6..204600e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
@@ -81,6 +81,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
                   journalImpl.appendAddRecordTransactional(1L, i, (byte) 1, new SimpleEncoding(1, (byte) 0));
                }
 
+               journalImpl.debugWait();
+
                latch.countDown();
                factory.setHoldCallbacks(false, null);
                if (isCommit) {
@@ -115,8 +117,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
       }
    }
 
-   // If a callback error already arrived, we should just throw the exception
-   // right away
+   // If a callback error already arrived, we should just throw the exception right away
    @Test
    public void testPreviousError() throws Exception {
       final int JOURNAL_SIZE = 20000;
@@ -128,6 +129,8 @@ public class JournalAsyncTest extends ActiveMQTestBase {
 
       journalImpl.appendAddRecordTransactional(1L, 1, (byte) 1, new SimpleEncoding(1, (byte) 0));
 
+      journalImpl.debugWait();
+
       factory.flushAllCallbacks();
 
       factory.setGenerateErrors(false);
@@ -135,11 +138,11 @@ public class JournalAsyncTest extends ActiveMQTestBase {
 
       try {
          journalImpl.appendAddRecordTransactional(1L, 2, (byte) 1, new SimpleEncoding(1, (byte) 0));
-         Assert.fail("Exception expected"); // An exception already happened in one
-         // of the elements on this transaction.
-         // We can't accept any more elements on
-         // the transaction
+         Assert.fail("Exception expected");
+         // An exception already happened in one of the elements on this transaction.
+         // We can't accept any more elements on the transaction
       } catch (Exception ignored) {
+
       }
    }
 


Mime
View raw message