activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6164 - allow journal write batching on a single destination
Date Tue, 09 Feb 2016 12:51:22 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 90726a60a -> 499e39e52


https://issues.apache.org/jira/browse/AMQ-6164 - allow journal write batching on a single
destination


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/499e39e5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/499e39e5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/499e39e5

Branch: refs/heads/master
Commit: 499e39e52c392fcc6d897a526afbbd3b144121e2
Parents: 90726a6
Author: gtully <gary.tully@gmail.com>
Authored: Tue Feb 9 12:49:53 2016 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Feb 9 12:50:27 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 45 +++++++++-----------
 .../store/memory/MemoryMessageStore.java        | 10 ++---
 .../activemq/store/kahadb/KahaDBStore.java      | 13 +++---
 .../apache/activemq/leveldb/LevelDBStore.scala  |  5 +--
 4 files changed, 34 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 960ac9c..34817a0 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -829,33 +829,28 @@ public class Queue extends BaseDestination implements Task, UsageListener,
Index
         producerExchange.incrementSend();
         do {
             checkUsage(context, producerExchange, message);
-            sendLock.lockInterruptibly();
-            try {
-                message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
-                if (store != null && message.isPersistent()) {
-                    message.getMessageId().setFutureOrSequenceLong(null);
-                    try {
-                        if (messages.isCacheEnabled()) {
-                            result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
-                            result.addListener(new PendingMarshalUsageTracker(message));
-                        } else {
-                            store.addMessage(context, message);
-                        }
-                        if (isReduceMemoryFootprint()) {
-                            message.clearMarshalledState();
-                        }
-                    } catch (Exception e) {
-                        // we may have a store in inconsistent state, so reset the cursor
-                        // before restarting normal broker operations
-                        resetNeeded = true;
-                        throw e;
+            message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
+            if (store != null && message.isPersistent()) {
+                message.getMessageId().setFutureOrSequenceLong(null);
+                try {
+                    if (messages.isCacheEnabled()) {
+                        result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
+                        result.addListener(new PendingMarshalUsageTracker(message));
+                    } else {
+                        store.addMessage(context, message);
                     }
+                    if (isReduceMemoryFootprint()) {
+                        message.clearMarshalledState();
+                    }
+                } catch (Exception e) {
+                    // we may have a store in inconsistent state, so reset the cursor
+                    // before restarting normal broker operations
+                    resetNeeded = true;
+                    throw e;
                 }
-                if(tryOrderedCursorAdd(message, context)) {
-                    break;
-                }
-            } finally {
-                sendLock.unlock();
+            }
+            if(tryOrderedCursorAdd(message, context)) {
+                break;
             }
         } while (started.get());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index b32a811..736d912 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -59,11 +59,11 @@ public class MemoryMessageStore extends AbstractMessageStore {
         synchronized (messageTable) {
             messageTable.put(message.getMessageId(), message);
             incMessageStoreStatistics(getMessageStoreStatistics(), message);
-        }
-        message.incrementReferenceCount();
-        message.getMessageId().setFutureOrSequenceLong(sequenceId++);
-        if (indexListener != null) {
-            indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
+            message.incrementReferenceCount();
+            message.getMessageId().setFutureOrSequenceLong(sequenceId++);
+            if (indexListener != null) {
+                indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index fa4672b..e1c1df4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -326,10 +326,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
         return task;
     }
 
+    // with asyncTaskMap locked
     protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException
{
-        synchronized (store.asyncTaskMap) {
-            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()),
task);
-        }
+        store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()),
task);
         this.queueExecutor.execute(task);
     }
 
@@ -390,9 +389,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
                 message.getMessageId().setFutureOrSequenceLong(future);
                 message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
                 result.aquireLocks();
-                addQueueTask(this, result);
-                if (indexListener != null) {
-                    indexListener.onAdd(new IndexListener.MessageContext(context, message,
null));
+                synchronized (asyncTaskMap) {
+                    addQueueTask(this, result);
+                    if (indexListener != null) {
+                        indexListener.onAdd(new IndexListener.MessageContext(context, message,
null));
+                    }
                 }
                 return future;
             } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/499e39e5/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index a4cdcac..f80e722 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -758,7 +758,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
       uow.addCompleteListener({
         message.decrementReferenceCount()
       })
-      val sequence = lastSeq.synchronized {
+      lastSeq.synchronized {
         val seq = lastSeq.incrementAndGet()
         message.getMessageId.setFutureOrSequenceLong(seq);
         // null context on xa recovery, we want to bypass the cursor & pending adds as
it will be reset
@@ -768,9 +768,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
             def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq)
}
           }))
         }
-        seq
+        uow.enqueue(key, seq, message, delay)
       }
-      uow.enqueue(key, sequence, message, delay)
     }
 
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context,
message, false)


Mime
View raw message