From commits-return-63579-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Wed Jun 9 18:52:17 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 3EB6218063F for ; Wed, 9 Jun 2021 20:52:17 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 611333F4B4 for ; Wed, 9 Jun 2021 18:52:16 +0000 (UTC) Received: (qmail 23584 invoked by uid 500); 9 Jun 2021 18:52:16 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 23574 invoked by uid 99); 9 Jun 2021 18:52:15 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Jun 2021 18:52:15 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8BDB481A86; Wed, 9 Jun 2021 18:52:15 +0000 (UTC) Date: Wed, 09 Jun 2021 18:52:15 +0000 To: "commits@activemq.apache.org" Subject: [activemq-artemis] branch main updated: ARTEMIS-3261 Remove need to lookup replaceableRecords on the hot path MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <162326473525.9813.10766440475238348356@gitbox.apache.org> From: clebertsuconic@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: activemq-artemis X-Git-Refname: refs/heads/main X-Git-Reftype: branch X-Git-Oldrev: eb4723cdc1123a4a683f7d6ca079ca23c50e4fe1 X-Git-Newrev: 05498c350eb8be49dd26a0b593516b1988df6108 X-Git-Rev: 05498c350eb8be49dd26a0b593516b1988df6108 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git The following commit(s) were added to refs/heads/main by this push: new 05498c3 ARTEMIS-3261 Remove need to lookup replaceableRecords on the hot path 05498c3 is described below commit 05498c350eb8be49dd26a0b593516b1988df6108 Author: Clebert Suconic AuthorDate: Tue Jun 8 10:00:45 2021 -0400 ARTEMIS-3261 Remove need to lookup replaceableRecords on the hot path We known it's a replaceable record as part of the logic, no need to lookup the record type unless it's a reload from the system. --- .../cli/commands/tools/RecoverMessages.java | 2 +- .../cli/commands/tools/journal/DecodeJournal.java | 2 +- .../jdbc/store/journal/JDBCJournalImpl.java | 7 +-- .../jdbc/store/journal/JDBCJournalRecord.java | 2 +- .../journal/JDBCJournalLoaderCallbackTest.java | 2 +- .../activemq/artemis/core/journal/Journal.java | 12 +++-- .../activemq/artemis/core/journal/RecordInfo.java | 5 ++ .../journal/impl/AbstractJournalUpdateTask.java | 2 +- .../core/journal/impl/FileWrapperJournal.java | 1 + .../artemis/core/journal/impl/JournalBase.java | 10 ++-- .../core/journal/impl/JournalCompactor.java | 25 ++++------ .../artemis/core/journal/impl/JournalImpl.java | 54 ++++++++++++---------- .../core/journal/impl/JournalRecordProvider.java | 2 - .../core/journal/impl/JournalTransaction.java | 16 +++---- .../journal/AbstractJournalStorageManager.java | 8 ++-- .../persistence/impl/journal/DescribeJournal.java | 2 +- .../core/replication/ReplicatedJournal.java | 12 +++-- .../integration/replication/ReplicationTest.java | 7 +-- .../infinite/InfiniteRedeliverySmokeTest.java | 15 +++++- .../core/journal/impl/JournalImplTestBase.java | 14 +++--- .../core/journal/impl/JournalImplTestUnit.java | 4 +- 21 files changed, 115 insertions(+), 89 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java index 31c7e72..f6f59c2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java @@ -239,7 +239,7 @@ public class RecoverMessages extends DBOption { public void markAsDataFile(JournalFile file) { } - }, null, reclaimed); + }, null, reclaimed, null); } targetJournal.flush(); diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java index 1b1d673..fd5010d 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/journal/DecodeJournal.java @@ -234,7 +234,7 @@ public class DecodeJournal extends LockAbstract { byte userRecordType = parseByte("userRecordType", properties); boolean isUpdate = parseBoolean("isUpdate", properties); byte[] data = parseEncoding("data", properties); - return new RecordInfo(id, userRecordType, data, isUpdate, (short) 0); + return new RecordInfo(id, userRecordType, data, isUpdate, false, (short) 0); } private static byte[] parseEncoding(final String name, final Properties properties) throws Exception { diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java index 154d39a..a4281b3 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java @@ -412,7 +412,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { // We actually only need the record ID in this instance. if (record.isTransactional()) { - RecordInfo info = new RecordInfo(record.getId(), record.getRecordType(), new byte[0], record.isUpdate(), record.getCompactCount()); + RecordInfo info = new RecordInfo(record.getId(), record.getRecordType(), new byte[0], record.isUpdate(), false, record.getCompactCount()); if (record.getRecordType() == JDBCJournalRecord.DELETE_RECORD_TX) { txHolder.recordsToDelete.add(info); } else { @@ -498,7 +498,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { + public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableRecord) throws Exception { appendUpdateRecord(id, recordType, record, sync); } @@ -518,7 +518,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { } @Override - public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { + public void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableUpdate) throws Exception { appendUpdateRecord(id, recordType, persister, record, sync); } @@ -552,6 +552,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal { Persister persister, Object record, boolean sync, + boolean replaceableUpdate, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception { appendUpdateRecord(id, recordType, persister, record, sync, completionCallback); diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java index 6750da1..1ae8d1d 100644 --- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java +++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java @@ -296,7 +296,7 @@ class JDBCJournalRecord { } RecordInfo toRecordInfo() throws IOException { - return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), getCompactCount()); + return new RecordInfo(getId(), getUserRecordType(), getRecordData(), isUpdate(), false, getCompactCount()); } public boolean isTransactional() { diff --git a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java index e4f83fa..1a3a333 100644 --- a/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java +++ b/artemis-jdbc-store/src/test/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalLoaderCallbackTest.java @@ -45,7 +45,7 @@ public class JDBCJournalLoaderCallbackTest { JDBCJournalLoaderCallback cb = new JDBCJournalLoaderCallback(committedRecords, preparedTransactions, failureCallback, fixBadTX); - RecordInfo record = new RecordInfo(42, (byte) 0, null, false, (short) 0); + RecordInfo record = new RecordInfo(42, (byte) 0, null, false, false, (short) 0); cb.addRecord(record); assertEquals(1, committedRecords.size()); assertTrue(committedRecords.contains(record)); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java index 28aa2d2..e10dc85 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java @@ -110,19 +110,19 @@ public interface Journal extends ActiveMQComponent { void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception; - void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception; + void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableRecord) throws Exception; default void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception { appendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync); } - default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { - tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, updateCallback, sync); + default void tryAppendUpdateRecord(long id, byte recordType, EncodingSupport record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableRecord) throws Exception { + tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, updateCallback, sync, replaceableRecord); } void appendUpdateRecord(long id, byte recordType, Persister persister, Object record, boolean sync) throws Exception; - void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync) throws Exception; + void tryAppendUpdateRecord(long id, byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceableUpdate) throws Exception; default void appendUpdateRecord(long id, byte recordType, @@ -136,9 +136,10 @@ public interface Journal extends ActiveMQComponent { byte recordType, EncodingSupport record, boolean sync, + boolean replaceableUpdate, JournalUpdateCallback updateCallback, IOCompletion completionCallback) throws Exception { - tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, updateCallback, completionCallback); + tryAppendUpdateRecord(id, recordType, EncoderPersister.getInstance(), record, sync, replaceableUpdate, updateCallback, completionCallback); } void appendUpdateRecord(long id, @@ -153,6 +154,7 @@ public interface Journal extends ActiveMQComponent { Persister persister, Object record, boolean sync, + boolean replaceableUpdate, JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java index ddc5ea6..888dd82 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/RecordInfo.java @@ -22,6 +22,7 @@ public class RecordInfo { final byte userRecordType, final byte[] data, final boolean isUpdate, + final boolean replaceableUpdate, final short compactCount) { this.id = id; @@ -31,6 +32,8 @@ public class RecordInfo { this.isUpdate = isUpdate; + this.replaceableUpdate = replaceableUpdate; + this.compactCount = compactCount; } @@ -49,6 +52,8 @@ public class RecordInfo { public boolean isUpdate; + public boolean replaceableUpdate; + public byte getUserRecordType() { return userRecordType; } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java index 66c38a1..2fd4406 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java @@ -170,7 +170,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback public void onReadAddRecord(final RecordInfo info) throws Exception { records.add(info); } - }, wholeFileBufferRef); + }, wholeFileBufferRef, false, null); if (records.size() == 0) { // the record is damaged diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java index 325e616..0c2d9dc 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java @@ -228,6 +228,7 @@ public final class FileWrapperJournal extends JournalBase { Persister persister, Object record, boolean sync, + boolean replaceableUpdate, JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception { JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, persister, record); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java index d2df4de..6bc58da 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java @@ -94,8 +94,9 @@ abstract class JournalBase implements Journal { final byte recordType, final byte[] record, JournalUpdateCallback updateCallback, - final boolean sync) throws Exception { - tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync); + final boolean sync, + final boolean replaceableRecord) throws Exception { + tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync, replaceableRecord); } @Override @@ -163,10 +164,11 @@ abstract class JournalBase implements Journal { final Persister persister, final Object record, final JournalUpdateCallback updateCallback, - final boolean sync) throws Exception { + final boolean sync, + final boolean replaceableUpdate) throws Exception { SyncIOCompletion callback = getSyncCallback(sync); - tryAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback); + tryAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, callback); if (callback != null) { callback.waitCompletion(); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java index 0108bf9..0bd1c0f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java @@ -144,20 +144,15 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ pendingCommands.add(new DeleteCompactCommand(id, usedFile)); } - @Override - public boolean isReplaceableRecord(byte recordType) { - return journal.isReplaceableRecord(recordType); - } - /** * @param id * @param usedFile */ - public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, byte userRecordType) { + public void addCommandUpdate(final long id, final JournalFile usedFile, final int size, final boolean replaceableUpdate) { if (logger.isTraceEnabled()) { logger.trace("addCommandUpdate id " + id + " usedFile " + usedFile + " size " + size); } - pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, userRecordType)); + pendingCommands.add(new UpdateCompactCommand(id, usedFile, size, replaceableUpdate)); } private void checkSize(final int size) throws Exception { @@ -278,7 +273,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ checkSize(record.getEncodeSize(), info.compactCount); - newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.userRecordType); + newTransaction.addPositive(currentFile, info.id, record.getEncodeSize(), info.replaceableUpdate); writeEncoder(record); } @@ -472,7 +467,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (newRecord == null) { ActiveMQJournalLogger.LOGGER.compactingWithNoAddRecord(info.id); } else { - newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), journal.isReplaceableRecord(info.userRecordType)); + newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize(), info.replaceableUpdate); } writeEncoder(updateRecord); @@ -502,7 +497,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ writeEncoder(updateRecordTX); - newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.userRecordType); + newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize(), info.replaceableUpdate); } /** @@ -566,17 +561,17 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ private final long id; - private final byte userRecordType; - private final JournalFile usedFile; private final int size; - private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, byte userRecordType) { + private final boolean replaceableUpdate; + + private UpdateCompactCommand(final long id, final JournalFile usedFile, final int size, boolean replaceableUpdate) { this.id = id; this.usedFile = usedFile; this.size = size; - this.userRecordType = userRecordType; + this.replaceableUpdate = replaceableUpdate; } @Override @@ -585,7 +580,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ if (updateRecord == null) { ActiveMQJournalLogger.LOGGER.noRecordDuringCompactReplay(id); } else { - updateRecord.addUpdateFile(usedFile, size, journal.isReplaceableRecord(userRecordType)); + updateRecord.addUpdateFile(usedFile, size, replaceableUpdate); } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index 383f2e9..ed6f38a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -339,12 +339,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal replaceableRecords.put(recordType, Boolean.TRUE); } - @Override - public boolean isReplaceableRecord(byte recordType) { - return replaceableRecords != null && replaceableRecords.containsKey(recordType); - } - - private volatile JournalFile currentFile; private volatile JournalState state = JournalState.STOPPED; @@ -570,10 +564,18 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } public static int readJournalFile(final SequentialFileFactory fileFactory, + final JournalFile file, + final JournalReaderCallback reader, + final AtomicReference wholeFileBufferReference, + boolean reclaimed) throws Exception { + return readJournalFile(fileFactory, file, reader, wholeFileBufferReference, reclaimed, null); + } + + public static int readJournalFile(final SequentialFileFactory fileFactory, final JournalFile file, final JournalReaderCallback reader, final AtomicReference wholeFileBufferReference, - boolean reclaimed) throws Exception { + boolean reclaimed, ByteObjectHashMap replaceableRecords) throws Exception { file.getFile().open(1, false); ByteBuffer wholeFileBuffer = null; try { @@ -801,19 +803,21 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal logger.trace("reading " + recordID + ", userRecordType=" + userRecordType + ", compactCount=" + compactCount); } + boolean replaceableUpdate = replaceableRecords != null ? replaceableRecords.containsKey(userRecordType) : false; + switch (recordType) { case EVENT_RECORD: { - reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount)); + reader.onReadEventRecord(new RecordInfo(recordID, userRecordType, record, false, replaceableUpdate, compactCount)); break; } case ADD_RECORD: { - reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, compactCount)); + reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false, false, compactCount)); break; } case UPDATE_RECORD: { - reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, compactCount)); + reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount)); break; } @@ -823,17 +827,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } case ADD_RECORD_TX: { - reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, compactCount)); + reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false, false, compactCount)); break; } case UPDATE_RECORD_TX: { - reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, compactCount)); + reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true, replaceableUpdate, compactCount)); break; } case DELETE_RECORD_TX: { - reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte) 0, record, true, compactCount)); + reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte) 0, record, true, false, compactCount)); break; } @@ -899,7 +903,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal public static int readJournalFile(final SequentialFileFactory fileFactory, final JournalFile file, final JournalReaderCallback reader) throws Exception { - return readJournalFile(fileFactory, file, reader, null); + return readJournalFile(fileFactory, file, reader, null, false, null); } // Journal implementation @@ -1040,7 +1044,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal SimpleFuture future = new SimpleFutureImpl<>(); - internalAppendUpdateRecord(id, recordType, persister, record, sync, (t, v) -> future.set(v), callback); + internalAppendUpdateRecord(id, recordType, persister, record, sync, false, (t, v) -> future.set(v), callback); if (!future.get()) { throw new IllegalStateException("Cannot find add info " + id); @@ -1054,6 +1058,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal final Persister persister, final Object record, final boolean sync, + final boolean replaceableUpdate, JournalUpdateCallback updateCallback, final IOCompletion callback) throws Exception { checkJournalIsLoaded(); @@ -1066,7 +1071,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal } - internalAppendUpdateRecord(id, recordType, persister, record, sync, updateCallback, callback); + internalAppendUpdateRecord(id, recordType, persister, record, sync, replaceableUpdate, updateCallback, callback); } @@ -1075,6 +1080,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal Persister persister, Object record, boolean sync, + boolean replaceableUpdate, JournalUpdateCallback updateCallback, IOCompletion callback) throws InterruptedException, java.util.concurrent.ExecutionException { appendExecutor.execute(new Runnable() { @@ -1116,10 +1122,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // computing the delete should be done after compacting is done if (jrnRecord == null) { if (compactor != null) { - compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), recordType); + compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize(), replaceableUpdate); } } else { - jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), isReplaceableRecord(recordType)); + jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize(), replaceableUpdate); } if (updateCallback != null) { @@ -1298,7 +1304,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal usedFile); } - tx.addPositive(usedFile, id, encodeSize, recordType); + tx.addPositive(usedFile, id, encodeSize, false); } catch (Throwable e) { logger.error("appendAddRecordTransactional:" + e, e); setErrorCondition(null, tx, e); @@ -1361,7 +1367,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal usedFile ); } - tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), recordType); + tx.addPositive( usedFile, id, updateRecordTX.getEncodeSize(), false); } catch (Throwable e ) { logger.error("appendUpdateRecordTransactional:" + e.getMessage(), e ); setErrorCondition(null, tx, e ); @@ -1845,7 +1851,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal try { for (final JournalFile file : dataFilesToProcess) { try { - JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef); + JournalImpl.readJournalFile(fileFactory, file, compactor, wholeFileBufferRef, false, this.replaceableRecords); } catch (Throwable e) { ActiveMQJournalLogger.LOGGER.compactReadError(file); throw new Exception("Error on reading compacting for " + file, e); @@ -2132,7 +2138,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal // have been deleted // just leaving some updates in this file - posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, isReplaceableRecord(info.userRecordType)); // +1 = compact + posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, info.replaceableUpdate); // +1 = compact // count } } @@ -2180,7 +2186,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal transactions.put(transactionID, tnp); } - tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.userRecordType); // +1 = compact + tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.replaceableUpdate); // +1 = compact // count } @@ -2320,7 +2326,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal hasData.lazySet(true); } - }, wholeFileBufferRef); + }, wholeFileBufferRef, false, this.replaceableRecords); if (hasData.get()) { lastDataPos = resultLastPost; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java index 6f9b40c..c9c92f4 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalRecordProvider.java @@ -30,6 +30,4 @@ public interface JournalRecordProvider { JournalCompactor getCompactor(); ConcurrentLongHashMap getRecords(); - - boolean isReplaceableRecord(byte recordType); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java index 0cfd369..d557201 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalTransaction.java @@ -202,7 +202,7 @@ public class JournalTransaction { } } - public void addPositive(final JournalFile file, final long id, final int size, final byte userRecordType) { + public void addPositive(final JournalFile file, final long id, final int size, final boolean replaceableRecord) { incCounter(file); addFile(file); @@ -211,7 +211,7 @@ public class JournalTransaction { pos = new ArrayList<>(); } - pos.add(new JournalUpdate(file, id, size, userRecordType)); + pos.add(new JournalUpdate(file, id, size, replaceableRecord)); } public void addNegative(final JournalFile file, final long id) { @@ -223,7 +223,7 @@ public class JournalTransaction { neg = new ArrayList<>(); } - neg.add(new JournalUpdate(file, id, 0, (byte)0)); + neg.add(new JournalUpdate(file, id, 0, false)); } /** @@ -254,13 +254,13 @@ public class JournalTransaction { // This is a case where the transaction was opened after compacting was started, // but the commit arrived while compacting was working // We need to cache the counter update, so compacting will take the correct files when it is done - compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.userRecordType); + compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size, trUpdate.replaceableUpdate); } else if (posFiles == null) { posFiles = new JournalRecord(trUpdate.file, trUpdate.size); journal.getRecords().put(trUpdate.id, posFiles); } else { - posFiles.addUpdateFile(trUpdate.file, trUpdate.size, journal.isReplaceableRecord(trUpdate.userRecordType)); + posFiles.addUpdateFile(trUpdate.file, trUpdate.size, trUpdate.replaceableUpdate); } } } @@ -397,19 +397,19 @@ public class JournalTransaction { int size; - final byte userRecordType; + final boolean replaceableUpdate; /** * @param file * @param id * @param size */ - private JournalUpdate(final JournalFile file, final long id, final int size, final byte userRecordType) { + private JournalUpdate(final JournalFile file, final long id, final int size, final boolean replaceableUpdate) { super(); this.file = file; this.id = id; this.size = size; - this.userRecordType = userRecordType; + this.replaceableUpdate = replaceableUpdate; } /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index a447285..314982e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -385,7 +385,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, this::messageUpdateCallback, getContext(last && syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, false, this::messageUpdateCallback, getContext(last && syncNonTransactional)); } } @@ -428,7 +428,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp @Override public void storeAcknowledge(final long queueID, final long messageID) throws Exception { try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(messageID, JournalRecordIds.ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, false, this::messageUpdateCallback, getContext(syncNonTransactional)); } } @@ -470,7 +470,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception { ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue().getID()); try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, this::recordNotFoundCallback, getContext(syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME, encoding, syncNonTransactional, true, this::recordNotFoundCallback, getContext(syncNonTransactional)); } } @@ -702,7 +702,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount()); try (ArtemisCloseable lock = closeableReadLock()) { - messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, this::messageUpdateCallback, getContext(syncNonTransactional)); + messageJournal.tryAppendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, true, this::messageUpdateCallback, getContext(syncNonTransactional)); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java index 6b63074..a6a0ad1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java @@ -309,7 +309,7 @@ public final class DescribeJournal { recordsPrintStream.println(); } } - }, null, reclaimed); + }, null, reclaimed, null); } recordsPrintStream.println(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java index 58f7069..bc2417b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicatedJournal.java @@ -401,9 +401,10 @@ public class ReplicatedJournal implements Journal { final byte recordType, final byte[] record, final JournalUpdateCallback updateCallback, - final boolean sync) throws Exception { + final boolean sync, + final boolean replaceableRecord) throws Exception { - this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync); + this.tryAppendUpdateRecord(id, recordType, new ByteArrayEncoding(record), updateCallback, sync, replaceableRecord); } /** @@ -433,12 +434,12 @@ public class ReplicatedJournal implements Journal { final Persister persister, final Object record, final JournalUpdateCallback updateCallback, - final boolean sync) throws Exception { + final boolean sync, final boolean replaceable) throws Exception { if (log.isTraceEnabled()) { log.trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType); } replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, recordType, persister, record); - localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync); + localJournal.tryAppendUpdateRecord(id, recordType, persister, record, updateCallback, sync, replaceable); } @Override @@ -461,13 +462,14 @@ public class ReplicatedJournal implements Journal { final Persister persister, final Object record, final boolean sync, + final boolean replaceableUpdate, final JournalUpdateCallback updateCallback, final IOCompletion completionCallback) throws Exception { if (log.isTraceEnabled()) { log.trace("AppendUpdateRecord id = " + id + " , recordType = " + journalRecordType); } replicationManager.appendUpdateRecord(journalID, ADD_OPERATION_TYPE.UPDATE, id, journalRecordType, persister, record); - localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, updateCallback, completionCallback); + localJournal.tryAppendUpdateRecord(id, journalRecordType, persister, record, sync, replaceableUpdate, updateCallback, completionCallback); } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 3552f2a..539e845 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -701,7 +701,8 @@ public final class ReplicationTest extends ActiveMQTestBase { byte recordType, Persister persister, Object record, JournalUpdateCallback updateCallback, - boolean sync) throws Exception { + boolean sync, + boolean repalceableUpdate) throws Exception { } @Override @@ -719,7 +720,7 @@ public final class ReplicationTest extends ActiveMQTestBase { byte recordType, Persister persister, Object record, - boolean sync, JournalUpdateCallback updateCallback, + boolean sync, boolean replaceableUpdate, JournalUpdateCallback updateCallback, IOCompletion callback) throws Exception { } @@ -844,7 +845,7 @@ public final class ReplicationTest extends ActiveMQTestBase { } @Override - public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync) throws Exception { + public void tryAppendUpdateRecord(long id, byte recordType, byte[] record, JournalUpdateCallback updateCallback, boolean sync, boolean replaceable) throws Exception { } @Override diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/infinite/InfiniteRedeliverySmokeTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/infinite/InfiniteRedeliverySmokeTest.java index 86b641e..f68da0b 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/infinite/InfiniteRedeliverySmokeTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/infinite/InfiniteRedeliverySmokeTest.java @@ -44,10 +44,12 @@ public class InfiniteRedeliverySmokeTest extends SmokeTestBase { public static final String SERVER_NAME_0 = "infinite-redelivery"; + Process serverProcess; + @Before public void before() throws Exception { cleanupData(SERVER_NAME_0); - startServer(SERVER_NAME_0, 0, 30000); + serverProcess = startServer(SERVER_NAME_0, 0, 30000); } @Test @@ -89,6 +91,17 @@ public class InfiniteRedeliverySmokeTest extends SmokeTestBase { // as the real test I'm after here is the broker should clean itself up Wait.assertTrue("there are too many files created", () -> fileFactory.listFiles("amq").size() <= 20); + if (i % 100 == 0 && i > 0) { + connection.close(); + serverProcess.destroyForcibly(); + Thread.sleep(1000); + serverProcess = startServer(SERVER_NAME_0, 0, 3000); + connection = factory.createConnection(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(queue); + connection.start(); + } + } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java index 0ec7f5c..fe0fb0a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestBase.java @@ -409,7 +409,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { journal.appendAddRecord(element, (byte) 0, record, sync); - records.add(new RecordInfo(element, (byte) 0, record, false, (short) 0)); + records.add(new RecordInfo(element, (byte) 0, record, false, false, (short) 0)); } journal.debugWait(); @@ -422,11 +422,11 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { SimpleFutureImpl future = new SimpleFutureImpl(); - journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, (r, b) -> future.set(b), sync); + journal.tryAppendUpdateRecord(argument, (byte) 0, updateRecord, (r, b) -> future.set(b), sync, false); if (future.get()) { Assert.fail(); - records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, (short) 0)); + records.add(new RecordInfo(argument, (byte) 0, updateRecord, true, false, (short) 0)); } return future.get(); @@ -440,7 +440,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { journal.appendUpdateRecord(element, (byte) 0, updateRecord, sync); - records.add(new RecordInfo(element, (byte) 0, updateRecord, true, (short) 0)); + records.add(new RecordInfo(element, (byte) 0, updateRecord, true, false, (short) 0)); } journal.debugWait(); @@ -485,7 +485,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { journal.appendAddRecordTransactional(txID, element, (byte) 0, record); - tx.records.add(new RecordInfo(element, (byte) 0, record, false, (short) 0)); + tx.records.add(new RecordInfo(element, (byte) 0, record, false, false, (short) 0)); } @@ -502,7 +502,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { journal.appendUpdateRecordTransactional(txID, element, (byte) 0, updateRecord); - tx.records.add(new RecordInfo(element, (byte) 0, updateRecord, true, (short) 0)); + tx.records.add(new RecordInfo(element, (byte) 0, updateRecord, true, false, (short) 0)); } journal.debugWait(); } @@ -515,7 +515,7 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase { journal.appendDeleteRecordTransactional(txID, element); - tx.deletes.add(new RecordInfo(element, (byte) 0, null, true, (short) 0)); + tx.deletes.add(new RecordInfo(element, (byte) 0, null, true, false, (short) 0)); } journal.debugWait(); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java index 18f2b54..7693ce2 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalImplTestUnit.java @@ -2333,7 +2333,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { journal.appendAddRecord(i, (byte) 0, record, false); - records.add(new RecordInfo(i, (byte) 0, record, false, (short) 0)); + records.add(new RecordInfo(i, (byte) 0, record, false, false, (short) 0)); } for (int i = 0; i < 100; i++) { @@ -2341,7 +2341,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase { journal.appendUpdateRecord(i, (byte) 0, record, false); - records.add(new RecordInfo(i, (byte) 0, record, true, (short) 0)); + records.add(new RecordInfo(i, (byte) 0, record, true, false, (short) 0)); } for (int i = 0; i < 100; i++) {