Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 01BCE107E0 for ; Tue, 3 Dec 2013 17:05:21 +0000 (UTC) Received: (qmail 78733 invoked by uid 500); 3 Dec 2013 17:05:20 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 78715 invoked by uid 500); 3 Dec 2013 17:05:20 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 78688 invoked by uid 99); 3 Dec 2013 17:05:19 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Dec 2013 17:05:19 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BAC9A91BBF6; Tue, 3 Dec 2013 17:05:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chirino@apache.org To: commits@activemq.apache.org Message-Id: <8252c275f5cc407eaf850c556bedaad3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: Adding assertions to make sure that we only append to the log from the write thread. Found a code path that was appending to the log from a different thread. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882 Date: Tue, 3 Dec 2013 17:05:16 +0000 (UTC) Updated Branches: refs/heads/trunk f0334862a -> 5fa462a08 Adding assertions to make sure that we only append to the log from the write thread. Found a code path that was appending to the log from a different thread. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5fa462a0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5fa462a0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5fa462a0 Branch: refs/heads/trunk Commit: 5fa462a08acd40b130fb98ad359a838def690450 Parents: f033486 Author: Hiram Chirino Authored: Tue Dec 3 12:04:40 2013 -0500 Committer: Hiram Chirino Committed: Tue Dec 3 12:05:16 2013 -0500 ---------------------------------------------------------------------- .../org/apache/activemq/leveldb/DBManager.scala | 2 +- .../apache/activemq/leveldb/LevelDBClient.scala | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 00260d9..6b575ee 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -798,7 +798,7 @@ class DBManager(val parent:LevelDBStore) { def createTransactionContainer(id:XATransactionId) = createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE) - def removeTransactionContainer(key:Long) = { // writeExecutor.sync { + def removeTransactionContainer(key:Long) = writeExecutor.sync { client.removeCollection(key) } http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index c0cedce..fe29012 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -59,6 +59,10 @@ import org.apache.activemq.broker.SuppressReplyException */ object LevelDBClient extends Log { + class WriteThread(r:Runnable) extends Thread(r) { + setDaemon(true) + } + final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:" final val STORE_SCHEMA_VERSION = 1 @@ -512,6 +516,7 @@ class LevelDBClient(store: LevelDBStore) { } def storeTrace(ascii:String, force:Boolean=false) = { + assert_write_thread_executing val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date) log.appender { appender => appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii))) @@ -566,6 +571,8 @@ class LevelDBClient(store: LevelDBStore) { replay_write_batch = null; } + def assert_write_thread_executing = assert(Thread.currentThread().getClass == classOf[WriteThread]) + def init() ={ // Lets check store compatibility... @@ -590,11 +597,7 @@ class LevelDBClient(store: LevelDBStore) { version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION) writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() { - def newThread(r: Runnable) = { - val rc = new Thread(r, "LevelDB store io write") - rc.setDaemon(true) - rc - } + def newThread(r: Runnable) = new WriteThread(r) }) val factoryNames = store.indexFactory @@ -1125,6 +1128,8 @@ class LevelDBClient(store: LevelDBStore) { } def addCollection(record: CollectionRecord.Buffer) = { + assert_write_thread_executing + val key = encodeLongKey(COLLECTION_PREFIX, record.getKey) val value = record.toUnframedBuffer might_fail_using_index { @@ -1153,6 +1158,7 @@ class LevelDBClient(store: LevelDBStore) { } def removeCollection(collectionKey: Long) = { + assert_write_thread_executing val key = encodeLongKey(COLLECTION_PREFIX, collectionKey) val value = encodeVLong(collectionKey) val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey) @@ -1181,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) { } def collectionEmpty(collectionKey: Long) = { + assert_write_thread_executing val key = encodeLongKey(COLLECTION_PREFIX, collectionKey) val value = encodeVLong(collectionKey) val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey) @@ -1366,6 +1373,7 @@ class LevelDBClient(store: LevelDBStore) { val max_index_write_latency = TimeMetric() def store(uows: Array[DelayableUOW]) { + assert_write_thread_executing might_fail_using_index { log.appender { appender => val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>