activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [01/44] activemq-artemis git commit: Use SeqId in JDBC Records vs timestamp [Forced Update!]
Date Thu, 04 Feb 2016 22:05:03 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/refactor-openwire cc0d8d8dc -> ccca06e3c (forced update)


Use SeqId in JDBC Records vs timestamp


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0533a5f5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0533a5f5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0533a5f5

Branch: refs/heads/refactor-openwire
Commit: 0533a5f5fe07bccec9f03960bfe3b4310cb849ce
Parents: af89b93
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Thu Feb 4 16:22:23 2016 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Thu Feb 4 16:24:47 2016 +0000

----------------------------------------------------------------------
 .../jdbc/store/journal/JDBCJournalImpl.java     | 53 +++++++++++---------
 .../jdbc/store/journal/JDBCJournalRecord.java   | 22 +++++---
 2 files changed, 45 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0533a5f5/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 55e9f89..cc7462b 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -84,6 +85,9 @@ public class JDBCJournalImpl implements Journal {
    // Track Tx Records
    private Map<Long, TransactionHolder> transactions = new ConcurrentHashMap<>();
 
+   // Sequence ID for journal records
+   private AtomicLong seq = new AtomicLong(0);
+
    public JDBCJournalImpl(String jdbcUrl, String tableName) {
       this.tableName = tableName;
       this.jdbcUrl = jdbcUrl;
@@ -347,7 +351,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws
Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
@@ -356,7 +360,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
@@ -369,7 +373,7 @@ public class JDBCJournalImpl implements Journal {
                                EncodingSupport record,
                                boolean sync,
                                IOCompletion completionCallback) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
@@ -379,7 +383,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
@@ -388,7 +392,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
@@ -401,7 +405,7 @@ public class JDBCJournalImpl implements Journal {
                                   EncodingSupport record,
                                   boolean sync,
                                   IOCompletion completionCallback) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setSync(sync);
@@ -411,14 +415,14 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendDeleteRecord(long id, boolean sync) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
       r.setSync(sync);
       appendRecord(r);
    }
 
    @Override
    public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD, seq.incrementAndGet());
       r.setSync(sync);
       r.setIoCompletion(completionCallback);
       appendRecord(r);
@@ -426,7 +430,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record)
throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setTxId(txID);
@@ -438,7 +442,7 @@ public class JDBCJournalImpl implements Journal {
                                             long id,
                                             byte recordType,
                                             EncodingSupport record) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.ADD_RECORD_TX, seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setTxId(txID);
@@ -447,7 +451,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[]
record) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX,
seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setTxId(txID);
@@ -459,7 +463,7 @@ public class JDBCJournalImpl implements Journal {
                                                long id,
                                                byte recordType,
                                                EncodingSupport record) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.UPDATE_RECORD_TX,
seq.incrementAndGet());
       r.setUserRecordType(recordType);
       r.setRecord(record);
       r.setTxId(txID);
@@ -468,7 +472,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws
Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX,
seq.incrementAndGet());
       r.setRecord(record);
       r.setTxId(txID);
       appendRecord(r);
@@ -476,7 +480,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record)
throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX,
seq.incrementAndGet());
       r.setRecord(record);
       r.setTxId(txID);
       appendRecord(r);
@@ -484,21 +488,21 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendDeleteRecordTransactional(long txID, long id) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX);
+      JDBCJournalRecord r = new JDBCJournalRecord(id, JDBCJournalRecord.DELETE_RECORD_TX,
seq.incrementAndGet());
       r.setTxId(txID);
       appendRecord(r);
    }
 
    @Override
    public void appendCommitRecord(long txID, boolean sync) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       appendRecord(r);
    }
 
    @Override
    public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setIoCompletion(callback);
       appendRecord(r);
@@ -509,7 +513,7 @@ public class JDBCJournalImpl implements Journal {
                                   boolean sync,
                                   IOCompletion callback,
                                   boolean lineUpContext) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.COMMIT_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setStoreLineUp(lineUpContext);
       r.setIoCompletion(callback);
