activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1515799 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/
Date Tue, 20 Aug 2013 12:38:33 GMT
Author: chirino
Date: Tue Aug 20 12:38:32 2013
New Revision: 1515799

URL: http://svn.apache.org/r1515799
Log:
Persist the latest producer position across restarts in the leveldb store.

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java?rev=1515799&r1=1515798&r2=1515799&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAuditNoSync.java Tue Aug 20 12:38:32 2013
@@ -168,7 +168,7 @@ public class ActiveMQMessageAuditNoSync 
                 BitArrayBin bab = map.get(pid);
                 if (bab == null) {
                     bab = new BitArrayBin(auditDepth);
-                    map.put(pid, bab);
+                    map.put(pid.toString(), bab);
                     modified = true;
                 }
                 answer = bab.setBit(id.getProducerSequenceId(), true);
@@ -272,7 +272,7 @@ public class ActiveMQMessageAuditNoSync 
                 BitArrayBin bab = map.get(pid);
                 if (bab == null) {
                     bab = new BitArrayBin(auditDepth);
-                    map.put(pid, bab);
+                    map.put(pid.toString(), bab);
                     modified = true;
                 }
                 answer = bab.isInOrder(id.getProducerSequenceId());

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1515799&r1=1515798&r2=1515799&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Tue Aug 20 12:38:32 2013
@@ -427,7 +427,7 @@ class DBManager(val parent:LevelDBStore)
 
   val lastUowId = new AtomicInteger(1)
 
-  val producerSequenceIdTracker = new ActiveMQMessageAuditNoSync
+  var producerSequenceIdTracker = new ActiveMQMessageAuditNoSync
 
   def getLastProducerSequenceId(id: ProducerId): Long = dispatchQueue.sync {
     producerSequenceIdTracker.getLastSeqId(id)
@@ -437,13 +437,6 @@ class DBManager(val parent:LevelDBStore)
     dispatchQueue.assertExecuting()
     uowClosedCounter += 1
 
-    // track the producer seq positions.
-    for( (_, action) <- uow.actions ) {
-      if( action.messageRecord!=null ) {
-        producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
-      }
-    }
-
     // Broker could issue a flush_message call before
     // this stage runs.. which make the stage jump over UowDelayed
     if( uow.state.stage < UowDelayed.stage ) {
@@ -452,10 +445,6 @@ class DBManager(val parent:LevelDBStore)
     if( uow.state.stage < UowFlushing.stage ) {
       uow.actions.foreach { case (id, action) =>
 
-        if( action.messageRecord!=null ) {
-          producerSequenceIdTracker.isDuplicate(action.messageRecord.id)
-        }
-
         // The UoW may have been canceled.
         if( action.messageRecord!=null && action.enqueues.isEmpty ) {
           action.removeFromPendingStore() 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1515799&r1=1515798&r2=1515799&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Tue Aug 20 12:38:32 2013
@@ -40,6 +40,7 @@ import java.util.{Date, Collections}
 import org.apache.activemq.leveldb.util.TimeMetric
 import org.apache.activemq.leveldb.RecordLog.LogInfo
 import org.fusesource.leveldbjni.internal.JniDB
+import org.apache.activemq.ActiveMQMessageAuditNoSync
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -66,6 +67,7 @@ object LevelDBClient extends Log {
   final val DIRTY_INDEX_KEY = bytes(":dirty")
   final val LOG_REF_INDEX_KEY = bytes(":log-refs")
   final val LOGS_INDEX_KEY = bytes(":logs")
+  final val PRODUCER_IDS_INDEX_KEY = bytes(":producer_ids")
 
   final val COLLECTION_META_KEY = bytes(":collection-meta")
   final val TRUE = bytes("true")
@@ -699,6 +701,10 @@ class LevelDBClient(store: LevelDBStore)
             log.read(pos).map {
               case (kind, data, nextPos) =>
                 kind match {
+                  case LOG_DATA =>
+                    val message = decodeMessage(data)
+                    store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId)
+
                   case LOG_ADD_COLLECTION =>
                     val record= decodeCollectionRecord(data)
                     index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data)
@@ -846,10 +852,19 @@ class LevelDBClient(store: LevelDBStore)
         case e => throw e
       }
     }
+    def storeObject(key:Array[Byte], o:Object) = {
+      val baos = new ByteArrayOutputStream()
+      val os = new ObjectOutputStream(baos);
+      os.writeObject(o)
+      os.close()
+      index.put(key, baos.toByteArray)
+    }
 
     storeMap(LOG_REF_INDEX_KEY, logRefs)
     storeMap(COLLECTION_META_KEY, collectionMeta)
     storeList(LOGS_INDEX_KEY, log.log_file_positions)
+    storeObject(PRODUCER_IDS_INDEX_KEY, store.db.producerSequenceIdTracker)
+
   }
 
   private def loadCounters = {
@@ -878,6 +893,13 @@ class LevelDBClient(store: LevelDBStore)
         rc
       }
     }
+    def loadObject(key:Array[Byte]) = {
+      index.get(key, new ReadOptions).map { value=>
+        val bais = new ByteArrayInputStream(value)
+        val is = new ObjectInputStream(bais);
+        is.readObject();
+      }
+    }
 
     loadMap(LOG_REF_INDEX_KEY, logRefs)
     loadMap(COLLECTION_META_KEY, collectionMeta)
@@ -887,6 +909,9 @@ class LevelDBClient(store: LevelDBStore)
         recoveryLogs.put(k, null)
       }
     }
+    for( audit <- loadObject(PRODUCER_IDS_INDEX_KEY) ) {
+      store.db.producerSequenceIdTracker = audit.asInstanceOf[ActiveMQMessageAuditNoSync]
+    }
   }
 
   var wal_append_position = 0L
@@ -1183,16 +1208,17 @@ class LevelDBClient(store: LevelDBStore)
     }
 
     // Lets decode
-    buffer.map{ x =>
-      var data = if( store.snappyCompressLogs ) {
-        Snappy.uncompress(x)
-      } else {
-        x
-      }
-      store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
-    }.getOrElse(null)
+    buffer.map(decodeMessage(_)).getOrElse(null)
   }
 
+  def decodeMessage(x: Buffer): Message = {
+    var data = if (store.snappyCompressLogs) {
+      Snappy.uncompress(x)
+    } else {
+      x
+    }
+    store.wireFormat.unmarshal(new ByteSequence(data.data, data.offset, data.length)).asInstanceOf[Message]
+  }
 
   def collectionCursor(collectionKey: Long, cursorPosition:Buffer)(func: (Buffer, EntryRecord.Buffer)=>Boolean) = {
     val ro = new ReadOptions
@@ -1267,6 +1293,7 @@ class LevelDBClient(store: LevelDBStore)
         var dataLocator: DataLocator = null
 
         if (messageRecord != null && messageRecord.locator == null) {
+          store.db.producerSequenceIdTracker.isDuplicate(messageRecord.id)
           val start = System.nanoTime()
           val p = appender.append(LOG_DATA, messageRecord.data)
           log_info = p._2
@@ -1329,7 +1356,6 @@ class LevelDBClient(store: LevelDBStore)
           val index_record = new EntryRecord.Bean()
           index_record.setValueLocation(dataLocator.pos)
           index_record.setValueLength(dataLocator.len)
-          batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
 
           val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
           batch.put(key, index_data)



Mime
View raw message