Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=774764&r1=774763&r2=774764&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original) +++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Thu May 14 13:41:14 2009 @@ -69,11 +69,11 @@ private static final int NO_MATCH = 1; private static final int DECLINED = 2; - private final SortedLinkedList queue = new SortedLinkedList(); + private final SortedLinkedList> queue = new SortedLinkedList>(); private Mapper keyMapper; private final ElementLoader loader; - private Cursor sharedCursor; + private Cursor sharedCursor; private QueueStore store; private PersistencePolicy persistencePolicy; private long nextSequenceNumber = 0; @@ -93,11 +93,9 @@ private final LinkedNodeList trailingConsumers = new LinkedNodeList(); // Limiter/Controller for the size of the queue: - private final FlowController sizeController; + private FlowController inputController; private final IFlowSizeLimiter sizeLimiter; - - // Default cursor memory limit: - private static final long DEFAULT_MEMORY_LIMIT = 10; + private final boolean RELEASE_ON_ACQUISITION = true; private int totalQueueCount; @@ -105,13 +103,13 @@ private boolean started = false; private Mapper expirationMapper; - private final Expirator expirator = new Expirator(); + private Expirator expirator; public SharedQueue(String name, IFlowSizeLimiter limiter) { this(name, limiter, null); } - SharedQueue(String name, IFlowSizeLimiter limiter, Object mutex) { + SharedQueue(String name, IFlowSizeLimiter sizeLimiter, Object mutex) { super(name); this.mutex = mutex == null ? new Object() : mutex; @@ -119,12 +117,7 @@ queueDescriptor = new QueueStore.QueueDescriptor(); queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName())); queueDescriptor.setQueueType(QueueDescriptor.SHARED); - this.sizeLimiter = limiter; - - this.sizeController = new FlowController(getFlowControllableHook(), flow, limiter, this.mutex); - sizeController.useOverFlowQueue(false); - super.onFlowOpened(sizeController); - + this.sizeLimiter = sizeLimiter; loader = new ElementLoader(); } @@ -152,7 +145,11 @@ persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY(); } - sharedCursor = new Cursor(queueDescriptor.getQueueName().toString(), true, true); + inputController = new FlowController(null, flow, sizeLimiter, mutex); + inputController.useOverFlowQueue(false); + super.onFlowOpened(inputController); + + sharedCursor = openCursor(getResourceName(), true, true); // Initialize counts: nextSequenceNumber = sequenceMax + 1; @@ -160,7 +157,7 @@ sizeLimiter.add(count, size); totalQueueCount = count; // Add a paged out placeholder: - QueueElement qe = new QueueElement(null, sequenceMin); + QueueElement qe = new QueueElement(null, sequenceMin, this); qe.loaded = false; queue.add(qe); } @@ -177,6 +174,8 @@ }; } + expirator = new Expirator(); + if (DEBUG) System.out.println(this + "Initialized, first seq: " + sequenceMin + " next sequence: " + nextSequenceNumber); } @@ -199,6 +198,65 @@ } } + private final Cursor openCursor(String name, boolean pageInElements, boolean skipAcquired) { + + FlowController> controller = null; + if (pageInElements && persistencePolicy.isPagingEnabled() && sizeLimiter.getCapacity() > persistencePolicy.getPagingInMemorySize()) { + IFlowSizeLimiter> limiter = new SizeLimiter>(persistencePolicy.getPagingInMemorySize(), persistencePolicy.getPagingInMemorySize() / 2) { + public int getElementSize(QueueElement qe) { + return qe.size; + }; + }; + + controller = new FlowController>(null, flow, limiter, mutex) { + @Override + public IFlowResource getFlowResource() { + return SharedQueue.this; + } + }; + controller.useOverFlowQueue(false); + controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1)); + } + + return new Cursor(queue, loader, name, skipAcquired, pageInElements, controller); + } + + final int getElementSize(V elem) { + return sizeLimiter.getElementSize(elem); + } + + final long getElementExpiration(V elem) { + return expirationMapper.map(elem); + } + + final Expirator getExpirator() { + return expirator; + } + + final QueueStore getQueueStore() { + return store; + } + + final ElementLoader getLoader() { + return loader; + } + + final PersistencePolicy getPersistencePolicy() { + return persistencePolicy; + } + + final void acknowledge(QueueElement qe) { + synchronized (mutex) { + V elem = qe.getElement(); + if (qe.delete()) { + if (!qe.acquired || !RELEASE_ON_ACQUISITION) { + inputController.elementDispatched(elem); + } + totalQueueCount--; + } + } + } + /** * Starts this queue. */ @@ -211,6 +269,7 @@ if (!started) { started = true; sharedCursor.activate(); + loader.start(); expirator.start(); if (isDispatchReady()) { notifyReady(); @@ -232,42 +291,102 @@ stop(); } - public void flowElemAccepted(ISourceController source, V elem) { + public void add(V elem, ISourceController source) { + synchronized (mutex) { + inputController.add(elem, source); + accepted(source, elem); + } + } + public boolean offer(V elem, ISourceController source) { synchronized (mutex) { - if (!initialized) { - throw new IllegalStateException("Not able to use uninitialized queue: " + getResourceName()); + if (inputController.offer(elem, source)) { + accepted(source, elem); + return true; } + return false; + } + } - // Create a new queue element with the next sequence number: - QueueElement qe = new QueueElement(elem, nextSequenceNumber++); + public void flowElemAccepted(ISourceController source, V elem) { + synchronized (mutex) { + // TODO should change flow controller to pass original source: + accepted(null, elem); + } + } - // Save the element (note that it is important this be done after - // we've set the sequence number above) - if (persistencePolicy.isPersistent(elem)) { - // For now base decision on whether to delay flush on - // whether or not there are - // consumers ready. - boolean delayable = !sharedConsumers.isEmpty(); - qe.save(source, delayable); - } - - // Add it to our queue: - queue.add(qe); - totalQueueCount++; - // Check with the loader to see if it needs to be paged out: - loader.elementAdded(qe, source); - expirator.elementAdded(qe); - - // Request dispatch for the newly enqueued element. - // TODO consider optimizing to do direct dispatch? - // It might be better if the dispatcher itself provided - // this for cases where the caller is on the same dispatcher - if (isDispatchReady()) { - notifyReady(); + private final void accepted(ISourceController source, V elem) { + + if (!initialized) { + throw new IllegalStateException("Uninitialized queue: " + getResourceName()); + } + + // Create a new queue element with the next sequence number: + QueueElement qe = new QueueElement(elem, nextSequenceNumber++, this); + + // Save the element (note that it is important this be done after + // we've set the sequence number above) + if (persistencePolicy.isPersistent(elem)) { + // For now base decision on whether to delay flush on + // whether or not there are + // consumers ready. + boolean delayable = !sharedConsumers.isEmpty(); + qe.save(source, delayable); + } + + // Add it to our queue: + queue.add(qe); + totalQueueCount++; + if (!persistencePolicy.isPagingEnabled()) { + qe.addHardRef(); + } + // Check with the shared cursor to see if it is willing to + // absorb the element. If so that's good enough. + if (persistencePolicy.isPagingEnabled() && !sharedCursor.offer(qe, source)) { + + // Otherwise check with any other open cursor to see if + // it can take the element: + Collection> active = loader.getActiveCursors(qe); + + // If there are none, unload the element: + if (active == null) { + qe.unload(source); + return; + } + + // See if a cursor is willing to hang on to the + // element: + boolean accepted = false; + for (Cursor cursor : active) { + // Already checked the shared cursor above: + if (cursor == sharedCursor) { + continue; + } + + if (cursor.offer(qe, source)) { + accepted = true; + break; + } + } + + // If no cursor accepted it, then page out the element: + // keeping the element loaded. + if (!accepted) { + qe.unload(source); } } + + expirator.elementAdded(qe); + + // Request dispatch for the newly enqueued element. + // TODO consider optimizing to do direct dispatch? + // It might be better if the dispatcher itself provided + // this for cases where the caller is on the same dispatcher + if (isDispatchReady()) { + notifyReady(); + // while(pollingDispatch()); + } } public boolean pollingDispatch() { @@ -303,7 +422,7 @@ // Process shared consumers: if (!sharedConsumers.isEmpty()) { - QueueElement next = sharedCursor.getNext(); + QueueElement next = sharedCursor.getNext(); if (next != null) { // See if there are any interested consumers: @@ -412,11 +531,11 @@ boolean isStarted; // The consumer's cursor: - final Cursor cursor; + final Cursor cursor; SubscriptionContext(Subscription target) { this.sub = target; - this.cursor = new Cursor(target.toString(), !sub.isBrowser(), true); + this.cursor = openCursor(target.toString(), true, !sub.isBrowser()); cursor.setCursorReadyListener(new CursorReadyListener() { public void onElementReady() { if (!isLinked()) { @@ -468,7 +587,7 @@ return; } - QueueElement next = cursor.getNext(); + QueueElement next = cursor.getNext(); // If the next element isn't yet available // then unlink this subscription if (next == null) { @@ -546,7 +665,7 @@ } } - public final int offer(QueueElement qe) { + public final int offer(QueueElement qe) { // If we are already passed this element return NO_MATCH: if (cursor.getCurrentSequeunce() > qe.sequence) { @@ -561,7 +680,7 @@ // Check for expiration: if (qe.isExpired()) { - qe.acknowledge(); + acknowledge(qe); return ACCEPTED; } @@ -571,7 +690,10 @@ // See if the sink has room: if (sub.offer(qe.elem, this, callback)) { if (!sub.isBrowser()) { - qe.setAcquired(this); + qe.setAcquired(true); + if (RELEASE_ON_ACQUISITION) { + inputController.elementDispatched(qe.getElement()); + } // If remove on dispatch acknowledge now: if (callback == null) { @@ -632,16 +754,19 @@ public void onElementReady(); } - class Cursor implements Comparable { + static class Cursor implements Comparable> { private CursorReadyListener readyListener; private final String name; + private final SharedQueue.ElementLoader loader; + private final SortedLinkedList> queue; + private boolean activated = false;; // The next element for this cursor, always non null // if activated, unless no element available: - QueueElement current = null; + QueueElement current = null; // The current sequence number for this cursor, // used when inactive or pointing to an element // sequence number beyond the queue's limit. @@ -649,8 +774,8 @@ // The cursor is holding references for all // elements between first and last inclusive: - QueueElement firstRef = null; - QueueElement lastRef = null; + QueueElement firstRef = null; + QueueElement lastRef = null; // This is set to the last block that for which // we have requested a load: long lastBlockRequest = -1; @@ -659,38 +784,30 @@ // When the limiter is set the cursor is able to // keep as many elements in memory as its limiter // allows. - private final IFlowSizeLimiter memoryLimiter; - private final IFlowController memoryController; + private final IFlowController> memoryController; // Indicates whether this cursor skips acquired elements private final boolean skipAcquired; // Indicates whether this cursor will page in elements - // private final boolean pageInElements; - public Cursor(String name, boolean skipAcquired, boolean pageInElements) { + private long limit = Long.MAX_VALUE; + + public Cursor(SortedLinkedList> queue, SharedQueue.ElementLoader loader, String name, boolean skipAcquired, boolean pageInElements, + IFlowController> memoryController) { this.name = name; + this.queue = queue; + this.loader = loader; + this.skipAcquired = skipAcquired; this.pageInElements = pageInElements; // Set up a limiter if this cursor pages in elements, and memory // limit is less than the queue size: - if (pageInElements && persistencePolicy.isPagingEnabled() && DEFAULT_MEMORY_LIMIT < sizeLimiter.getCapacity()) { - memoryLimiter = new SizeLimiter(DEFAULT_MEMORY_LIMIT, DEFAULT_MEMORY_LIMIT) { - public int getElementSize(QueueElement qe) { - return qe.size; - }; - }; - - memoryController = new FlowController(null, flow, memoryLimiter, mutex) { - @Override - public IFlowResource getFlowResource() { - return SharedQueue.this; - } - }; + if (pageInElements) { + this.memoryController = memoryController; } else { - memoryLimiter = null; - memoryController = null; + this.memoryController = null; } } @@ -702,8 +819,8 @@ * The element for which to check. * @return */ - public final boolean offer(QueueElement qe, ISourceController controller) { - if (activated && memoryLimiter != null) { + public final boolean offer(QueueElement qe, ISourceController controller) { + if (activated && memoryController != null) { getNext(); if (lastRef != null) { // Return true if we absorbed it: @@ -712,7 +829,11 @@ } // If our last ref is close to this one reserve the element else if (qe.getPrevious() == lastRef) { - return addCursorRef(qe, controller); + if (addCursorRef(qe, controller)) { + return true; + } else { + return false; + } } } return false; @@ -724,7 +845,7 @@ public final void reset(long sequence) { updateSequence(sequence); - current = null; + updateCurrent(null); } public final void activate() { @@ -746,7 +867,8 @@ // If we're passing into a new block release the old one: if (firstRef.isLastInBlock()) { - System.out.println(this + " releasing block:" + firstRef.restoreBlock); + if (DEBUG) + System.out.println(this + " releasing block:" + firstRef.restoreBlock); loader.releaseBlock(this, firstRef.restoreBlock); } @@ -758,31 +880,47 @@ } // Release the last requested block: - if (persistencePolicy.isPageOutPlaceHolders()) { + if (loader.isPageOutPlaceHolders() && lastBlockRequest >= 0) { loader.releaseBlock(this, lastBlockRequest); } lastBlockRequest = -1; - // Let go of our current ref: - current = null; + updateCurrent(null); activated = false; } } /** + * Updates the current ref. We keep a soft ref to the current to keep it + * in the queue so that we can get at the next without a costly lookup. + */ + private final void updateCurrent(QueueElement qe) { + if (qe == current) { + return; + } + if (current != null) { + current.releaseSoftRef(); + } + current = qe; + if (current != null) { + current.addSoftRef(); + } + } + + /** * Makes sure elements are paged in */ private final void updatePagingRefs() { if (!activated) return; - if (pageInElements && memoryLimiter != null) { + if (pageInElements && memoryController != null) { // Release memory references up to our sequence number while (firstRef != null && firstRef.getSequence() < sequence) { boolean lastInBlock = firstRef.isLastInBlock(); - QueueElement next = firstRef.getNext(); + QueueElement next = firstRef.getNext(); firstRef.releaseHardRef(memoryController); // If we're passing into a new block release the old one: @@ -802,14 +940,14 @@ } // Now add refs for as many elements as we can hold: - QueueElement next = null; + QueueElement next = null; if (lastRef == null) { next = current; } else { next = lastRef.getNext(); } - while (next != null && !memoryLimiter.getThrottled()) { + while (next != null && !memoryController.isSinkBlocked()) { if (!addCursorRef(next, null)) { break; } @@ -819,11 +957,11 @@ // Otherwise we still need to ensure the block has been loaded: else if (current != null && !current.isLoaded()) { if (lastBlockRequest != current.restoreBlock) { - if (persistencePolicy.isPageOutPlaceHolders()) { + if (lastBlockRequest != -1) { loader.releaseBlock(this, lastBlockRequest); } - loader.loadBlock(this, current.restoreBlock); lastBlockRequest = current.restoreBlock; + loader.reserveBlock(this, lastBlockRequest); } } } @@ -839,13 +977,13 @@ * The controller adding the element. * @return false if the element isn't in memory. */ - private final boolean addCursorRef(QueueElement qe, ISourceController controller) { + private final boolean addCursorRef(QueueElement qe, ISourceController controller) { // Make sure we have requested the block: if (qe.restoreBlock != lastBlockRequest) { lastBlockRequest = qe.restoreBlock; if (DEBUG) - System.out.println(this + " requesting block:" + lastBlockRequest); - loader.loadBlock(this, lastBlockRequest); + System.out.println(this + " requesting block:" + lastBlockRequest + " for" + qe); + loader.reserveBlock(this, lastBlockRequest); } // If the next element isn't loaded then we can't yet @@ -865,25 +1003,22 @@ private final void updateSequence(final long newSequence) { this.sequence = newSequence; - if (DEBUG && sequence > nextSequenceNumber) { - new Exception(this + "cursor overflow").printStackTrace(); - } } /** * Sets the cursor to the next sequence number after the provided * element: */ - public final void skip(QueueElement elem) { - QueueElement next = elem.isLinked() ? elem.getNext() : null; + public final void skip(QueueElement elem) { + QueueElement next = elem.isLinked() ? elem.getNext() : null; if (next != null) { updateSequence(next.sequence); if (activated) { - current = next; + updateCurrent(next); } } else { - current = null; + updateCurrent(null); updateSequence(sequence + 1); } updatePagingRefs(); @@ -893,21 +1028,22 @@ * @return the next available element or null if one is not currently * available. */ - public final QueueElement getNext() { + public final QueueElement getNext() { try { if (queue.isEmpty() || queue.getTail().sequence < sequence) { - current = null; + updateCurrent(null); return null; } if (queue.getTail().sequence == sequence) { - current = queue.getTail(); + updateCurrent(queue.getTail()); } - // Get a pointer to the next element - if (current == null || !current.isLinked()) { - current = queue.upper(sequence, true); + // If we don't have a current, then look it up based + // on our sequence: + if (current == null) { + updateCurrent(queue.upper(sequence, true)); if (current == null) { return null; } @@ -915,8 +1051,8 @@ // Skip removed elements (and acquired ones if requested) while ((skipAcquired && current.isAcquired()) || current.isDeleted()) { - QueueElement last = current; - current = current.getNext(); + QueueElement last = current; + updateCurrent(current.getNext()); // If the next element is null, increment our sequence // and return: @@ -946,7 +1082,7 @@ } finally { // Don't hold on to a current ref if we aren't activated: if (!activated) { - current = null; + updateCurrent(null); } updatePagingRefs(); } @@ -968,7 +1104,7 @@ return sequence; } - public int compareTo(Cursor o) { + public int compareTo(Cursor o) { if (o.sequence > sequence) { return -1; } else if (sequence > o.sequence) { @@ -997,7 +1133,9 @@ */ public void onElementsLoaded() { if (readyListener != null && isReady()) { - System.out.println(this + " notifying ready"); + if (DEBUG) { + System.out.println(this + " notifying ready"); + } readyListener.onElementReady(); } } @@ -1013,12 +1151,15 @@ * @return true if the cursor has passed the end of the queue. */ public boolean atEnd() { - // TODO Auto-generated method stub if (queue.isEmpty()) { return true; } - QueueElement tail = queue.getTail(); + if (sequence > limit) { + return true; + } + + QueueElement tail = queue.getTail(); // Can't be at the end if the tail isn't loaded: if (!tail.isLoaded()) { return false; @@ -1034,12 +1175,20 @@ public String toString() { return "Cursor: " + sequence + " [" + name + "]"; } + + /** + * @param l + */ + public void setLimit(long l) { + limit = l; + } } - class QueueElement extends SortedLinkedListNode implements SubscriptionDeliveryCallback, SaveableQueueElement { + static class QueueElement extends SortedLinkedListNode> implements SubscriptionDeliveryCallback, SaveableQueueElement { final long sequence; final long restoreBlock; + final SharedQueue queue; V elem; int size = -1; @@ -1063,14 +1212,14 @@ boolean saved = false; boolean deleted = false; - SubscriptionContext owner; + boolean acquired = false; - public QueueElement(V elem, long sequence) { + public QueueElement(V elem, long sequence, SharedQueue queue) { this.elem = elem; - + this.queue = queue; if (elem != null) { - size = sizeLimiter.getElementSize(elem); - expiration = expirationMapper.map(elem); + size = queue.getElementSize(elem); + expiration = queue.getElementExpiration(elem); } this.sequence = sequence; this.restoreBlock = sequence / RESTORE_BLOCK_SIZE; @@ -1083,8 +1232,8 @@ return deleted; } - public QueueElement(RestoredElement restored) throws Exception { - this(restored.getElement(), restored.getSequenceNumber()); + public QueueElement(RestoredElement restored, SharedQueue queue) throws Exception { + this(restored.getElement(), restored.getSequenceNumber(), queue); this.size = restored.getElementSize(); this.expiration = restored.getExpiration(); saved = true; @@ -1103,12 +1252,12 @@ // If this is the first request for this // element request a load: if (hardRefs == 1) { - loader.pageIn(this); + queue.getLoader().pageIn(this); } } } - public final void releaseHardRef(IFlowController controller) { + public final void releaseHardRef(IFlowController> controller) { hardRefs--; if (hardRefs == 0) { unload(controller); @@ -1131,35 +1280,33 @@ assert softRefs >= 0; } - public final void setAcquired(SubscriptionContext owner) { - this.owner = owner; + public final void setAcquired(boolean val) { + this.acquired = val; } public final void acknowledge() { - synchronized (mutex) { - delete(); - } + queue.acknowledge(this); } - public final void delete() { + public final boolean delete() { if (!deleted) { deleted = true; - owner = null; - totalQueueCount--; if (isExpirable()) { - expirator.elementRemoved(this); + queue.getExpirator().elementRemoved(this); } - sizeController.elementDispatched(elem); + if (saved) { - store.deleteQueueElement(queueDescriptor, elem); + queue.getQueueStore().deleteQueueElement(queue.getDescriptor(), elem); } elem = null; unload(null); + return true; } + return false; } public final void unacquire(ISourceController source) { - owner = null; + acquired = false; if (isExpired()) { acknowledge(); } else { @@ -1174,38 +1321,38 @@ */ public final void unload(ISourceController controller) { - // Can't unlink if there is a cursor ref, the cursor - // needs this element to decrement it's limiter space - if (hardRefs > 0) { + // Don't page out of there is a hard ref to the element + // or if it is acquired (since we need the element + // during delete: + if (!deleted && (hardRefs > 0 || acquired)) { return; } // If the element didn't require persistence on enqueue, then // we'll need to save it now before paging it out. - // Note that we don't page out the element if it has an owner - // because we need the element when we issue the delete. - if (owner == null && elem != null && !persistencePolicy.isPersistent(elem)) { - save(controller, true); - if (DEBUG) - System.out.println("Paged out element: " + this); - elem = null; - } + if (elem != null) { + if (!deleted) { + if (!queue.getPersistencePolicy().isPersistent(elem)) { + save(controller, true); + if (DEBUG) + System.out.println("Paged out element: " + this); + } - // If save is pending don't unload until the save has completed - if (savePending) { - return; + // If save is pending don't unload until the save has + // completed + if (savePending) { + return; + } + } + + elem = null; } - QueueElement next = getNext(); - QueueElement prev = getPrevious(); + QueueElement next = getNext(); + QueueElement prev = getPrevious(); - // See if we can unload this element: - // Don't unload the element if it is: - // -Has an owner (we keep the element in memory so we don't - // forget about the owner). - // -If there are soft references to it - // -Or it is in the load queue - if (owner == null && softRefs == 0 && !loader.inLoadQueue(this)) { + // See if we can unload this element, don't unload if we have a soft + if (softRefs == 0) { // If deleted unlink this element from the queue, and link // together adjacent paged out entries: if (deleted) { @@ -1215,7 +1362,10 @@ if (next != null && prev != null && !next.isLoaded() && !prev.isLoaded()) { next.unlink(); } - } else { + } + // Otherwise as long as the element isn't acquired we can unload + // it. If it is acquired we keep the soft ref around + else if (!acquired && queue.getPersistencePolicy().isPageOutPlaceHolders()) { loaded = false; @@ -1230,6 +1380,8 @@ if (prev != null && !prev.isLoaded()) { unlink(); } + } else { + return; } } @@ -1246,9 +1398,9 @@ * @throws Exception * If there was an error creating the loaded element: */ - public final QueueElement loadAfter(RestoredElement re) throws Exception { + public final QueueElement loadAfter(RestoredElement re) throws Exception { - QueueElement ret = null; + QueueElement ret = null; // See if this element represents the one being loaded: if (sequence == re.getSequenceNumber()) { @@ -1263,14 +1415,15 @@ if (re.getNextSequenceNumber() != -1) { // Otherwise if our next pointer doesn't match the // next restored number: - QueueElement next = getNext(); + QueueElement next = getNext(); if (next == null || next.sequence != re.getNextSequenceNumber()) { - next = new QueueElement(null, re.getNextSequenceNumber()); + next = new QueueElement(null, re.getNextSequenceNumber(), queue); next.loaded = false; this.linkAfter(next); } } this.size = re.getElementSize(); + this.expiration = re.getExpiration(); } // If we're paged out set our elem to the restored one: @@ -1281,9 +1434,9 @@ savePending = false; } else { - ret = new QueueElement(re); + ret = new QueueElement(re, queue); // Otherwise simply link this element into the list: - queue.add(ret); + queue.queue.add(ret); } if (DEBUG) @@ -1301,7 +1454,7 @@ public final boolean isLastInBlock() { if (isTailNode()) { - return nextSequenceNumber / RESTORE_BLOCK_SIZE != restoreBlock; + return queue.nextSequenceNumber / RESTORE_BLOCK_SIZE != restoreBlock; } else { return next.restoreBlock != restoreBlock; } @@ -1316,7 +1469,7 @@ } public final boolean isAcquired() { - return owner != null || deleted; + return acquired || deleted; } public final long getExpiration() { @@ -1333,14 +1486,14 @@ public final void save(ISourceController controller, boolean delayable) { if (!saved) { - store.persistQueueElement(this, controller, delayable); + queue.getQueueStore().persistQueueElement(this, controller, delayable); saved = true; // If paging is enabled we can't unload the element until it // is saved, otherwise there is no guarantee that it will be // in the store on a subsequent load requests because the // save is done asynchronously. - if (persistencePolicy.isPagingEnabled()) { + if (queue.getPersistencePolicy().isPagingEnabled()) { savePending = true; } } @@ -1375,7 +1528,8 @@ * () */ public void notifySave() { - synchronized (mutex) { + // TODO Refactor this: + synchronized (queue.mutex) { // Unload if we haven't already: if (isLinked()) { savePending = false; @@ -1402,18 +1556,18 @@ * getQueueDescriptor() */ public QueueDescriptor getQueueDescriptor() { - return queueDescriptor; + return queue.getDescriptor(); } public String toString() { - return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner; + return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " aquired: " + acquired; } } private class Expirator { - private final Cursor cursor = new Cursor("Expirator", false, false); + private final Cursor cursor = openCursor("Expirator-" + getResourceName(), false, false); // Number of expirable elements in the queue: private int count = 0; @@ -1423,13 +1577,13 @@ private static final int MAX_CACHE_SIZE = 500; private long uncachedMin = Long.MAX_VALUE; - TreeMap> expirationCache = new TreeMap>(); + TreeMap>> expirationCache = new TreeMap>>(); private int cacheSize = 0; public final boolean needsDispatch() { // If we have expiration candidates or are scanning the // queue request dispatch: - return hasExpirationCandidates() || cursor.isReady(); + return hasExpirables() || cursor.isReady(); } public void start() { @@ -1453,7 +1607,9 @@ } public void dispatch() { - + if (!needsDispatch()) { + return; + } long now = -1; // If their are uncached elements in the queue that are ready for // expiration @@ -1465,7 +1621,7 @@ // Scan the queue looking for expirables: if (cursor.isReady()) { - QueueElement qe = cursor.getNext(); + QueueElement qe = cursor.getNext(); while (qe != null) { if (!loaded) { if (qe.sequence < recoverySequence) { @@ -1485,7 +1641,8 @@ // Finished loading: if (!loaded && cursor.getCurrentSequeunce() >= recoverySequence) { - System.out.println(this + " Queue Load Complete"); + if (DEBUG) + System.out.println(this + " Queue Load Complete"); loaded = true; cursor.deactivate(); } else if (cursor.atEnd()) { @@ -1493,15 +1650,15 @@ } } - if (now == -1) { + if (now == -1 && !expirationCache.isEmpty()) { now = System.currentTimeMillis(); } // while (!expirationCache.isEmpty()) { - Entry> first = expirationCache.firstEntry(); + Entry>> first = expirationCache.firstEntry(); if (first.getKey() < now) { - for (QueueElement qe : first.getValue()) { + for (QueueElement qe : first.getValue()) { qe.releaseSoftRef(); qe.acknowledge(); } @@ -1509,7 +1666,7 @@ } } - public void elementAdded(QueueElement qe) { + public void elementAdded(QueueElement qe) { if (qe.isExpirable() && !qe.isDeleted()) { count++; if (qe.isExpired()) { @@ -1520,10 +1677,10 @@ } } - private void addToCache(QueueElement qe) { + private void addToCache(QueueElement qe) { // See if we should cache it, evicting entries if possible if (cacheSize >= MAX_CACHE_SIZE) { - Entry> last = expirationCache.lastEntry(); + Entry>> last = expirationCache.lastEntry(); if (last.getKey() <= qe.expiration) { // Keep track of the minimum uncached value: if (qe.expiration < uncachedMin) { @@ -1533,7 +1690,7 @@ } // Evict the entry: - Iterator i = last.getValue().iterator(); + Iterator> i = last.getValue().iterator(); removeFromCache(i.next()); if (last.getKey() <= uncachedMin) { @@ -1542,19 +1699,19 @@ } } - HashSet entry = new HashSet(); + HashSet> entry = new HashSet>(); entry.add(qe); qe.addSoftRef(); cacheSize++; - HashSet old = expirationCache.put(qe.expiration, entry); + HashSet> old = expirationCache.put(qe.expiration, entry); if (old != null) { old.add(qe); expirationCache.put(qe.expiration, old); } } - private final void removeFromCache(QueueElement qe) { - HashSet last = expirationCache.get(qe.expiration); + private final void removeFromCache(QueueElement qe) { + HashSet> last = expirationCache.get(qe.expiration); if (last != null && last.remove(qe.getSequenceNumber())) { cacheSize--; qe.releaseSoftRef(); @@ -1564,7 +1721,7 @@ } } - public void elementRemoved(QueueElement qe) { + public void elementRemoved(QueueElement qe) { // While loading, ignore elements that we haven't been seen yet. if (!loaded && qe.sequence < recoverySequence && qe.sequence > lastRecoverdSequence) { return; @@ -1577,11 +1734,7 @@ } } - public boolean hasExpirationCandidates() { - return !loaded || hasExpirables(); - } - - public boolean hasExpirables() { + public final boolean hasExpirables() { if (count == 0) { return false; } else { @@ -1617,85 +1770,60 @@ private class ElementLoader implements RestoreListener { private LinkedList> fromDatabase = new LinkedList>(); - private final HashMap> requestedBlocks = new HashMap>(); - private final HashSet pagingCursors = new HashSet(); + private final HashMap>> requestedBlocks = new HashMap>>(); + private final HashSet> pagingCursors = new HashSet>(); - public boolean inLoadQueue(QueueElement queueElement) { + private boolean loadOnRequest = false; + private Cursor recoveryCursor = null; + + public final void start() { + + // If paging is enabled and we don't keep placeholders in memory + // then we load on + // request. + if (persistencePolicy.isPagingEnabled() && persistencePolicy.isPageOutPlaceHolders()) { + loadOnRequest = true; + } else { + loadOnRequest = false; + if (getEnqueuedCount() > 0) { + recoveryCursor = openCursor("Loader", false, false); + recoveryCursor.setLimit(nextSequenceNumber - 1); + recoveryCursor.activate(); + } + } + } + + public boolean inLoadQueue(QueueElement queueElement) { return requestedBlocks.containsKey(queueElement.restoreBlock); } + public final boolean isPageOutPlaceHolders() { + return persistencePolicy.isPageOutPlaceHolders(); + } + /** * @param queueElement */ - public void pageIn(QueueElement qe) { + public void pageIn(QueueElement qe) { store.restoreQueueElements(queueDescriptor, false, qe.sequence, qe.sequence, 1, this); } - /** - * Must be called after an element is added to the queue to enforce - * memory limits - * - * @param elem - * The added element: - * @param source - * The source of the message - */ - public final void elementAdded(QueueElement qe, ISourceController source) { - - if (persistencePolicy.isPagingEnabled()) { - - // Check with the shared cursor to see if it is willing to - // absorb the element. If so that's good enough. - if (sharedCursor.offer(qe, source)) { - return; - } - - // Otherwise check with any other open cursor to see if - // it can take the element: - HashSet active = requestedBlocks.get(qe.sequence); - - // If there are none, unload the element: - if (active == null) { - qe.unload(source); - return; - } - - // See if a cursor is willing to hang on to the - // element: - boolean accepted = false; - for (Cursor cursor : active) { - // Already checked the shared cursor above: - if (cursor == sharedCursor) { - continue; - } - - if (cursor.offer(qe, source)) { - accepted = true; - } - } - - // If no cursor accepted it, then page out the element: - // keeping the element loaded. - if (!accepted) { - qe.unload(source); - } - } + public final Collection> getActiveCursors(QueueElement qe) { + return requestedBlocks.get(qe.getSequence()); } - // Updates memory when an element is loaded from the database: - private final void elementLoaded(QueueElement qe) { - // TODO track the rate of loaded elements vs those that - // are added to the queue. We'll want to throttle back - // enqueueing sources to a rate less than the restore - // rate so we can stay out of the store. - } + public void reserveBlock(Cursor cursor, long block) { + HashSet> cursors = requestedBlocks.get(block); + boolean load = recoveryCursor != null && cursor == recoveryCursor; - public void loadBlock(Cursor cursor, long block) { - HashSet cursors = requestedBlocks.get(block); if (cursors == null) { - cursors = new HashSet(); + cursors = new HashSet>(); requestedBlocks.put(block, cursors); + load |= loadOnRequest; + } + cursors.add(cursor); + if (load) { // Max sequence number is the end of this restoreBlock: long firstSequence = block * RESTORE_BLOCK_SIZE; long maxSequence = block * RESTORE_BLOCK_SIZE + RESTORE_BLOCK_SIZE - 1; @@ -1706,21 +1834,17 @@ if (DEBUG) System.out.println(cursor + " requesting restoreBlock:" + block + " from " + firstSequence + " to " + maxSequence + " max: " + maxCount + " queueMax: " + nextSequenceNumber); - // If paging is enabled only pull in queue records, don't bring + // If paging is enabled only pull in queue records, don't + // bring // in the payload. // Each active cursor will have to pull in messages based on // available memory. store.restoreQueueElements(queueDescriptor, persistencePolicy.isPagingEnabled(), firstSequence, maxSequence, maxCount, this); } - cursors.add(cursor); } - public void releaseBlock(Cursor cursor, long block) { - // Don't do anything if we don't page out placeholders - if (!persistencePolicy.isPageOutPlaceHolders()) { - return; - } - HashSet cursors = requestedBlocks.get(block); + public void releaseBlock(Cursor cursor, long block) { + HashSet> cursors = requestedBlocks.get(block); if (cursors == null) { if (true || DEBUG) System.out.println(this + " removeBlockInterest " + block + ", no cursors" + cursor); @@ -1731,9 +1855,9 @@ // If this is the last cursor active in this block // unload the block: if (persistencePolicy.isPagingEnabled()) { - QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * block, true); + QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * block, true); while (qe != null && qe.restoreBlock == block) { - QueueElement next = qe.getNext(); + QueueElement next = qe.getNext(); qe.unload(cursor.memoryController); qe = next; } @@ -1765,7 +1889,7 @@ for (RestoredElement restored : restoredElems) { try { - QueueElement qe = queue.lower(restored.getSequenceNumber(), true); + QueueElement qe = queue.lower(restored.getSequenceNumber(), true); // If we don't have a paged out place holder for this // element @@ -1776,7 +1900,13 @@ } qe = qe.loadAfter(restored); - elementLoaded(qe); + + // If paging isn't enabled then add a hard ref to the + // element, + // this will keep it around until it deleted + if (!persistencePolicy.isPagingEnabled()) { + qe.addHardRef(); + } // If we don't page out place holders we needn't track // block @@ -1795,11 +1925,25 @@ } // Add restoring consumers back to trailing consumers: - for (Cursor paging : pagingCursors) + for (Cursor paging : pagingCursors) paging.onElementsLoaded(); pagingCursors.clear(); } + + // Advance the recovery cursor: + if (recoveryCursor != null) { + while (recoveryCursor.isReady()) { + QueueElement qe = recoveryCursor.getNext(); + if (recoveryCursor.atEnd()) { + recoveryCursor.deactivate(); + recoveryCursor = null; + break; + } + recoveryCursor.skip(qe); + + } + } } public final boolean hasRestoredMessages() { @@ -1851,7 +1995,7 @@ @Override protected ISinkController getSinkController(V elem, ISourceController source) { - return sizeController; + return inputController; } public V poll() { @@ -1859,6 +2003,6 @@ } public IFlowController getFlowControler() { - return sizeController; + return inputController; } } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=774764&r1=774763&r2=774764&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu May 14 13:41:14 2009 @@ -40,7 +40,7 @@ public abstract class BrokerTestBase extends TestCase { - protected static final int PERFORMANCE_SAMPLES = 3; + protected static final int PERFORMANCE_SAMPLES = 5; protected static final int IO_WORK_AMOUNT = 0; protected static final int FANIN_COUNT = 10; @@ -118,6 +118,9 @@ } public void test_1_1_0() throws Exception { + if (ptp) { + return; + } producerCount = 1; destCount = 1; @@ -376,7 +379,7 @@ sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI); brokers.add(sendBroker); } - + startBrokers(); Destination[] dests = new Destination[destCount]; @@ -476,7 +479,7 @@ store = StoreFactory.createStore("memory"); } - store.setStoreDirectory(new File("sub/test-data/broker-test/" + broker.getName())); + store.setStoreDirectory(new File("sub/test-data/broker-test/" + broker.getName())); store.setDeleteAllMessages(PURGE_STORE); return store; } @@ -497,15 +500,14 @@ } } - private void startBrokers() throws Exception - { + private void startBrokers() throws Exception { for (MessageBroker broker : brokers) { broker.start(); } } - + private void startClients() throws Exception { - + for (RemoteConsumer connection : consumers) { connection.start(); } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=774764&r1=774763&r2=774764&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Thu May 14 13:41:14 2009 @@ -31,6 +31,7 @@ import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.dispatch.IDispatcher; @@ -68,7 +69,7 @@ BrokerQueueStore queueStore; private static final boolean USE_KAHA_DB = true; private static final boolean PERSISTENT = true; - private static final boolean PURGE_STORE = false; + private static final boolean PURGE_STORE = true; protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items"); protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items"); @@ -111,7 +112,7 @@ store = StoreFactory.createStore("memory"); } - store.setStoreDirectory(new File("test-data/shared-message-queue-test/")); + store.setStoreDirectory(new File("test-data/shared-queue-perf-test/")); store.setDeleteAllMessages(PURGE_STORE); return store; } @@ -121,6 +122,7 @@ producers.clear(); queues.clear(); stopServices(); + consumerStartDelay = 0; } public void testSharedQueue_1_1_1() throws Exception { @@ -141,6 +143,7 @@ try { createQueues(1); createProducers(1); + consumerStartDelay = 10; createConsumers(1); doTest(); @@ -202,56 +205,48 @@ } private void doTest() throws Exception { - - try - { + + try { // Start queues: for (IQueue queue : queues) { queue.start(); } - - Runnable startConsumers = new Runnable() - { - public void run() - { + + Runnable startConsumers = new Runnable() { + public void run() { // Start consumers: for (Consumer consumer : consumers) { consumer.start(); } } }; - - if(consumerStartDelay > 0) - { + + if (consumerStartDelay > 0) { dispatcher.schedule(startConsumers, consumerStartDelay, TimeUnit.SECONDS); - } - else - { + } else { startConsumers.run(); } - + // Start producers: for (Producer producer : producers) { producer.start(); } reportRates(); - } - finally - { + } finally { // Stop producers: for (Producer producer : producers) { producer.stop(); } - + // Stop consumers: for (Consumer consumer : consumers) { consumer.stop(); } - + // Stop queues: for (IQueue queue : queues) { queue.stop(); - } + } } } @@ -303,7 +298,7 @@ protected final IFlowRelay outboundQueue; protected OpenWireMessageDelivery next; private int priority; - private final byte[] payload; + private final String payload; private int sequenceNumber; private final ActiveMQDestination destination; private final IQueue targetQueue; @@ -316,14 +311,21 @@ rate.name("Producer " + name + " Rate"); totalProducerRate.add(rate); dispatchContext = dispatcher.register(this, name); - payload = new byte[1024]; + // create a 1024 byte payload (2 bytes per char): + payload = new String(new byte[512]); producerId = new ProducerId(name); wireFormat = new OpenWireFormat(); wireFormat.setCacheEnabled(false); wireFormat.setSizePrefixDisabled(false); wireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION); - SizeLimiter limiter = new SizeLimiter(1000, 500); + SizeLimiter limiter = new SizeLimiter(1000 * 1024, 500 * 1024) { + @Override + public int getElementSize(OpenWireMessageDelivery elem) { + return elem.getFlowLimiterSize(); + } + }; + Flow flow = new Flow(name, true); outboundQueue = new SingleFlowRelay(flow, name, limiter); outboundQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1)); @@ -384,14 +386,14 @@ } private void createNextMessage() throws JMSException { - ActiveMQBytesMessage message = new ActiveMQBytesMessage(); + ActiveMQTextMessage message = new ActiveMQTextMessage(); message.setJMSPriority(priority); message.setProducerId(producerId); message.setMessageId(new MessageId(name, ++sequenceNumber)); message.setDestination(destination); message.setPersistent(PERSISTENT); if (payload != null) { - message.writeBytes(payload); + message.setText(payload); } next = new OpenWireMessageDelivery(message); } @@ -414,12 +416,14 @@ private final ExclusiveQueue queue; private final IQueue sourceQueue; private final QueueStore.QueueDescriptor queueDescriptor; + private int limit = 20000; + private int count = 0; public Consumer(String name, IQueue sourceQueue) { this.sourceQueue = sourceQueue; this.name = name; Flow flow = new Flow(name + "-outbound", false); - limiter = new SizeLimiter(1024, 512) { + limiter = new SizeLimiter(1024 * 1024, 512 * 1024) { public int getElementSize(MessageDelivery m) { return m.getFlowLimiterSize(); } @@ -439,6 +443,10 @@ public void drain(MessageDelivery elem, ISourceController controller) { elem.acknowledge(queueDescriptor); rate.increment(); + /* + if (count++ == limit) { + queue.stop(); + }*/ } }); Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=774764&r1=774763&r2=774764&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Thu May 14 13:41:14 2009 @@ -101,9 +101,17 @@ try { msg.setStringProperty(property, property); } catch (JMSException e) { - new RuntimeException(e); + throw new RuntimeException(e); } } + + //Call the before marshal method to sync the content so we get an + //accurate size: + try { + msg.beforeMarshall(null); + } catch (IOException e) { + throw new RuntimeException(e); + } next = new OpenWireMessageDelivery(msg); } } Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=774764&r1=774763&r2=774764&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original) +++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu May 14 13:41:14 2009 @@ -13,6 +13,7 @@ import org.apache.activemq.queue.QueueStore; import org.apache.activemq.queue.SharedPriorityQueue; import org.apache.activemq.queue.SharedQueue; +import org.apache.activemq.queue.SharedQueueOld; import org.apache.activemq.queue.Subscription; import org.apache.activemq.util.Mapper; @@ -27,6 +28,7 @@ private Mapper keyExtractor; private final MockStoreAdapater store = new MockStoreAdapater(); private static final PersistencePolicy NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY(); + private static final boolean USE_OLD_QUEUE = false; private IQueue createQueue() { @@ -61,15 +63,27 @@ queue.initialize(0, 0, 0, 0); return queue; } else { - SizeLimiter limiter = new SizeLimiter(100, 1); - SharedQueue queue = new SharedQueue(destination.getName().toString(), limiter); - queue.setKeyMapper(keyExtractor); - queue.setAutoRelease(true); - queue.setDispatcher(broker.getDispatcher()); - queue.setStore(store); - queue.setPersistencePolicy(NO_PERSISTENCE); - queue.initialize(0, 0, 0, 0); - return queue; + if (USE_OLD_QUEUE) { + SizeLimiter limiter = new SizeLimiter(100, 1); + SharedQueueOld queue = new SharedQueueOld(destination.getName().toString(), limiter); + queue.setKeyMapper(keyExtractor); + queue.setAutoRelease(true); + queue.setDispatcher(broker.getDispatcher()); + queue.setStore(store); + queue.setPersistencePolicy(NO_PERSISTENCE); + queue.initialize(0, 0, 0, 0); + return queue; + } else { + SizeLimiter limiter = new SizeLimiter(100, 1); + SharedQueue queue = new SharedQueue(destination.getName().toString(), limiter); + queue.setKeyMapper(keyExtractor); + queue.setAutoRelease(true); + queue.setDispatcher(broker.getDispatcher()); + queue.setStore(store); + queue.setPersistencePolicy(NO_PERSISTENCE); + queue.initialize(0, 0, 0, 0); + return queue; + } } } @@ -83,7 +97,7 @@ public final void addConsumer(final DeliveryTarget dt) { Subscription sub = new Subscription() { - + public boolean isBrowser() { return false; }