activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject [02/10] git commit: Avoid NullPointerExceptions that can occur during leveldb replication M/S state transitions.
Date Tue, 11 Mar 2014 23:29:28 GMT
Avoid NullPointerExceptions that can occur during leveldb replication M/S state transitions.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c585f54f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c585f54f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c585f54f

Branch: refs/heads/activemq-5.9
Commit: c585f54fa3fdd02e028674bf4b0e59e9ae0d9bab
Parents: 4dd1cba
Author: Hiram Chirino <hiram@hiramchirino.com>
Authored: Tue Oct 15 11:45:45 2013 -0400
Committer: Hadrian Zbarcea <hadrian@apache.org>
Committed: Tue Mar 11 17:04:55 2014 -0400

----------------------------------------------------------------------
 .../apache/activemq/leveldb/LevelDBClient.scala |  4 +--
 .../apache/activemq/leveldb/LevelDBStore.scala  | 30 ++++++++++++++++++++
 .../leveldb/replicated/MasterLevelDBStore.scala |  3 +-
 .../leveldb/replicated/SlaveLevelDBStore.scala  | 16 +++++++----
 .../leveldb/replicated/TransportHandler.scala   |  5 +++-
 5 files changed, 48 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c585f54f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index a8d2e4f..1c6adec 100755
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -929,7 +929,7 @@ class LevelDBClient(store: LevelDBStore) {
 
   var wal_append_position = 0L
 
-  def stop() = {
+  def stop() = this.synchronized {
     if( writeExecutor!=null ) {
       writeExecutor.shutdown
       writeExecutor.awaitTermination(60, TimeUnit.SECONDS)
@@ -945,7 +945,7 @@ class LevelDBClient(store: LevelDBStore) {
           index.close
           index = null
         }
-        if (log.isOpen) {
+        if (log!=null && log.isOpen) {
           log.close
           copyDirtyIndexToSnapshot
           wal_append_position = log.appender_limit

http://git-wip-us.apache.org/repos/asf/activemq/blob/c585f54f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index e1efa4d..98aaf6d 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -184,6 +184,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
   val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
   val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
 
+  def check_running = {
+    if( this.isStopped ) {
+      throw new IOException("Store has been stopped")
+    }
+  }
+
   def init() = {}
 
   def createDefaultLocker() = {
@@ -664,6 +670,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     lastSeq.set(db.getLastQueueEntrySeq(key))
 
     def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef]
= {
+      check_running
       val seq = lastSeq.incrementAndGet()
       message.incrementReferenceCount()
       uow.addCompleteListener({
@@ -674,6 +681,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
 
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context,
message, false)
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay:
Boolean): Future[AnyRef] = {
+      check_running
       message.getMessageId.setEntryLocator(null)
       if(  message.getTransactionId!=null ) {
         transaction(message.getTransactionId).add(this, message, delay)
@@ -687,6 +695,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
 
     override def addMessage(context: ConnectionContext, message: Message) = addMessage(context,
message, false)
     override def addMessage(context: ConnectionContext, message: Message, delay: Boolean):
Unit = {
+      check_running
       waitOn(asyncAddQueueMessage(context, message, delay))
     }
 
@@ -695,6 +704,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit =
{
+      check_running
       if(  ack.getTransactionId!=null ) {
         transaction(ack.getTransactionId).remove(this, ack)
       } else {
@@ -705,10 +715,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def removeMessage(context: ConnectionContext, ack: MessageAck): Unit = {
+      check_running
       removeAsyncMessage(context, ack)
     }
 
     def getMessage(id: MessageId): Message = {
+      check_running
       var message: Message = db.getMessage(id)
       if (message == null) {
         throw new IOException("Message id not found: " + id)
@@ -717,6 +729,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def removeAllMessages(context: ConnectionContext): Unit = {
+      check_running
       db.collectionEmpty(key)
       cursorPosition = 0
     }
@@ -730,6 +743,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def recover(listener: MessageRecoveryListener): Unit = {
+      check_running
       cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0)
     }
 
@@ -738,6 +752,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit =
{
+      check_running
       cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned)
     }
 
@@ -802,6 +817,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def addSubsciption(info: SubscriptionInfo, retroactive: Boolean) = {
+      check_running
       var sub = db.addSubscription(key, info)
       subscriptions.synchronized {
         subscriptions.put((info.getClientId, info.getSubcriptionName), sub)
@@ -815,14 +831,17 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def getAllSubscriptions: Array[SubscriptionInfo] = subscriptions.synchronized {
+      check_running
       subscriptions.values.map(_.info).toArray
     }
 
     def lookupSubscription(clientId: String, subscriptionName: String): SubscriptionInfo
= subscriptions.synchronized {
+      check_running
       subscriptions.get((clientId, subscriptionName)).map(_.info).getOrElse(null)
     }
 
     def deleteSubscription(clientId: String, subscriptionName: String): Unit = {
+      check_running
       subscriptions.synchronized {
         subscriptions.remove((clientId, subscriptionName))
       }.foreach(db.removeSubscription(_))
@@ -839,6 +858,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def acknowledge(context: ConnectionContext, clientId: String, subscriptionName: String,
messageId: MessageId, ack: MessageAck): Unit = {
+      check_running
       lookup(clientId, subscriptionName).foreach { sub =>
         var position = db.queuePosition(messageId)
         if(  ack.getTransactionId!=null ) {
@@ -855,23 +875,27 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def resetBatching(clientId: String, subscriptionName: String): Unit = {
+      check_running
       lookup(clientId, subscriptionName).foreach { sub =>
         sub.cursorPosition = 0
       }
     }
     def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener):
Unit = {
+      check_running
       lookup(clientId, subscriptionName).foreach { sub =>
         sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1))
       }
     }
 
     def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int,
listener: MessageRecoveryListener): Unit = {
+      check_running
       lookup(clientId, subscriptionName).foreach { sub =>
         sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1),
maxReturned)
       }
     }
 
     def getMessageCount(clientId: String, subscriptionName: String): Int = {
+      check_running
       lookup(clientId, subscriptionName) match {
         case Some(sub) =>
           (lastSeq.get - sub.lastAckPosition).toInt
@@ -889,10 +913,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
 
     def getName: String = name
     def destroy() = {
+      check_running
       removePList(name)
     }
 
     def addFirst(id: String, bs: ByteSequence): AnyRef = {
+      check_running
       var pos = lastSeq.decrementAndGet()
       add(pos, id, bs)
       listSize.incrementAndGet()
@@ -900,6 +926,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def addLast(id: String, bs: ByteSequence): AnyRef = {
+      check_running
       var pos = lastSeq.incrementAndGet()
       add(pos, id, bs)
       listSize.incrementAndGet()
@@ -907,6 +934,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def add(pos:Long, id: String, bs: ByteSequence) = {
+      check_running
       val encoded_key = encodeLongLong(key, pos)
       val encoded_id = new UTF8Buffer(id)
       val os = new DataByteArrayOutputStream(2+encoded_id.length+bs.length)
@@ -917,6 +945,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     }
 
     def remove(position: AnyRef): Boolean = {
+      check_running
       val pos = position.asInstanceOf[java.lang.Long].longValue()
       val encoded_key = encodeLongLong(key, pos)
       db.plistGet(encoded_key) match {
@@ -933,6 +962,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
     def size(): Long = listSize.get()
 
     def iterator() = new PListIterator() {
+      check_running
       val prefix = LevelDBClient.encodeLong(key)
       var dbi = db.plistIterator
       var last_key:Array[Byte] = _

http://git-wip-us.apache.org/repos/asf/activemq/blob/c585f54f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
index b8289fc..f50e556 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala
@@ -137,7 +137,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait
{
       def onAccept(transport: Transport) {
         transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress))
         transport.setBlockingExecutor(blocking_executor)
-        new Session(transport)
+        new Session(transport).start
+
       }
       def onAcceptError(error: Exception) {
         warn(error)

http://git-wip-us.apache.org/repos/asf/activemq/blob/c585f54f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
index ce76230..4239a0b 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala
@@ -107,6 +107,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait
{
         session.handler = wal_handler(session)
       }
     })
+    wal_session.start
   }
 
   def stop_connections(cb:Task) = {
@@ -114,18 +115,20 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait
{
       unstash(directory)
       cb.run()
     }
-    if( wal_session !=null ) {
+    val wal_session_copy = wal_session
+    if( wal_session_copy !=null ) {
+      wal_session = null
       val next = then
       then = ^{
-        wal_session.transport.stop(next)
-        wal_session = null
+        wal_session_copy.transport.stop(next)
       }
     }
-    if( transfer_session !=null ) {
+    val transfer_session_copy = transfer_session
+    if( transfer_session_copy !=null ) {
+      transfer_session = null
       val next = then
       then = ^{
-        transfer_session.transport.stop(next)
-        transfer_session = null
+        transfer_session_copy.transport.stop(next)
       }
     }
     then.run();
@@ -414,6 +417,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait
{
         pending_log_removes.clear()
       }
     })
+    transfer_session.start
     state.snapshot_position
   }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/c585f54f/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
index c5ac309..b13b680 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala
@@ -33,7 +33,10 @@ abstract class TransportHandler(val transport: Transport) extends TransportListe
 
   transport.setProtocolCodec(codec)
   transport.setTransportListener(this)
-  transport.start(NOOP)
+
+  def start = {
+    transport.start(NOOP)
+  }
 
   def onTransportConnected = transport.resumeRead()
   def onTransportDisconnected() = {}


Mime
View raw message