activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/8] activemq-artemis git commit: ARTEMIS-27 / ARTEMIS-338 Refactor Journal Encodings into new package
Date Wed, 13 Jan 2016 14:49:41 GMT
ARTEMIS-27 / ARTEMIS-338  Refactor Journal Encodings into new package


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9b351d82
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9b351d82
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9b351d82

Branch: refs/heads/master
Commit: 9b351d82368723fc2d549d2ce8f952d9def1136f
Parents: aab09a7
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Wed Jan 6 13:50:35 2016 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed Jan 13 09:38:08 2016 -0500

----------------------------------------------------------------------
 .../artemis/cli/commands/tools/PrintData.java   |   9 +-
 .../cli/commands/tools/XmlDataExporter.java     |   8 +-
 .../core/journal/PreparedTransactionInfo.java   |  24 +-
 .../artemis/core/journal/impl/JournalImpl.java  |   4 +-
 .../impl/dataformat/JournalAddRecord.java       |   8 +-
 .../persistence/impl/journal/AckDescribe.java   |  34 +
 .../impl/journal/AddMessageRecord.java          |   7 +-
 .../impl/journal/DescribeJournal.java           |  57 +-
 .../impl/journal/DummyOperationContext.java     |  63 ++
 .../impl/journal/JournalRecordIds.java          |   4 +-
 .../impl/journal/JournalStorageManager.java     | 971 +------------------
 .../journal/LargeMessageTXFailureCallback.java  |  64 ++
 .../TXLargeMessageConfirmationOperation.java    |  46 +
 .../journal/codec/CursorAckRecordEncoding.java  |  61 ++
 .../impl/journal/codec/DeleteEncoding.java      |  59 ++
 .../codec/DeliveryCountUpdateEncoding.java      |  57 ++
 .../impl/journal/codec/DuplicateIDEncoding.java | 105 ++
 .../codec/FinishPageMessageOperation.java       |  55 ++
 .../impl/journal/codec/GroupingEncoding.java    |  75 ++
 .../codec/HeuristicCompletionEncoding.java      |  58 ++
 .../journal/codec/LargeMessageEncoding.java     |  52 +
 .../journal/codec/PageCountPendingImpl.java     |  79 ++
 .../impl/journal/codec/PageCountRecord.java     |  68 ++
 .../impl/journal/codec/PageCountRecordInc.java  |  64 ++
 .../journal/codec/PageUpdateTXEncoding.java     |  64 ++
 .../codec/PendingLargeMessageEncoding.java      |  60 ++
 .../codec/PersistentQueueBindingEncoding.java   | 142 +++
 .../impl/journal/codec/QueueEncoding.java       |  52 +
 .../impl/journal/codec/RefEncoding.java         |  28 +
 .../codec/ScheduledDeliveryEncoding.java        |  54 ++
 .../impl/journal/codec/XidEncoding.java         |  52 +
 .../journal/impl/AlignedJournalImplTest.java    |  16 +-
 .../core/journal/impl/JournalImplTestBase.java  |  14 +-
 33 files changed, 1521 insertions(+), 993 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