@@ -518,7 +522,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync)
throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(-1, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setTxData(transactionData);
       r.setSync(sync);
@@ -530,7 +534,7 @@ public class JDBCJournalImpl implements Journal {
                                    EncodingSupport transactionData,
                                    boolean sync,
                                    IOCompletion callback) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setTxData(transactionData);
       r.setTxData(transactionData);
@@ -541,7 +545,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws
Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.PREPARE_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setTxData(transactionData);
       r.setSync(sync);
@@ -550,7 +554,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendRollbackRecord(long txID, boolean sync) throws Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setSync(sync);
       appendRecord(r);
@@ -558,7 +562,7 @@ public class JDBCJournalImpl implements Journal {
 
    @Override
    public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws
Exception {
-      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD);
+      JDBCJournalRecord r = new JDBCJournalRecord(0, JDBCJournalRecord.ROLLBACK_RECORD, seq.incrementAndGet());
       r.setTxId(txID);
       r.setSync(sync);
       r.setIoCompletion(callback);
@@ -607,6 +611,9 @@ public class JDBCJournalImpl implements Journal {
                   throw new Exception("Error Reading Journal, Unknown Record Type: " + r.getRecordType());
             }
             noRecords++;
+            if (r.getSeq() > seq.longValue()) {
+               seq.set(r.getSeq());
+            }
          }
          jrc.checkPreparedTx();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0533a5f5/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
index 1948e5a..2d31a8f 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalRecord.java
@@ -88,7 +88,9 @@ public class JDBCJournalRecord {
 
    private boolean isTransactional;
 
-   public JDBCJournalRecord(long id, byte recordType) {
+   private long seq;
+
+   public JDBCJournalRecord(long id, byte recordType, long seq) {
       this.id = id;
       this.recordType = recordType;
 
@@ -104,20 +106,22 @@ public class JDBCJournalRecord {
       txDataSize = 0;
       txData = new ByteArrayInputStream(new byte[0]);
       txCheckNoRecords = 0;
+
+      this.seq = seq;
    }
 
    public static String createTableSQL(String tableName) {
-      return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId
BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData
BLOB,txCheckNoRecords INTEGER,timestamp BIGINT)";
+      return "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId
BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BLOB,txDataSize INTEGER,txData
BLOB,txCheckNoRecords INTEGER,seq BIGINT)";
    }
 
    public static String insertRecordsSQL(String tableName) {
-      return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,timestamp)
"
+      return "INSERT INTO " + tableName + "(id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq)
"
          + "VALUES (?,?,?,?,?,?,?,?,?,?,?)";
    }
 
    public static String selectRecordsSQL(String tableName) {
-      return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords
"
-         + "FROM " + tableName;
+      return "SELECT id,recordType,compactCount,txId,userRecordType,variableSize,record,txDataSize,txData,txCheckNoRecords,seq
"
+         + "FROM " + tableName + " ORDER BY seq ASC";
    }
 
    public static String deleteRecordsSQL(String tableName) {
@@ -168,7 +172,7 @@ public class JDBCJournalRecord {
       statement.setInt(8, txDataSize);
       statement.setBytes(9, txDataBytes);
       statement.setInt(10, txCheckNoRecords);
-      statement.setLong(11, System.currentTimeMillis());
+      statement.setLong(11, seq);
       statement.addBatch();
    }
 
@@ -178,7 +182,7 @@ public class JDBCJournalRecord {
    }
 
    public static JDBCJournalRecord readRecord(ResultSet rs) throws SQLException {
-      JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2));
+      JDBCJournalRecord record = new JDBCJournalRecord(rs.getLong(1), (byte) rs.getShort(2),
rs.getLong(11));
       record.setCompactCount((byte) rs.getShort(3));
       record.setTxId(rs.getLong(4));
       record.setUserRecordType((byte) rs.getShort(5));
@@ -339,4 +343,8 @@ public class JDBCJournalRecord {
    public boolean isTransactional() {
       return isTransactional;
    }
+
+   public long getSeq() {
+      return seq;
+   }
 }
\ No newline at end of file


Mime
View raw message