Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 167D710645 for ; Tue, 15 Oct 2013 16:43:13 +0000 (UTC) Received: (qmail 27364 invoked by uid 500); 15 Oct 2013 16:43:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 27278 invoked by uid 500); 15 Oct 2013 16:43:09 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 27270 invoked by uid 99); 15 Oct 2013 16:43:09 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Oct 2013 16:43:09 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B0A648B4B31; Tue, 15 Oct 2013 16:43:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chirino@apache.org To: commits@activemq.apache.org Message-Id: <5e0786d4c46249068384f1391b0691d4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Avoid NullPointerExceptions that can occur during leveldb replication M/S state transitions. Date: Tue, 15 Oct 2013 16:43:08 +0000 (UTC) Updated Branches: refs/heads/trunk ee65ca4ee -> ef45a5f8f Avoid NullPointerExceptions that can occur during leveldb replication M/S state transitions. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ef45a5f8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ef45a5f8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ef45a5f8 Branch: refs/heads/trunk Commit: ef45a5f8ffa0c5c36f1fb98f377ca0cb54b9cbd7 Parents: ee65ca4 Author: Hiram Chirino Authored: Tue Oct 15 11:45:45 2013 -0400 Committer: Hiram Chirino Committed: Tue Oct 15 12:43:00 2013 -0400 ---------------------------------------------------------------------- .../apache/activemq/leveldb/LevelDBClient.scala | 4 +-- .../apache/activemq/leveldb/LevelDBStore.scala | 30 ++++++++++++++++++++ .../leveldb/replicated/MasterLevelDBStore.scala | 3 +- .../leveldb/replicated/SlaveLevelDBStore.scala | 16 +++++++---- .../leveldb/replicated/TransportHandler.scala | 5 +++- 5 files changed, 48 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ef45a5f8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index a8d2e4f..1c6adec 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -929,7 +929,7 @@ class LevelDBClient(store: LevelDBStore) { var wal_append_position = 0L - def stop() = { + def stop() = this.synchronized { if( writeExecutor!=null ) { writeExecutor.shutdown writeExecutor.awaitTermination(60, TimeUnit.SECONDS) @@ -945,7 +945,7 @@ class LevelDBClient(store: LevelDBStore) { index.close index = null } - if (log.isOpen) { + if (log!=null && log.isOpen) { log.close copyDirtyIndexToSnapshot wal_append_position = log.appender_limit http://git-wip-us.apache.org/repos/asf/activemq/blob/ef45a5f8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index e1efa4d..98aaf6d 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -184,6 +184,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]() val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]() + def check_running = { + if( this.isStopped ) { + throw new IOException("Store has been stopped") + } + } + def init() = {} def createDefaultLocker() = { @@ -664,6 +670,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P lastSeq.set(db.getLastQueueEntrySeq(key)) def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { + check_running val seq = lastSeq.incrementAndGet() message.incrementReferenceCount() uow.addCompleteListener({ @@ -674,6 +681,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false) override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = { + check_running message.getMessageId.setEntryLocator(null) if( message.getTransactionId!=null ) { transaction(message.getTransactionId).add(this, message, delay) @@ -687,6 +695,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P override def addMessage(context: ConnectionContext, message: Message) = addMessage(context, message, false) override def addMessage(context: ConnectionContext, message: Message, delay: Boolean): Unit = { + check_running waitOn(asyncAddQueueMessage(context, message, delay)) } @@ -695,6 +704,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } override def removeAsyncMessage(context: ConnectionContext, ack: MessageAck): Unit = { + check_running if( ack.getTransactionId!=null ) { transaction(ack.getTransactionId).remove(this, ack) } else { @@ -705,10 +715,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def removeMessage(context: ConnectionContext, ack: MessageAck): Unit = { + check_running removeAsyncMessage(context, ack) } def getMessage(id: MessageId): Message = { + check_running var message: Message = db.getMessage(id) if (message == null) { throw new IOException("Message id not found: " + id) @@ -717,6 +729,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def removeAllMessages(context: ConnectionContext): Unit = { + check_running db.collectionEmpty(key) cursorPosition = 0 } @@ -730,6 +743,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recover(listener: MessageRecoveryListener): Unit = { + check_running cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0) } @@ -738,6 +752,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { + check_running cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorPosition, maxReturned) } @@ -802,6 +817,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def addSubsciption(info: SubscriptionInfo, retroactive: Boolean) = { + check_running var sub = db.addSubscription(key, info) subscriptions.synchronized { subscriptions.put((info.getClientId, info.getSubcriptionName), sub) @@ -815,14 +831,17 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def getAllSubscriptions: Array[SubscriptionInfo] = subscriptions.synchronized { + check_running subscriptions.values.map(_.info).toArray } def lookupSubscription(clientId: String, subscriptionName: String): SubscriptionInfo = subscriptions.synchronized { + check_running subscriptions.get((clientId, subscriptionName)).map(_.info).getOrElse(null) } def deleteSubscription(clientId: String, subscriptionName: String): Unit = { + check_running subscriptions.synchronized { subscriptions.remove((clientId, subscriptionName)) }.foreach(db.removeSubscription(_)) @@ -839,6 +858,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def acknowledge(context: ConnectionContext, clientId: String, subscriptionName: String, messageId: MessageId, ack: MessageAck): Unit = { + check_running lookup(clientId, subscriptionName).foreach { sub => var position = db.queuePosition(messageId) if( ack.getTransactionId!=null ) { @@ -855,23 +875,27 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def resetBatching(clientId: String, subscriptionName: String): Unit = { + check_running lookup(clientId, subscriptionName).foreach { sub => sub.cursorPosition = 0 } } def recoverSubscription(clientId: String, subscriptionName: String, listener: MessageRecoveryListener): Unit = { + check_running lookup(clientId, subscriptionName).foreach { sub => sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1)) } } def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = { + check_running lookup(clientId, subscriptionName).foreach { sub => sub.cursorPosition = db.cursorMessages(preparedAcks, key, listener, sub.cursorPosition.max(sub.lastAckPosition+1), maxReturned) } } def getMessageCount(clientId: String, subscriptionName: String): Int = { + check_running lookup(clientId, subscriptionName) match { case Some(sub) => (lastSeq.get - sub.lastAckPosition).toInt @@ -889,10 +913,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def getName: String = name def destroy() = { + check_running removePList(name) } def addFirst(id: String, bs: ByteSequence): AnyRef = { + check_running var pos = lastSeq.decrementAndGet() add(pos, id, bs) listSize.incrementAndGet() @@ -900,6 +926,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def addLast(id: String, bs: ByteSequence): AnyRef = { + check_running var pos = lastSeq.incrementAndGet() add(pos, id, bs) listSize.incrementAndGet() @@ -907,6 +934,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def add(pos:Long, id: String, bs: ByteSequence) = { + check_running val encoded_key = encodeLongLong(key, pos) val encoded_id = new UTF8Buffer(id) val os = new DataByteArrayOutputStream(2+encoded_id.length+bs.length) @@ -917,6 +945,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P } def remove(position: AnyRef): Boolean = { + check_running val pos = position.asInstanceOf[java.lang.Long].longValue() val encoded_key = encodeLongLong(key, pos) db.plistGet(encoded_key) match { @@ -933,6 +962,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def size(): Long = listSize.get() def iterator() = new PListIterator() { + check_running val prefix = LevelDBClient.encodeLong(key) var dbi = db.plistIterator var last_key:Array[Byte] = _ http://git-wip-us.apache.org/repos/asf/activemq/blob/ef45a5f8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala index b8289fc..f50e556 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala @@ -137,7 +137,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { def onAccept(transport: Transport) { transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress)) transport.setBlockingExecutor(blocking_executor) - new Session(transport) + new Session(transport).start + } def onAcceptError(error: Exception) { warn(error) http://git-wip-us.apache.org/repos/asf/activemq/blob/ef45a5f8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala index ce76230..4239a0b 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala @@ -107,6 +107,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { session.handler = wal_handler(session) } }) + wal_session.start } def stop_connections(cb:Task) = { @@ -114,18 +115,20 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { unstash(directory) cb.run() } - if( wal_session !=null ) { + val wal_session_copy = wal_session + if( wal_session_copy !=null ) { + wal_session = null val next = then then = ^{ - wal_session.transport.stop(next) - wal_session = null + wal_session_copy.transport.stop(next) } } - if( transfer_session !=null ) { + val transfer_session_copy = transfer_session + if( transfer_session_copy !=null ) { + transfer_session = null val next = then then = ^{ - transfer_session.transport.stop(next) - transfer_session = null + transfer_session_copy.transport.stop(next) } } then.run(); @@ -414,6 +417,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { pending_log_removes.clear() } }) + transfer_session.start state.snapshot_position } http://git-wip-us.apache.org/repos/asf/activemq/blob/ef45a5f8/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala index c5ac309..b13b680 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/TransportHandler.scala @@ -33,7 +33,10 @@ abstract class TransportHandler(val transport: Transport) extends TransportListe transport.setProtocolCodec(codec) transport.setTransportListener(this) - transport.start(NOOP) + + def start = { + transport.start(NOOP) + } def onTransportConnected = transport.resumeRead() def onTransportDisconnected() = {}