activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r760433 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/store/ main/java/org/apache/activemq/broker/store/kahadb/ main/java/org/apache/activemq/broker/store/memor...
Date Tue, 31 Mar 2009 13:20:16 GMT
Author: chirino
Date: Tue Mar 31 13:20:07 2009
New Revision: 760433

URL: http://svn.apache.org/viewvc?rev=760433&view=rev
Log:
Applied Colin's patch at https://issues.apache.org/activemq/browse/AMQ-2189
Thanks!

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
Tue Mar 31 13:20:07 2009
@@ -444,22 +444,33 @@
                     // Add to the pending list if persistent and we are durable:
                     if (isDurable() && message.isPersistent()) {
                         synchronized (queue) {
-                            pendingMessages.put(msg.getMessageId(), message);
+                            Object old = pendingMessages.put(msg.getMessageId(), message);
+                            if(old != null)
+                            {
+                                new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
+                            }
                             pendingMessageIds.add(msg.getMessageId());
+                            connection.write(md);
                         }
                     }
-                    connection.write(md);
+                    else
+                    {
+                        connection.write(md);
+                    }
                 };
             });
         }
-
         public void ack(MessageAck info) {
+            //TODO: The pending message queue could probably be optimized to avoid having
+            //to create a new list here. 
+            LinkedList<MessageDelivery> acked = new LinkedList<MessageDelivery>();
           
             synchronized (queue) {
                 if (isDurable()) {
                     MessageId id = info.getLastMessageId();
                     while (!pendingMessageIds.isEmpty()) {
                         MessageId pendingId = pendingMessageIds.getFirst();
                         MessageDelivery delivery = pendingMessages.remove(pendingId);
+                        acked.add(delivery);
                         delivery.delete(durableQueueName);
                         pendingMessageIds.removeFirst();
                         if (pendingId.equals(id)) {
@@ -470,6 +481,13 @@
                 }
                 limiter.onProtocolCredit(info.getMessageCount());
             }
+            
+            //Delete outside of synchronization on queue to avoid contention with enqueueing
+            //threads. 
+            for(MessageDelivery delivery : acked)
+            {
+                delivery.delete(durableQueueName);
+            }
         }
 
         public IFlowSink<MessageDelivery> getSink() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
Tue Mar 31 13:20:07 2009
@@ -23,7 +23,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,6 +46,8 @@
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
 
 public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.Operation>
{
 
@@ -58,11 +59,12 @@
 
     private final IDispatcher dispatcher;
     private Thread flushThread;
-    private final ConcurrentLinkedQueue<OperationBase> opQueue;
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
 
     private HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String,
ProtocolHandler>();
+
+    private final LinkedNodeList<OperationBase> opQueue;
     private AtomicBoolean notify = new AtomicBoolean(false);
     private Semaphore opsReady = new Semaphore(0);
     private long opSequenceNumber;
@@ -72,7 +74,7 @@
     // num scheduled for delay
     private long delayedFlushPointer = 0; // The last delayable sequence num
     // requested.
-    private final long FLUSH_DELAY_MS = 5;
+    private final long FLUSH_DELAY_MS = 100;
     private final Runnable flushDelayCallback;
 
     public interface DatabaseListener {
@@ -103,7 +105,7 @@
     public BrokerDatabase(Store store, IDispatcher dispatcher) {
         this.store = store;
         this.dispatcher = dispatcher;
-        this.opQueue = new ConcurrentLinkedQueue<OperationBase>();
+        this.opQueue = new LinkedNodeList<OperationBase>();
         storeLimiter = new SizeLimiter<OperationBase>(5000, 0) {
 
             @Override
@@ -204,26 +206,27 @@
         return op;
     }
 
-    private synchronized final void addToOpQueue(OperationBase op) {
+    private final void addToOpQueue(OperationBase op) {
         if (!running.get()) {
             throw new IllegalStateException("BrokerDatabase not started");
         }
-        op.opSequenceNumber = opSequenceNumber++;
-        opQueue.add(op);
-        if (op.flushRequested) {
-            if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
-                scheduleDelayedFlush(op.opSequenceNumber);
-            } else {
-                updateFlushPointer(op.opSequenceNumber);
+
+        synchronized (opQueue) {
+            op.opSequenceNumber = opSequenceNumber++;
+            opQueue.addLast(op);
+            if (op.flushRequested) {
+                if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
+                    scheduleDelayedFlush(op.opSequenceNumber);
+                } else {
+                    updateFlushPointer(op.opSequenceNumber);
+                }
             }
         }
-        if (notify.get()) {
-            opsReady.release();
-        }
     }
 
     private void updateFlushPointer(long seqNumber) {
         if (seqNumber > flushPointer) {
+            flushPointer = seqNumber;
             if (notify.get()) {
                 opsReady.release();
             }
@@ -246,27 +249,40 @@
 
     }
 
-    private synchronized final void flushDelayCallback() {
-        if (flushPointer < requestedDelayedFlushPointer) {
-            updateFlushPointer(requestedDelayedFlushPointer);
-            requestedDelayedFlushPointer = -1;
-            // Schedule next delay if needed:
-            scheduleDelayedFlush(delayedFlushPointer);
+    private final void flushDelayCallback() {
+        synchronized (opQueue) {
+            if (flushPointer < requestedDelayedFlushPointer) {
+                updateFlushPointer(requestedDelayedFlushPointer);
+                requestedDelayedFlushPointer = -1;
+                // Schedule next delay if needed:
+                if (delayedFlushPointer > flushPointer) {
+                    scheduleDelayedFlush(delayedFlushPointer);
+                } else {
+                    delayedFlushPointer = -1;
+                }
+            }
         }
     }
 
     private final OperationBase getNextOp(boolean wait) {
         if (!wait) {
-            return opQueue.poll();
+            synchronized (opQueue) {
+                OperationBase op = opQueue.getHead();
+                if (op != null && op.opSequenceNumber <= flushPointer) {
+                    op.unlink();
+                    return op;
+                }
+            }
+            return null;
         } else {
-            OperationBase op = opQueue.poll();
+            OperationBase op = getNextOp(false);
             if (op == null) {
                 notify.set(true);
-                op = opQueue.poll();
+                op = getNextOp(false);
                 try {
                     while (running.get() && op == null) {
                         opsReady.acquireUninterruptibly();
-                        op = opQueue.poll();
+                        op = getNextOp(false);
                     }
                 } finally {
                     notify.set(false);
@@ -289,17 +305,44 @@
             if (firstOp == null) {
                 continue;
             }
-            counter.count = 1;
+            counter.count = 0;
 
             // The first operation we get, triggers a store transaction.
             if (firstOp != null) {
                 final LinkedList<Operation> processedQueue = new LinkedList<Operation>();
                 try {
 
+                    Operation op = firstOp;
                     // TODO the recursion here leads to a rather large stack,
                     // refactor.
-
-                    executeOps(firstOp, processedQueue, counter);
+                    while (op != null) {
+                        final Operation toExec = op;
+                        counter.count++;
+                        //System.out.println("Executing " + op);
+                        store.execute(new Store.VoidCallback<Exception>() {
+                            @Override
+                            public void run(Session session) throws Exception {
+
+                                // Try to execute the operation against the
+                                // session...
+                                try {
+                                    if (toExec.execute(session)) {
+                                        processedQueue.add(toExec);
+                                    } else {
+                                        counter.count--;
+                                    }
+                                } catch (CancellationException ignore) {
+                                    //System.out.println("Cancelled" + toExec);
+                                }
+                            }
+                        }, null);
+                        if (counter.count < 1000) {
+                            op = getNextOp(false);
+                        } else {
+                            op = null;
+                        }
+                    }
+                    // executeOps(firstOp, processedQueue, counter);
 
                     // If we procecessed some ops, flush and post process:
                     if (!processedQueue.isEmpty()) {
@@ -337,35 +380,24 @@
         }
     }
 
-    private final void executeOps(final OperationBase op, final LinkedList<Operation>
processedQueue, final OpCounter counter) throws FatalStoreException, Exception {
-        store.execute(new Store.VoidCallback<Exception>() {
-            @Override
-            public void run(Session session) throws Exception {
-
-                // Try to execute the operation against the
-                // session...
-                try {
-                    if (op.execute(session)) {
-                        processedQueue.add(op);
-                    } else {
-                        counter.count--;
-                    }
-                } catch (CancellationException ignore) {
-                    System.out.println("Cancelled" + op);
-                }
-
-                // See if we can batch up some additional operations
-                // in this transaction.
-                if (counter.count < 100) {
-                    OperationBase next = getNextOp(false);
-                    if (next != null) {
-                        counter.count++;
-                        executeOps(next, processedQueue, counter);
-                    }
-                }
-            }
-        }, null);
-    }
+    /*
+     * private final void executeOps(final OperationBase op, final
+     * LinkedList<Operation> processedQueue, final OpCounter counter) throws
+     * FatalStoreException, Exception { store.execute(new
+     * Store.VoidCallback<Exception>() {
+     * 
+     * @Override public void run(Session session) throws Exception {
+     * 
+     * // Try to execute the operation against the // session... try { if
+     * (op.execute(session)) { processedQueue.add(op); } else { counter.count--;
+     * } } catch (CancellationException ignore) { System.out.println("Cancelled"
+     * + op); }
+     * 
+     * // See if we can batch up some additional operations // in this
+     * transaction. if (counter.count < 100) { OperationBase next =
+     * getNextOp(false); if (next != null) { counter.count++; executeOps(next,
+     * processedQueue, counter); } } } }, null); }
+     */
 
     /**
      * Adds a queue to the database
@@ -546,7 +578,7 @@
      * This is a convenience base class that can be used to implement
      * Operations. It handles operation cancellation for you.
      */
-    private abstract class OperationBase implements Operation {
+    private abstract class OperationBase extends LinkedNode<OperationBase> implements
Operation {
         public boolean flushRequested = false;
         public long opSequenceNumber = -1;
 
@@ -557,9 +589,9 @@
         public boolean cancel() {
             if (executePending.compareAndSet(true, false)) {
                 cancelled.set(true);
-                //System.out.println("Cancelled: " + this);
+                // System.out.println("Cancelled: " + this);
                 synchronized (opQueue) {
-                    opQueue.remove(this);
+                    unlink();
                     storeController.elementDispatched(this);
                 }
                 return true;
@@ -622,6 +654,10 @@
         public void onRollback(Throwable error) {
             error.printStackTrace();
         }
+
+        public String toString() {
+            return "DBOp seq: " + opSequenceNumber + "P/C/E: " + executePending.get() + "/"
+ cancelled() + "/" + executed();
+        }
     }
 
     private class QueueAddOperation extends OperationBase {
@@ -676,18 +712,18 @@
         public void onCommit() {
 
         }
-        
+
         public String toString() {
             return "QueueDelete: " + queue.toString();
         }
     }
 
     private class DeleteMessageOperation extends OperationBase {
-        private final MessageDelivery delivery;
+        private final long storeTracking;
         private AsciiBuffer queue;
 
         public DeleteMessageOperation(MessageDelivery delivery, AsciiBuffer queue) {
-            this.delivery = delivery;
+            this.storeTracking = delivery.getStoreTracking();
             this.queue = queue;
         }
 
@@ -700,7 +736,7 @@
         @Override
         protected void doExcecute(Session session) {
             try {
-                session.queueRemoveMessage(queue, delivery.getStoreTracking());
+                session.queueRemoveMessage(queue, storeTracking);
             } catch (KeyNotFoundException e) {
                 // TODO Probably doesn't always mean an error, it is possible
                 // that
@@ -714,9 +750,9 @@
         public void onCommit() {
 
         }
-        
+
         public String toString() {
-            return "MessageDelete: " + queue.toString() + delivery.getStoreTracking();
+            return "MessageDelete: " + queue.toString() + " tracking: " + storeTracking;
         }
     }
 
@@ -770,7 +806,7 @@
         public void onCommit() {
             listener.messagesRestored(msgs);
         }
-        
+
         public String toString() {
             return "MessageRestore: " + queue.toString() + " first: " + firstKey + " max:
" + maxRecords;
         }
@@ -782,7 +818,7 @@
 
         private final MessageDelivery delivery;
         private final AsciiBuffer target;
-        private final MessageRecord record;
+        private MessageRecord record;
 
         private final boolean delayable;
 
@@ -790,8 +826,10 @@
             this.brokerDelivery = delivery;
             this.delivery = delivery;
             target = null;
-            this.record = delivery.createMessageRecord();
             this.delayable = delivery.isFlushDelayable();
+            if (!delayable) {
+                this.record = delivery.createMessageRecord();
+            }
         }
 
         public AddMessageOperation(MessageDelivery delivery, AsciiBuffer target) throws IOException
{
@@ -805,12 +843,12 @@
         public boolean isDelayable() {
             return delayable;
         }
-        
+
         @Override
         public int getLimiterSize() {
             return delivery.getFlowLimiterSize();
         }
-        
+
         @Override
         protected void doExcecute(Session session) {
 
@@ -819,6 +857,13 @@
                 Collection<AsciiBuffer> targets = brokerDelivery.getPersistentQueues();
 
                 if (!targets.isEmpty()) {
+                    if (record == null) {
+                        try {
+                            record = delivery.createMessageRecord();
+                        } catch (IOException e) {
+                            throw new FatalStoreException("Error marshalling message", e);
+                        }
+                    }
                     record.setKey(delivery.getStoreTracking());
                     session.messageAdd(record);
 
@@ -840,11 +885,11 @@
                 }
             } else {
 
-                Long key = session.messageAdd(record);
+                session.messageAdd(record);
                 try {
                     QueueRecord queueRecord = new QueueRecord();
                     queueRecord.setAttachment(null);
-                    queueRecord.setMessageKey(key);
+                    queueRecord.setMessageKey(record.getKey());
                     session.queueAddMessage(target, queueRecord);
                 } catch (KeyNotFoundException e) {
                     e.printStackTrace();
@@ -858,7 +903,7 @@
         }
 
         public String toString() {
-            return "AddOperation " + delivery.getStoreTracking() + " seq: " + opSequenceNumber
+ "P/C/E" + super.executePending.get() + "/" + cancelled() + "/" + executed();
+            return "AddOperation " + delivery.getStoreTracking() + super.toString();
         }
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
Tue Mar 31 13:20:07 2009
@@ -276,7 +276,7 @@
      */
     public interface Session {
 
-        public Long messageAdd(MessageRecord message);
+        public void messageAdd(MessageRecord message);
 
         public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
Tue Mar 31 13:20:07 2009
@@ -459,7 +459,7 @@
                 indexLock.writeLock().unlock();
             }
             long end = System.currentTimeMillis();
-            if (end - start > 100) {
+            if (end - start > 1000) {
                 LOG.warn("KahaDB Cleanup took " + (end - start));
             }
         } catch (IOException e) {
@@ -487,7 +487,9 @@
      */
     private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
 
-        LOG.debug("Checkpoint started.");
+        if (LOG.isErrorEnabled()) {
+            LOG.debug("Checkpoint started.");
+        }
 
         rootEntity.setState(OPEN_STATE);
         rootEntity.store(tx);
@@ -575,12 +577,16 @@
             // }
 
             if (!gcCandidateSet.isEmpty()) {
-                LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
+                if (LOG.isErrorEnabled()) {
+                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
+                }
                 journal.removeDataFiles(gcCandidateSet);
             }
         }
 
-        LOG.debug("Checkpoint done.");
+        if (LOG.isErrorEnabled()) {
+            LOG.debug("Checkpoint done.");
+        }
     }
 
     public HashSet<Integer> getJournalFilesBeingReplicated() {
@@ -632,7 +638,7 @@
         }
 
         long end = System.currentTimeMillis();
-        if (end - start > 100) {
+        if (end - start > 1000) {
             LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) +
" ms, Index Update took " + (end - start2) + " ms");
         }
         return location;
@@ -700,7 +706,7 @@
         }
 
         long end = System.currentTimeMillis();
-        if (end - start > 100) {
+        if (end - start > 1000) {
             LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) +
" ms, Index Update took " + (end - start2) + " ms");
         }
     }
@@ -845,10 +851,12 @@
         // /////////////////////////////////////////////////////////////
         // Message related methods.
         // /////////////////////////////////////////////////////////////
-        public Long messageAdd(MessageRecord message) {
-            Long id = rootEntity.nextMessageKey();
+        public void messageAdd(MessageRecord message) {
+            if (message.getKey() < 0) {
+                throw new IllegalArgumentException("Key not set");
+            }
             MessageAddBean bean = new MessageAddBean();
-            bean.setMessageKey(id);
+            bean.setMessageKey(message.getKey());
             bean.setMessageId(message.getMessageId());
             bean.setEncoding(message.getEncoding());
             Buffer buffer = message.getBuffer();
@@ -860,7 +868,6 @@
                 bean.setStreamKey(streamKey);
             }
             updates.add(bean);
-            return id;
         }
 
         public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
(original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Tue Mar 31 13:20:07 2009
@@ -177,7 +177,7 @@
         // //////////////////////////////////////////////////////////////////////////////
         // Message related methods.
         // ///////////////////////////////////////////////////////////////////////////////
-        public Long messageAdd(MessageRecord record) {
+        public void messageAdd(MessageRecord record) {
             long key = record.getKey();
             if (key < 0) {
                 throw new IllegalArgumentException("Key not set");
@@ -187,9 +187,6 @@
             if (old != null) {
                 messages.put(key, old);
             }
-
-            // messagesKeys.put(record.getMessageId(), key);
-            return key;
         }
 
         public void messageRemove(Long key) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
Tue Mar 31 13:20:07 2009
@@ -1,5 +1,7 @@
 package org.apache.activemq.broker.openwire;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import javax.jms.MessageNotWriteableException;
 
 import org.apache.activemq.command.ActiveMQDestination;
@@ -17,7 +19,7 @@
 public class OpenwireSupport {
     
     static private long idGenerator;
-    static private long msgIdGenerator;
+    static private AtomicLong msgIdGenerator = new AtomicLong(0);
 
     public static ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination
destination, String subscriptionName) throws Exception {
         ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
@@ -63,7 +65,7 @@
         ActiveMQTextMessage message = new ActiveMQTextMessage();
         message.setJMSPriority(priority);
         message.setProducerId(producerInfo.getProducerId());
-        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setMessageId(new MessageId(producerInfo, msgIdGenerator.incrementAndGet()));
         message.setDestination(destination);
         message.setPersistent(false);
         if( payload!=null ) {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
Tue Mar 31 13:20:07 2009
@@ -123,9 +123,9 @@
                     store.execute(new VoidCallback<Exception>() {
                         @Override
                         public void run(Session session) throws Exception {
-                            Long messageKey = session.messageAdd(messageRecord);
+                            session.messageAdd(messageRecord);
                             QueueRecord queueRecord = new Store.QueueRecord();
-                            queueRecord.setMessageKey(messageKey);
+                            queueRecord.setMessageKey(messageRecord.getKey());
                             session.queueAddMessage(queueName, queueRecord);
                         }
                     }, onFlush);

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=760433&r1=760432&r2=760433&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
(original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
Tue Mar 31 13:20:07 2009
@@ -56,17 +56,18 @@
         expected.setBuffer(new Buffer("buffer"));
         expected.setEncoding(new AsciiBuffer("encoding"));
         expected.setMessageId(new AsciiBuffer("1000"));
-
-        final Long messageKey = store.execute(new Callback<Long, Exception>() {
-            public Long execute(Session session) throws Exception {
-                return session.messageAdd(expected);
+        expected.setKey(store.allocateStoreTracking());
+        
+        store.execute(new VoidCallback<Exception>() {
+            public void run(Session session) throws Exception {
+                session.messageAdd(expected);
             }
         }, null);
 
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
-                MessageRecord actual = session.messageGetRecord(messageKey);
+                MessageRecord actual = session.messageGetRecord(expected.getKey());
                 assertEquals(expected, actual);
             }
         }, null);



Mime
View raw message