index c148484..cdc1bbf 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
@@ -47,7 +47,8 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -235,7 +236,7 @@ public class PrintData extends LockAbstract {
          ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
 
          if (record.userRecordType == JournalRecordIds.ACKNOWLEDGE_CURSOR) {
-            JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+            CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
             encoding.decode(buff);
 
             Set<PagePosition> set = cursorInfo.getCursorRecords().get(encoding.queueID);
@@ -248,7 +249,7 @@ public class PrintData extends LockAbstract {
             set.add(encoding.position);
          }
          else if (record.userRecordType == JournalRecordIds.PAGE_CURSOR_COMPLETE) {
-            JournalStorageManager.CursorAckRecordEncoding encoding = new JournalStorageManager.CursorAckRecordEncoding();
+            CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
             encoding.decode(buff);
 
             Long queueID = Long.valueOf(encoding.queueID);
@@ -260,7 +261,7 @@ public class PrintData extends LockAbstract {
          }
          else if (record.userRecordType == JournalRecordIds.PAGE_TRANSACTION) {
             if (record.isUpdate) {
-               JournalStorageManager.PageUpdateTXEncoding pageUpdate = new JournalStorageManager.PageUpdateTXEncoding();
+               PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
 
                pageUpdate.decode(buff);
                cursorInfo.getPgTXs().add(pageUpdate.pageTX);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
index 8994c72..2556db0 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java
@@ -71,10 +71,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal
 import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
 import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
index 82abc4f..afad7d7 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/PreparedTransactionInfo.java
@@ -21,17 +21,33 @@ import java.util.List;
 
 public class PreparedTransactionInfo {
 
-   public final long id;
+   private final long id;
 
-   public final byte[] extraData;
+   private final byte[] extraData;
 
-   public final List<RecordInfo> records = new ArrayList<>();
+   private final List<RecordInfo> records = new ArrayList<RecordInfo>();
 
-   public final List<RecordInfo> recordsToDelete = new ArrayList<>();
+   private final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
 
    public PreparedTransactionInfo(final long id, final byte[] extraData) {
       this.id = id;
 
       this.extraData = extraData;
    }
+
+   public long getId() {
+      return id;
+   }
+
+   public byte[] getExtraData() {
+      return extraData;
+   }
+
+   public List<RecordInfo> getRecords() {
+      return records;
+   }
+
+   public List<RecordInfo> getRecordsToDelete() {
+      return recordsToDelete;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 14b6d92..ef6de60 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -1786,9 +1786,9 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
             PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
 
-            info.records.addAll(transaction.recordInfos);
+            info.getRecords().addAll(transaction.recordInfos);
 
-            info.recordsToDelete.addAll(transaction.recordsToDelete);
+            info.getRecordsToDelete().addAll(transaction.recordsToDelete);
 
             loadManager.addPreparedTransaction(info);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
index 69734bc..aa0e961 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/dataformat/JournalAddRecord.java
@@ -22,13 +22,13 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 
 public class JournalAddRecord extends JournalInternalRecord {
 
-   private final long id;
+   protected final long id;
 
-   private final EncodingSupport record;
+   protected final EncodingSupport record;
 
-   private final byte recordType;
+   protected final byte recordType;
 
-   private final boolean add;
+   protected final boolean add;
 
    /**
     * @param id

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java
new file mode 100644
index 0000000..32e0f4d
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AckDescribe.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
+
+public final class AckDescribe {
+
+   public RefEncoding refEncoding;
+
+   public AckDescribe(RefEncoding refEncoding) {
+      this.refEncoding = refEncoding;
+   }
+
+   @Override
+   public String toString() {
+      return "ACK;" + refEncoding;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
index 49ac289..fdae483 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AddMessageRecord.java
@@ -26,9 +26,11 @@ public final class AddMessageRecord {
 
    final ServerMessage message;
 
-   long scheduledDeliveryTime;
+   // mtaylor (Added to compile)
+   public long scheduledDeliveryTime;
 
-   int deliveryCount;
+   // mtaylor (Added to compile)
+   public int deliveryCount;
 
    public ServerMessage getMessage() {
       return message;
@@ -41,4 +43,5 @@ public final class AddMessageRecord {
    public int getDeliveryCount() {
       return deliveryCount;
    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index d41f0ed..f3ecd76 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -28,31 +28,30 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
 import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
 import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.AckDescribe;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DeliveryCountUpdateEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.DuplicateIDEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.HeuristicCompletionEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.LargeMessageEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountPendingImpl;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecord;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageCountRecordInc;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PageUpdateTXEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.PendingLargeMessageEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.RefEncoding;
-import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.ScheduledDeliveryEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@@ -200,15 +199,15 @@ public final class DescribeJournal {
             public void checkRecordCounter(RecordInfo info) {
                if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
                   PageCountRecord encoding = (PageCountRecord) newObjectEncoding(info);
-                  long queueIDForCounter = encoding.queueID;
+                  long queueIDForCounter = encoding.getQueueID();
 
                   PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
 
-                  if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.value) {
-                     out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.value);
+                  if (subsCounter.getValue() != 0 && subsCounter.getValue() != encoding.getValue()) {
+                     out.println("####### Counter replace wrongly on queue " + queueIDForCounter + " oldValue=" + subsCounter.getValue() + " newValue=" + encoding.getValue());
                   }
 
-                  subsCounter.loadValue(info.id, encoding.value);
+                  subsCounter.loadValue(info.id, encoding.getValue());
                   subsCounter.processReload();
                   out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + ", result=" + subsCounter.getValue());
                   if (subsCounter.getValue() < 0) {
@@ -221,13 +220,13 @@ public final class DescribeJournal {
                }
                else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
                   PageCountRecordInc encoding = (PageCountRecordInc) newObjectEncoding(info);
-                  long queueIDForCounter = encoding.queueID;
+                  long queueIDForCounter = encoding.getQueueID();
 
                   PageSubscriptionCounterImpl subsCounter = lookupCounter(counters, queueIDForCounter);
 
-                  subsCounter.loadInc(info.id, encoding.value);
+                  subsCounter.loadInc(info.id, encoding.getValue());
                   subsCounter.processReload();
-                  out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.value);
+                  out.print("#Counter queue " + queueIDForCounter + " value=" + subsCounter.getValue() + " increased by " + encoding.getValue());
                   if (subsCounter.getValue() < 0) {
                      out.println(" #NegativeCounter!!!!");
                   }
@@ -311,20 +310,20 @@ public final class DescribeJournal {
          }
          else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_VALUE) {
             PageCountRecord encoding = (PageCountRecord) o;
-            queueIDForCounter = encoding.queueID;
+            queueIDForCounter = encoding.getQueueID();
 
             subsCounter = lookupCounter(counters, queueIDForCounter);
 
-            subsCounter.loadValue(info.id, encoding.value);
+            subsCounter.loadValue(info.id, encoding.getValue());
             subsCounter.processReload();
          }
          else if (info.getUserRecordType() == JournalRecordIds.PAGE_CURSOR_COUNTER_INC) {
             PageCountRecordInc encoding = (PageCountRecordInc) o;
-            queueIDForCounter = encoding.queueID;
+            queueIDForCounter = encoding.getQueueID();
 
             subsCounter = lookupCounter(counters, queueIDForCounter);
 
-            subsCounter.loadInc(info.id, encoding.value);
+            subsCounter.loadInc(info.id, encoding.getValue());
             subsCounter.processReload();
          }
 
@@ -345,8 +344,8 @@ public final class DescribeJournal {
       out.println("### Prepared TX ###");
 
       for (PreparedTransactionInfo tx : preparedTransactions) {
-         out.println(tx.id);
-         for (RecordInfo info : tx.records) {
+         out.println(tx.getId());
+         for (RecordInfo info : tx.getRecords()) {
             Object o = newObjectEncoding(info);
             out.println("- " + describeRecord(info, o));
             if (info.getUserRecordType() == 31) {
@@ -365,7 +364,7 @@ public final class DescribeJournal {
             }
          }
 
-         for (RecordInfo info : tx.recordsToDelete) {
+         for (RecordInfo info : tx.getRecordsToDelete()) {
             out.println("- " + describeRecord(info) + " <marked to delete>");
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
new file mode 100644
index 0000000..194d2b1
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DummyOperationContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+
+final class DummyOperationContext implements OperationContext {
+
+   private static DummyOperationContext instance = new DummyOperationContext();
+
+   public static OperationContext getInstance() {
+      return DummyOperationContext.instance;
+   }
+
+   public void executeOnCompletion(final IOCallback runnable) {
+      // There are no executeOnCompletion calls while using the DummyOperationContext
+      // However we keep the code here for correctness
+      runnable.done();
+   }
+
+   public void replicationDone() {
+   }
+
+   public void replicationLineUp() {
+   }
+
+   public void storeLineUp() {
+   }
+
+   public void done() {
+   }
+
+   public void onError(final int errorCode, final String errorMessage) {
+   }
+
+   public void waitCompletion() {
+   }
+
+   public boolean waitCompletion(final long timeout) {
+      return true;
+   }
+
+   public void pageSyncLineUp() {
+   }
+
+   public void pageSyncDone() {
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index 5b12345..0242b50 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -26,7 +26,9 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
 public final class JournalRecordIds {
 
    // grouping journal record type
-   static final byte GROUP_RECORD = 20;
+
+   // mtaylor Added to compile
+   public static final byte GROUP_RECORD = 20;
 
    // BindingsImpl journal record type
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 390e742..c272a12 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -27,12 +27,10 @@ import java.security.MessageDigest;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -62,12 +60,10 @@ import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
@@ -78,7 +74,6 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReferenceImpl;
-import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
 import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
 import org.apache.activemq.artemis.core.persistence.GroupingInfo;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -87,6 +82,23 @@ import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
 import org.apache.activemq.artemis.core.persistence.config.PersistedRoles;
 import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeleteEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DeliveryCountUpdateEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.DuplicateIDEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.FinishPageMessageOperation;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.GroupingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.HeuristicCompletionEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.LargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountPendingImpl;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecord;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageCountRecordInc;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PageUpdateTXEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PendingLargeMessageEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.RefEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.ScheduledDeliveryEncoding;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.XidEncoding;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
@@ -106,22 +118,14 @@ import org.apache.activemq.artemis.core.server.impl.JournalLoader;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.Transaction.State;
-import org.apache.activemq.artemis.core.transaction.TransactionOperation;
-import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.Base64;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.activemq.artemis.utils.UUID;
-import org.apache.activemq.artemis.utils.XidCodecSupport;
 
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ACKNOWLEDGE_CURSOR;
-import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE_PENDING;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.DUPLICATE_ID;
 import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.PAGE_CURSOR_COUNTER_INC;
@@ -143,16 +147,6 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
  */
 public class JournalStorageManager implements StorageManager {
 
-   private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
-
-   private final Semaphore pageMaxConcurrentIO;
-
-   private final BatchingIDGenerator idGenerator;
-
-   private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
-
-   private ReplicationManager replicator;
-
    public enum JournalContent {
       BINDINGS((byte) 0), MESSAGES((byte) 1);
 
@@ -171,6 +165,16 @@ public class JournalStorageManager implements StorageManager {
       }
    }
 
+   private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
+
+   private final Semaphore pageMaxConcurrentIO;
+
+   private final BatchingIDGenerator idGenerator;
+
+   private final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
+
+   private ReplicationManager replicator;
+
    private final SequentialFileFactory journalFF;
 
    private Journal messageJournal;
@@ -1527,13 +1531,13 @@ public class JournalStorageManager implements StorageManager {
 
                   encoding.decode(buff);
 
-                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+                  PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().loadValue(record.id, encoding.value);
+                     sub.getCounter().loadValue(record.id, encoding.getValue());
                   }
                   else {
-                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.queueID);
+                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
                      messageJournal.appendDeleteRecord(record.id, false);
                   }
 
@@ -1545,13 +1549,13 @@ public class JournalStorageManager implements StorageManager {
 
                   encoding.decode(buff);
 
-                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+                  PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().loadInc(record.id, encoding.value);
+                     sub.getCounter().loadInc(record.id, encoding.getValue());
                   }
                   else {
-                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.queueID);
+                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPageCursor(encoding.getQueueID());
                      messageJournal.appendDeleteRecord(record.id, false);
                   }
 
@@ -2024,7 +2028,7 @@ public class JournalStorageManager implements StorageManager {
 
    // Package protected ---------------------------------------------
 
-   private void confirmLargeMessage(final LargeServerMessage largeServerMessage) {
+   protected void confirmLargeMessage(final LargeServerMessage largeServerMessage) {
       if (largeServerMessage.getPendingRecordID() >= 0) {
          try {
             confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
@@ -2138,7 +2142,7 @@ public class JournalStorageManager implements StorageManager {
     * @return
     * @throws Exception
     */
-   private LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages,
+   protected LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages,
                                                 final ActiveMQBuffer buff) throws Exception {
       LargeServerMessage largeMessage = createLargeMessage();
 
@@ -2177,11 +2181,11 @@ public class JournalStorageManager implements StorageManager {
                                          JournalLoader journalLoader) throws Exception {
       // recover prepared transactions
       for (PreparedTransactionInfo preparedTransaction : preparedTransactions) {
-         XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
+         XidEncoding encodingXid = new XidEncoding(preparedTransaction.getExtraData());
 
          Xid xid = encodingXid.xid;
 
-         Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
+         Transaction tx = new TransactionImpl(preparedTransaction.getId(), xid, this);
 
          List<MessageReference> referencesToAck = new ArrayList<>();
 
@@ -2191,7 +2195,7 @@ public class JournalStorageManager implements StorageManager {
          // Then have reacknowledge(tx) methods on queue, which needs to add the page size
 
          // first get any sent messages for this tx and recreate
-         for (RecordInfo record : preparedTransaction.records) {
+         for (RecordInfo record : preparedTransaction.getRecords()) {
             byte[] data = record.data;
 
             ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
@@ -2310,14 +2314,14 @@ public class JournalStorageManager implements StorageManager {
 
                   encoding.decode(buff);
 
-                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+                  PageSubscription sub = locateSubscription(encoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
 
                   if (sub != null) {
-                     sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.value);
+                     sub.getCounter().applyIncrementOnTX(tx, record.id, encoding.getValue());
                      sub.notEmpty();
                   }
                   else {
-                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
+                     ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.getQueueID());
                   }
 
                   break;
@@ -2329,7 +2333,7 @@ public class JournalStorageManager implements StorageManager {
             }
          }
 
-         for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete) {
+         for (RecordInfo recordDeleted : preparedTransaction.getRecordsToDelete()) {
             byte[] data = recordDeleted.data;
 
             if (data.length > 0) {
@@ -2431,879 +2435,7 @@ public class JournalStorageManager implements StorageManager {
       }
    }
 
-   /**
-    * It's public as other classes may want to unparse data on tools
-    */
-   public static class XidEncoding implements EncodingSupport {
-
-      public final Xid xid;
-
-      XidEncoding(final Xid xid) {
-         this.xid = xid;
-      }
-
-      XidEncoding(final byte[] data) {
-         xid = XidCodecSupport.decodeXid(ActiveMQBuffers.wrappedBuffer(data));
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         throw new IllegalStateException("Non Supported Operation");
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         XidCodecSupport.encodeXid(xid, buffer);
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return XidCodecSupport.getXidEncodeLength(xid);
-      }
-   }
-
-   protected static class HeuristicCompletionEncoding implements EncodingSupport {
-
-      public Xid xid;
-
-      public boolean isCommit;
-
-      @Override
-      public String toString() {
-         return "HeuristicCompletionEncoding [xid=" + xid + ", isCommit=" + isCommit + "]";
-      }
-
-      HeuristicCompletionEncoding(final Xid xid, final boolean isCommit) {
-         this.xid = xid;
-         this.isCommit = isCommit;
-      }
-
-      HeuristicCompletionEncoding() {
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         xid = XidCodecSupport.decodeXid(buffer);
-         isCommit = buffer.readBoolean();
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         XidCodecSupport.encodeXid(xid, buffer);
-         buffer.writeBoolean(isCommit);
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN;
-      }
-   }
-
-   private static class GroupingEncoding implements EncodingSupport, GroupingInfo {
-
-      public long id;
-
-      public SimpleString groupId;
-
-      public SimpleString clusterName;
-
-      public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName) {
-         this.id = id;
-         this.groupId = groupId;
-         this.clusterName = clusterName;
-      }
-
-      public GroupingEncoding() {
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         buffer.writeSimpleString(groupId);
-         buffer.writeSimpleString(clusterName);
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         groupId = buffer.readSimpleString();
-         clusterName = buffer.readSimpleString();
-      }
-
-      @Override
-      public long getId() {
-         return id;
-      }
-
-      public void setId(final long id) {
-         this.id = id;
-      }
-
-      @Override
-      public SimpleString getGroupId() {
-         return groupId;
-      }
-
-      @Override
-      public SimpleString getClusterName() {
-         return clusterName;
-      }
-
-      @Override
-      public String toString() {
-         return "GroupingEncoding [id=" + id + ", groupId=" + groupId + ", clusterName=" + clusterName + "]";
-      }
-   }
-
-   public static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo {
-
-      public long id;
-
-      public SimpleString name;
-
-      public SimpleString address;
-
-      public SimpleString filterString;
-
-      public boolean autoCreated;
-
-      public SimpleString user;
-
-      public PersistentQueueBindingEncoding() {
-      }
-
-      @Override
-      public String toString() {
-         return "PersistentQueueBindingEncoding [id=" + id +
-            ", name=" +
-            name +
-            ", address=" +
-            address +
-            ", filterString=" +
-            filterString +
-            ", user=" +
-            user +
-            ", autoCreated=" +
-            autoCreated +
-            "]";
-      }
-
-      public PersistentQueueBindingEncoding(final SimpleString name,
-                                            final SimpleString address,
-                                            final SimpleString filterString,
-                                            final SimpleString user,
-                                            final boolean autoCreated) {
-         this.name = name;
-         this.address = address;
-         this.filterString = filterString;
-         this.user = user;
-         this.autoCreated = autoCreated;
-      }
-
-      @Override
-      public long getId() {
-         return id;
-      }
-
-      public void setId(final long id) {
-         this.id = id;
-      }
-
-      @Override
-      public SimpleString getAddress() {
-         return address;
-      }
-
-      @Override
-      public void replaceQueueName(SimpleString newName) {
-         this.name = newName;
-      }
-
-      @Override
-      public SimpleString getFilterString() {
-         return filterString;
-      }
-
-      @Override
-      public SimpleString getQueueName() {
-         return name;
-      }
-
-      @Override
-      public SimpleString getUser() {
-         return user;
-      }
-
-      @Override
-      public boolean isAutoCreated() {
-         return autoCreated;
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         name = buffer.readSimpleString();
-         address = buffer.readSimpleString();
-         filterString = buffer.readNullableSimpleString();
-
-         String metadata = buffer.readNullableSimpleString().toString();
-         if (metadata != null) {
-            String[] elements = metadata.split(";");
-            for (String element : elements) {
-               String[] keyValuePair = element.split("=");
-               if (keyValuePair.length == 2) {
-                  if (keyValuePair[0].equals("user")) {
-                     user = SimpleString.toSimpleString(keyValuePair[1]);
-                  }
-               }
-            }
-         }
-
-         autoCreated = buffer.readBoolean();
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         buffer.writeSimpleString(name);
-         buffer.writeSimpleString(address);
-         buffer.writeNullableSimpleString(filterString);
-         buffer.writeNullableSimpleString(createMetadata());
-         buffer.writeBoolean(autoCreated);
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
-            SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
-            SimpleString.sizeofNullableString(createMetadata());
-      }
-
-      private SimpleString createMetadata() {
-         StringBuilder metadata = new StringBuilder();
-         metadata.append("user=").append(user).append(";");
-         return SimpleString.toSimpleString(metadata.toString());
-      }
-   }
-
-   public static class LargeMessageEncoding implements EncodingSupport {
-
-      public final LargeServerMessage message;
-
-      public LargeMessageEncoding(final LargeServerMessage message) {
-         this.message = message;
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
-       */
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         message.decodeHeadersAndProperties(buffer);
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
-       */
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         message.encode(buffer);
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
-       */
-      @Override
-      public int getEncodeSize() {
-         return message.getEncodeSize();
-      }
-
-   }
-
-   public static class PendingLargeMessageEncoding implements EncodingSupport {
-
-      public long largeMessageID;
-
-      public PendingLargeMessageEncoding(final long pendingLargeMessageID) {
-         this.largeMessageID = pendingLargeMessageID;
-      }
-
-      public PendingLargeMessageEncoding() {
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
-       */
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         largeMessageID = buffer.readLong();
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.spi.core.remoting.ActiveMQBuffer)
-       */
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         buffer.writeLong(largeMessageID);
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
-       */
-      @Override
-      public int getEncodeSize() {
-         return DataConstants.SIZE_LONG;
-      }
-
-      @Override
-      public String toString() {
-         return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
-      }
-
-   }
-
-   public static class DeliveryCountUpdateEncoding implements EncodingSupport {
-
-      public long queueID;
-
-      public int count;
-
-      public DeliveryCountUpdateEncoding() {
-         super();
-      }
-
-      public DeliveryCountUpdateEncoding(final long queueID, final int count) {
-         super();
-         this.queueID = queueID;
-         this.count = count;
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         queueID = buffer.readLong();
-         count = buffer.readInt();
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         buffer.writeLong(queueID);
-         buffer.writeInt(count);
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return 8 + 4;
-      }
-
-      @Override
-      public String toString() {
-         return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
-      }
-
-   }
-
-   public static class QueueEncoding implements EncodingSupport {
-
-      public long queueID;
-
-      public QueueEncoding(final long queueID) {
-         super();
-         this.queueID = queueID;
-      }
-
-      public QueueEncoding() {
-         super();
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         queueID = buffer.readLong();
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         buffer.writeLong(queueID);
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return 8;
-      }
-
-      @Override
-      public String toString() {
-         return "QueueEncoding [queueID=" + queueID + "]";
-      }
-
-   }
-
-   private static class DeleteEncoding implements EncodingSupport {
-
-      public byte recordType;
-
-      public long id;
-
-      public DeleteEncoding(final byte recordType, final long id) {
-         this.recordType = recordType;
-         this.id = id;
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
-       */
-      @Override
-      public int getEncodeSize() {
-         return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
-       */
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-         buffer.writeByte(recordType);
-         buffer.writeLong(id);
-      }
-
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
-       */
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-         recordType = buffer.readByte();
-         id = buffer.readLong();
-      }
-   }
-
-   public static class RefEncoding extends QueueEncoding {
-
-      public RefEncoding() {
-         super();
-      }
-
-      public RefEncoding(final long queueID) {
-         super(queueID);
-      }
-   }
-
-   public static class PageUpdateTXEncoding implements EncodingSupport {
-
-      public long pageTX;
-
-      public int recods;
-
-      @Override
-      public String toString() {
-         return "PageUpdateTXEncoding [pageTX=" + pageTX + ", recods=" + recods + "]";
-      }
-
-      public PageUpdateTXEncoding() {
-      }
-
-      public PageUpdateTXEncoding(final long pageTX, final int records) {
-         this.pageTX = pageTX;
-         this.recods = records;
-      }
-
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-         this.pageTX = buffer.readLong();
-         this.recods = buffer.readInt();
-      }
-
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-         buffer.writeLong(pageTX);
-         buffer.writeInt(recods);
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
-      }
-
-      public List<MessageReference> getRelatedMessageReferences() {
-         return null;
-      }
-   }
-
-   protected static class ScheduledDeliveryEncoding extends QueueEncoding {
-
-      long scheduledDeliveryTime;
-
-      @Override
-      public String toString() {
-         return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" + scheduledDeliveryTime + "(" + new Date(scheduledDeliveryTime) + ")]";
-      }
-
-      private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID) {
-         super(queueID);
-         this.scheduledDeliveryTime = scheduledDeliveryTime;
-      }
-
-      public ScheduledDeliveryEncoding() {
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return super.getEncodeSize() + 8;
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         super.encode(buffer);
-         buffer.writeLong(scheduledDeliveryTime);
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         super.decode(buffer);
-         scheduledDeliveryTime = buffer.readLong();
-      }
-   }
-
-   public static class DuplicateIDEncoding implements EncodingSupport {
-
-      SimpleString address;
-
-      byte[] duplID;
-
-      public DuplicateIDEncoding(final SimpleString address, final byte[] duplID) {
-         this.address = address;
-
-         this.duplID = duplID;
-      }
-
-      public DuplicateIDEncoding() {
-      }
-
-      @Override
-      public void decode(final ActiveMQBuffer buffer) {
-         address = buffer.readSimpleString();
-
-         int size = buffer.readInt();
-
-         duplID = new byte[size];
-
-         buffer.readBytes(duplID);
-      }
-
-      @Override
-      public void encode(final ActiveMQBuffer buffer) {
-         buffer.writeSimpleString(address);
-
-         buffer.writeInt(duplID.length);
-
-         buffer.writeBytes(duplID);
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
-      }
-
-      @Override
-      public String toString() {
-         // this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
-         // and this may be useful to validate the journal on those tests
-         // You may uncomment these two lines on that case and replcate the toString for the PrintData
-
-         // SimpleString simpleStr = new SimpleString(duplID);
-         // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
-
-         String bridgeRepresentation = null;
-
-         // The bridge will generate IDs on these terms:
-         // This will make them easier to read
-         if (address.toString().startsWith("BRIDGE") && duplID.length == 24) {
-            try {
-               ByteBuffer buff = ByteBuffer.wrap(duplID);
-
-               // 16 for UUID
-               byte[] bytesUUID = new byte[16];
-
-               buff.get(bytesUUID);
-
-               UUID uuid = new UUID(UUID.TYPE_TIME_BASED, bytesUUID);
-
-               long id = buff.getLong();
-               bridgeRepresentation = "nodeUUID=" + uuid.toString() + " messageID=" + id;
-            }
-            catch (Throwable ignored) {
-               bridgeRepresentation = null;
-            }
-         }
-
-         if (bridgeRepresentation != null) {
-            return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + " / " +
-               bridgeRepresentation + "]";
-         }
-         else {
-            return "DuplicateIDEncoding [address=" + address + ",str=" + ByteUtil.toSimpleString(duplID) + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
-         }
-      }
-   }
-
-   /**
-    * This is only used when loading a transaction.
-    * <p>
-    * it might be possible to merge the functionality of this class with
-    * {@link org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager.FinishPageMessageOperation}
-    */
-   // TODO: merge this class with the one on the PagingStoreImpl
-   private static class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation {
-
-      @Override
-      public void afterCommit(final Transaction tx) {
-         // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
-         // transaction until all the messages were added to the queue
-         // or else we could deliver the messages out of order
-
-         PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
-         if (pageTransaction != null) {
-            pageTransaction.commit();
-         }
-      }
-
-      @Override
-      public void afterRollback(final Transaction tx) {
-         PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
-         if (tx.getState() == State.PREPARED && pageTransaction != null) {
-            pageTransaction.rollback();
-         }
-      }
-   }
-
-   protected static final class PageCountRecord implements EncodingSupport {
-
-      @Override
-      public String toString() {
-         return "PageCountRecord [queueID=" + queueID + ", value=" + value + "]";
-      }
-
-      PageCountRecord() {
-
-      }
-
-      PageCountRecord(long queueID, long value) {
-         this.queueID = queueID;
-         this.value = value;
-      }
-
-      long queueID;
-
-      long value;
-
-      @Override
-      public int getEncodeSize() {
-         return DataConstants.SIZE_LONG * 2;
-      }
-
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-         buffer.writeLong(queueID);
-         buffer.writeLong(value);
-      }
-
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-         queueID = buffer.readLong();
-         value = buffer.readLong();
-      }
-
-   }
-
-   protected static final class PageCountPendingImpl implements EncodingSupport, PageCountPending {
-
-      @Override
-      public String toString() {
-         return "PageCountPending [queueID=" + queueID + ", pageID=" + pageID + "]";
-      }
-
-      PageCountPendingImpl() {
-
-      }
-
-      PageCountPendingImpl(long queueID, long pageID, int inc) {
-         this.queueID = queueID;
-         this.pageID = pageID;
-      }
-
-      long id;
-
-      long queueID;
-
-      long pageID;
-
-      public void setID(long id) {
-         this.id = id;
-      }
-
-      @Override
-      public long getID() {
-         return id;
-      }
-
-      @Override
-      public long getQueueID() {
-         return queueID;
-      }
-
-      @Override
-      public long getPageID() {
-         return pageID;
-      }
-
-      @Override
-      public int getEncodeSize() {
-         return DataConstants.SIZE_LONG * 2;
-      }
-
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-         buffer.writeLong(queueID);
-         buffer.writeLong(pageID);
-      }
-
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-         queueID = buffer.readLong();
-         pageID = buffer.readLong();
-      }
-
-   }
-
-   protected static final class PageCountRecordInc implements EncodingSupport {
-
-      @Override
-      public String toString() {
-         return "PageCountRecordInc [queueID=" + queueID + ", value=" + value + "]";
-      }
-
-      PageCountRecordInc() {
-
-      }
-
-      PageCountRecordInc(long queueID, int value) {
-         this.queueID = queueID;
-         this.value = value;
-      }
-
-      long queueID;
-
-      int value;
-
-      @Override
-      public int getEncodeSize() {
-         return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
-      }
-
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-         buffer.writeLong(queueID);
-         buffer.writeInt(value);
-      }
-
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-         queueID = buffer.readLong();
-         value = buffer.readInt();
-      }
-
-   }
-
-   public static class CursorAckRecordEncoding implements EncodingSupport {
-
-      public CursorAckRecordEncoding(final long queueID, final PagePosition position) {
-         this.queueID = queueID;
-         this.position = position;
-      }
-
-      public CursorAckRecordEncoding() {
-         this.position = new PagePositionImpl();
-      }
-
-      @Override
-      public String toString() {
-         return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
-      }
-
-      public long queueID;
-
-      public PagePosition position;
-
-      @Override
-      public int getEncodeSize() {
-         return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
-      }
-
-      @Override
-      public void encode(ActiveMQBuffer buffer) {
-         buffer.writeLong(queueID);
-         buffer.writeLong(position.getPageNr());
-         buffer.writeInt(position.getMessageNr());
-      }
-
-      @Override
-      public void decode(ActiveMQBuffer buffer) {
-         queueID = buffer.readLong();
-         long pageNR = buffer.readLong();
-         int messageNR = buffer.readInt();
-         this.position = new PagePositionImpl(pageNR, messageNR);
-      }
-   }
-
-   private class LargeMessageTXFailureCallback implements TransactionFailureCallback {
-
-      private final Map<Long, ServerMessage> messages;
-
-      public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages) {
-         super();
-         this.messages = messages;
-      }
-
-      @Override
-      public void failedTransaction(final long transactionID,
-                                    final List<RecordInfo> records,
-                                    final List<RecordInfo> recordsToDelete) {
-         for (RecordInfo record : records) {
-            if (record.userRecordType == ADD_LARGE_MESSAGE) {
-               byte[] data = record.data;
-
-               ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
-
-               try {
-                  LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
-                  serverMessage.decrementDelayDeletionCount();
-               }
-               catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.journalError(e);
-               }
-            }
-         }
-      }
-
-   }
-
-   public static final class AckDescribe {
-
-      public RefEncoding refEncoding;
-
-      public AckDescribe(RefEncoding refEncoding) {
-         this.refEncoding = refEncoding;
-      }
-
-      @Override
-      public String toString() {
-         return "ACK;" + refEncoding;
-      }
-
-   }
-
-   /**
+   /*
     * @param id
     * @param buffer
     * @return
@@ -3379,21 +2511,4 @@ public class JournalStorageManager implements StorageManager {
       }
       txoper.confirmedMessages.add(recordID);
    }
-
-   final class TXLargeMessageConfirmationOperation extends TransactionOperationAbstract {
-
-      public List<Long> confirmedMessages = new LinkedList<>();
-
-      @Override
-      public void afterRollback(Transaction tx) {
-         for (Long msg : confirmedMessages) {
-            try {
-               JournalStorageManager.this.confirmPendingLargeMessage(msg);
-            }
-            catch (Throwable e) {
-               ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(e, msg);
-            }
-         }
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
new file mode 100644
index 0000000..c2133d9
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeMessageTXFailureCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.ADD_LARGE_MESSAGE;
+
+public class LargeMessageTXFailureCallback implements TransactionFailureCallback {
+
+   private AbstractJournalStorageManager journalStorageManager;
+   private final Map<Long, ServerMessage> messages;
+
+   public LargeMessageTXFailureCallback(AbstractJournalStorageManager journalStorageManager,
+                                        final Map<Long, ServerMessage> messages) {
+      super();
+      this.journalStorageManager = journalStorageManager;
+      this.messages = messages;
+   }
+
+   public void failedTransaction(final long transactionID,
+                                 final List<RecordInfo> records,
+                                 final List<RecordInfo> recordsToDelete) {
+      for (RecordInfo record : records) {
+         if (record.userRecordType == ADD_LARGE_MESSAGE) {
+            byte[] data = record.data;
+
+            ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(data);
+
+            try {
+               LargeServerMessage serverMessage = journalStorageManager.parseLargeMessage(messages, buff);
+               serverMessage.decrementDelayDeletionCount();
+            }
+            catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.journalError(e);
+            }
+         }
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
new file mode 100644
index 0000000..361477a
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/TXLargeMessageConfirmationOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+
+public final class TXLargeMessageConfirmationOperation extends TransactionOperationAbstract {
+
+   private AbstractJournalStorageManager journalStorageManager;
+   public List<Long> confirmedMessages = new LinkedList<Long>();
+
+   public TXLargeMessageConfirmationOperation(AbstractJournalStorageManager journalStorageManager) {
+      this.journalStorageManager = journalStorageManager;
+   }
+
+   @Override
+   public void afterRollback(Transaction tx) {
+      for (Long msg : confirmedMessages) {
+         try {
+            journalStorageManager.confirmPendingLargeMessage(msg);
+         }
+         catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.journalErrorConfirmingLargeMessage(e, msg);
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java
new file mode 100644
index 0000000..4e1fada
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/CursorAckRecordEncoding.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class CursorAckRecordEncoding implements EncodingSupport {
+
+   public CursorAckRecordEncoding(final long queueID, final PagePosition position) {
+      this.queueID = queueID;
+      this.position = position;
+   }
+
+   public CursorAckRecordEncoding() {
+      this.position = new PagePositionImpl();
+   }
+
+   @Override
+   public String toString() {
+      return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
+   }
+
+   public long queueID;
+
+   public PagePosition position;
+
+   public int getEncodeSize() {
+      return DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+   }
+
+   public void encode(ActiveMQBuffer buffer) {
+      buffer.writeLong(queueID);
+      buffer.writeLong(position.getPageNr());
+      buffer.writeInt(position.getMessageNr());
+   }
+
+   public void decode(ActiveMQBuffer buffer) {
+      queueID = buffer.readLong();
+      long pageNR = buffer.readLong();
+      int messageNR = buffer.readInt();
+      this.position = new PagePositionImpl(pageNR, messageNR);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java
new file mode 100644
index 0000000..3001edf
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeleteEncoding.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+public class DeleteEncoding implements EncodingSupport {
+
+   public byte recordType;
+
+   public long id;
+
+   public DeleteEncoding(final byte recordType, final long id) {
+      this.recordType = recordType;
+      this.id = id;
+   }
+
+   /* (non-Javadoc)
+    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#getEncodeSize()
+    */
+   @Override
+   public int getEncodeSize() {
+      return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+   }
+
+   /* (non-Javadoc)
+    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#encode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
+    */
+   @Override
+   public void encode(ActiveMQBuffer buffer) {
+      buffer.writeByte(recordType);
+      buffer.writeLong(id);
+   }
+
+   /* (non-Javadoc)
+    * @see org.apache.activemq.artemis.core.journal.EncodingSupport#decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)
+    */
+   @Override
+   public void decode(ActiveMQBuffer buffer) {
+      recordType = buffer.readByte();
+      id = buffer.readLong();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java
new file mode 100644
index 0000000..5c4541d
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DeliveryCountUpdateEncoding.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+
+public class DeliveryCountUpdateEncoding implements EncodingSupport {
+
+   public long queueID;
+
+   public int count;
+
+   public DeliveryCountUpdateEncoding() {
+      super();
+   }
+
+   public DeliveryCountUpdateEncoding(final long queueID, final int count) {
+      super();
+      this.queueID = queueID;
+      this.count = count;
+   }
+
+   public void decode(final ActiveMQBuffer buffer) {
+      queueID = buffer.readLong();
+      count = buffer.readInt();
+   }
+
+   public void encode(final ActiveMQBuffer buffer) {
+      buffer.writeLong(queueID);
+      buffer.writeInt(count);
+   }
+
+   public int getEncodeSize() {
+      return 8 + 4;
+   }
+
+   @Override
+   public String toString() {
+      return "DeliveryCountUpdateEncoding [queueID=" + queueID + ", count=" + count + "]";
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java
new file mode 100644
index 0000000..49d8dc8
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/DuplicateIDEncoding.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+import org.apache.activemq.artemis.utils.ByteUtil;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.UUID;
+
+public class DuplicateIDEncoding implements EncodingSupport {
+
+   public SimpleString address;
+
+   public byte[] duplID;
+
+   public DuplicateIDEncoding(final SimpleString address, final byte[] duplID) {
+      this.address = address;
+
+      this.duplID = duplID;
+   }
+
+   public DuplicateIDEncoding() {
+   }
+
+   public void decode(final ActiveMQBuffer buffer) {
+      address = buffer.readSimpleString();
+
+      int size = buffer.readInt();
+
+      duplID = new byte[size];
+
+      buffer.readBytes(duplID);
+   }
+
+   public void encode(final ActiveMQBuffer buffer) {
+      buffer.writeSimpleString(address);
+
+      buffer.writeInt(duplID.length);
+
+      buffer.writeBytes(duplID);
+   }
+
+   public int getEncodeSize() {
+      return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
+   }
+
+   @Override
+   public String toString() {
+      // this would be useful when testing. Most tests on the testsuite will use a SimpleString on the duplicate ID
+      // and this may be useful to validate the journal on those tests
+      // You may uncomment these two lines on that case and replcate the toString for the PrintData
+
+      // SimpleString simpleStr = new SimpleString(duplID);
+      // return "DuplicateIDEncoding [address=" + address + ", duplID=" + simpleStr + "]";
+
+      String bridgeRepresentation = null;
+
+      // The bridge will generate IDs on these terms:
+      // This will make them easier to read
+      if (address.toString().startsWith("BRIDGE") && duplID.length == 24) {
+         try {
+            ByteBuffer buff = ByteBuffer.wrap(duplID);
+
+            // 16 for UUID
+            byte[] bytesUUID = new byte[16];
+
+            buff.get(bytesUUID);
+
+            UUID uuid = new UUID(UUID.TYPE_TIME_BASED, bytesUUID);
+
+            long id = buff.getLong();
+            bridgeRepresentation = "nodeUUID=" + uuid.toString() + " messageID=" + id;
+         }
+         catch (Throwable ignored) {
+            bridgeRepresentation = null;
+         }
+      }
+
+      if (bridgeRepresentation != null) {
+         return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + " / " +
+            bridgeRepresentation + "]";
+      }
+      else {
+         return "DuplicateIDEncoding [address=" + address + ", duplID=" + ByteUtil.bytesToHex(duplID, 2) + "]";
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9b351d82/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java
new file mode 100644
index 0000000..d741e95
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/FinishPageMessageOperation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.persistence.impl.journal.codec;
+
+import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperation;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
+import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
+
+/**
+ * This is only used when loading a transaction.
+ * <p>
+ * it might be possible to merge the functionality of this class with
+ * {@link FinishPageMessageOperation}
+ */
+// TODO: merge this class with the one on the PagingStoreImpl
+public class FinishPageMessageOperation extends TransactionOperationAbstract implements TransactionOperation {
+
+   @Override
+   public void afterCommit(final Transaction tx) {
+      // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
+      // transaction until all the messages were added to the queue
+      // or else we could deliver the messages out of order
+
+      PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+      if (pageTransaction != null) {
+         pageTransaction.commit();
+      }
+   }
+
+   @Override
+   public void afterRollback(final Transaction tx) {
+      PageTransactionInfo pageTransaction = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+      if (tx.getState() == Transaction.State.PREPARED && pageTransaction != null) {
+         pageTransaction.rollback();
+      }
+   }
+}


Mime
View raw message