Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Mon Apr 27 18:40:44 2009
@@ -22,6 +22,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -35,6 +37,7 @@
import org.apache.activemq.broker.store.Store.FatalStoreException;
import org.apache.activemq.broker.store.Store.KeyNotFoundException;
import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.Store.Session;
import org.apache.activemq.dispatch.IDispatcher;
@@ -45,12 +48,16 @@
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore.RestoreListener;
+import org.apache.activemq.queue.QueueStore.RestoredElement;
import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList;
public class BrokerDatabase extends AbstractLimitedFlowResource<BrokerDatabase.Operation> {
+ private static final boolean DEBUG = false;
private final Store store;
private final Flow databaseFlow = new Flow("database", false);
@@ -68,13 +75,12 @@
private AtomicBoolean notify = new AtomicBoolean(false);
private Semaphore opsReady = new Semaphore(0);
private long opSequenceNumber;
- private long flushPointer = 0; // The last seq num for which flush was
- // requested
+ private long flushPointer = -1; // The last seq num for which flush was requested
private long requestedDelayedFlushPointer = -1; // Set to the last sequence
// num scheduled for delay
private long delayedFlushPointer = 0; // The last delayable sequence num
// requested.
- private final long FLUSH_DELAY_MS = 100;
+ private final long FLUSH_DELAY_MS = 50;
private final Runnable flushDelayCallback;
public interface DatabaseListener {
@@ -87,26 +93,11 @@
public void onDatabaseException(IOException ioe);
}
- /**
- * Holder of a restored message to be passed to a
- * {@link MessageRestoreListener}. This allows the demarshalling to be done
- * by the listener instead of the the database worker.
- *
- * @author cmacnaug
- */
- public interface RestoredMessage {
- MessageDelivery getMessageDelivery() throws IOException;
- }
-
- public interface MessageRestoreListener {
- public void messagesRestored(Collection<RestoredMessage> msgs);
- }
-
public BrokerDatabase(Store store, IDispatcher dispatcher) {
this.store = store;
this.dispatcher = dispatcher;
this.opQueue = new LinkedNodeList<OperationBase>();
- storeLimiter = new SizeLimiter<OperationBase>(5000, 0) {
+ storeLimiter = new SizeLimiter<OperationBase>(10000, 5000) {
@Override
public int getElementSize(OperationBase op) {
@@ -153,6 +144,11 @@
public synchronized void stop() throws Exception {
if (flushThread != null) {
+ synchronized(opQueue)
+ {
+ updateFlushPointer(opSequenceNumber + 1);
+ }
+
running.set(false);
boolean interrupted = false;
while (true) {
@@ -175,6 +171,18 @@
}
}
+ public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
+ // TODO Auto-generated method stub
+ return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
+
+ public Iterator<QueueQueryResult> execute(Session session) throws Exception {
+ // TODO Auto-generated method stub
+ return session.queueListByType(type, null, Integer.MAX_VALUE);
+ }
+
+ }, null);
+ }
+
/**
* Executes user supplied {@link Operation}. If the {@link Operation} does
* not throw any Exceptions, all updates to the store are committed,
@@ -214,7 +222,7 @@
synchronized (opQueue) {
op.opSequenceNumber = opSequenceNumber++;
opQueue.addLast(op);
- if (op.flushRequested) {
+ if (op.flushRequested || storeLimiter.getThrottled()) {
if (op.isDelayable() && FLUSH_DELAY_MS > 0) {
scheduleDelayedFlush(op.opSequenceNumber);
} else {
@@ -227,7 +235,8 @@
private void updateFlushPointer(long seqNumber) {
if (seqNumber > flushPointer) {
flushPointer = seqNumber;
- if (notify.get()) {
+ OperationBase op = opQueue.getHead();
+ if (op != null && op.opSequenceNumber <= flushPointer && notify.get()) {
opsReady.release();
}
}
@@ -253,14 +262,18 @@
synchronized (opQueue) {
if (flushPointer < requestedDelayedFlushPointer) {
updateFlushPointer(requestedDelayedFlushPointer);
- requestedDelayedFlushPointer = -1;
- // Schedule next delay if needed:
- if (delayedFlushPointer > flushPointer) {
- scheduleDelayedFlush(delayedFlushPointer);
- } else {
- delayedFlushPointer = -1;
- }
+
+ }
+
+ //If another delayed flush has been scheduled schedule it:
+ requestedDelayedFlushPointer = -1;
+ // Schedule next delay if needed:
+ if (delayedFlushPointer > flushPointer) {
+ scheduleDelayedFlush(delayedFlushPointer);
+ } else {
+ delayedFlushPointer = -1;
}
+
}
}
@@ -268,7 +281,7 @@
if (!wait) {
synchronized (opQueue) {
OperationBase op = opQueue.getHead();
- if (op != null && op.opSequenceNumber <= flushPointer) {
+ if (op != null && (op.opSequenceNumber <= flushPointer || !op.isDelayable())) {
op.unlink();
return op;
}
@@ -293,19 +306,15 @@
}
}
- private class OpCounter {
- int count = 0;
- }
-
private final void processOps() {
- final OpCounter counter = new OpCounter();
+ int count = 0;
while (running.get()) {
final OperationBase firstOp = getNextOp(true);
if (firstOp == null) {
continue;
}
- counter.count = 0;
+ count = 0;
// The first operation we get, triggers a store transaction.
if (firstOp != null) {
@@ -317,26 +326,27 @@
// refactor.
while (op != null) {
final Operation toExec = op;
- counter.count++;
- //System.out.println("Executing " + op);
- store.execute(new Store.VoidCallback<Exception>() {
- @Override
- public void run(Session session) throws Exception {
-
- // Try to execute the operation against the
- // session...
- try {
- if (toExec.execute(session)) {
+ if (toExec.beginExecute()) {
+ count++;
+
+ store.execute(new Store.VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+
+ // Try to execute the operation against the
+ // session...
+ try {
+ toExec.execute(session);
processedQueue.add(toExec);
- } else {
- counter.count--;
+ } catch (CancellationException ignore) {
+ // System.out.println("Cancelled" +
+ // toExec);
}
- } catch (CancellationException ignore) {
- //System.out.println("Cancelled" + toExec);
}
- }
- }, null);
- if (counter.count < 1000) {
+ }, null);
+ }
+
+ if (count < 1000) {
op = getNextOp(false);
} else {
op = null;
@@ -370,11 +380,12 @@
for (Operation processed : processedQueue) {
processed.onRollback(e);
}
+ onDatabaseException(new IOException(e));
} catch (Exception e) {
for (Operation processed : processedQueue) {
processed.onRollback(e);
}
-
+ onDatabaseException(new IOException(e));
}
}
}
@@ -405,7 +416,7 @@
* @param queue
* The queue to add.
*/
- public void addQueue(AsciiBuffer queue) {
+ public void addQueue(QueueStore.QueueDescriptor queue) {
add(new QueueAddOperation(queue), null, false);
}
@@ -415,7 +426,7 @@
* @param queue
* The queue to delete.
*/
- public void deleteQueue(AsciiBuffer queue) {
+ public void deleteQueue(QueueStore.QueueDescriptor queue) {
add(new QueueDeleteOperation(queue), null, false);
}
@@ -449,8 +460,8 @@
*
* @return The {@link OperationContext} associated with the operation
*/
- public OperationContext saveMessage(MessageDelivery delivery, AsciiBuffer queue, ISourceController<?> source) throws IOException {
- return add(new AddMessageOperation(delivery, queue), source, false);
+ public OperationContext saveMessage(MessageDelivery delivery, QueueStore.QueueDescriptor queue, long queueSequence, ISourceController<?> source) throws IOException {
+ return add(new AddMessageOperation(delivery, queue, queueSequence), source, false);
}
/**
@@ -462,8 +473,8 @@
* The queue.
* @return The {@link OperationContext} associated with the operation
*/
- public OperationContext deleteMessage(MessageDelivery delivery, AsciiBuffer queue) {
- return add(new DeleteMessageOperation(delivery, queue), null, false);
+ public OperationContext deleteMessage(MessageDelivery delivery, QueueStore.QueueDescriptor queue) {
+ return add(new DeleteMessageOperation(delivery.getStoreTracking(), queue), null, false);
}
/**
@@ -476,20 +487,25 @@
* @param queue
* The queue for which to load messages
* @param first
- * The first queue sequence number to load.
+ * The first queue sequence number to load (-1 starts at
+ * begining)
+ * @param maxSequence
+ * The maximum sequence number to load (-1 if no limit)
* @param max
- * The maximum number of messages to load.
+ * The maximum number of messages to load (-1 if no limit)
* @param listener
* The listener to which messags should be passed.
* @return The {@link OperationContext} associated with the operation
*/
- public OperationContext restoreMessages(AsciiBuffer queue, long first, int max, MessageRestoreListener listener) {
- return add(new RestoreMessageOperation(queue, first, max, listener), null, false);
+ public OperationContext restoreMessages(QueueStore.QueueDescriptor queue, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
+ return add(new RestoreMessageOperation(queue, first, maxCount, maxSequence, listener), null, true);
}
private void onDatabaseException(IOException ioe) {
if (listener != null) {
listener.onDatabaseException(ioe);
+ } else {
+ ioe.printStackTrace();
}
}
@@ -513,6 +529,12 @@
*/
public boolean executed();
+ /**
+ * Requests flush for this database operation (overriding a previous
+ * delay)
+ */
+ public void requestFlush();
+
}
/**
@@ -524,6 +546,14 @@
public interface Operation extends OperationContext {
/**
+ * Called when the saver is about to execute the operation. If true is
+ * returned the operation can no longer be canceled.
+ *
+ * @return false if the operation has been canceled.
+ */
+ public boolean beginExecute();
+
+ /**
* Gets called by the
* {@link Store#add(Operation, ISourceController, boolean)} method
* within a transactional context. If any exception is thrown including
@@ -532,7 +562,6 @@
* @param session
* provides you access to read and update the persistent
* data.
- * @return the result of the CallableCallback
* @throws Exception
* if an system error occured while executing the
* operations.
@@ -540,7 +569,7 @@
* if an system error occured while executing the
* operations.
*/
- public boolean execute(Session session) throws CancellationException, Exception, RuntimeException;
+ public void execute(Session session) throws CancellationException, Exception, RuntimeException;
/**
* Returns true if this operation can be delayed. This is useful in
@@ -608,6 +637,21 @@
}
/**
+ * Called when the saver is about to execute the operation. If true is
+ * returned the operation can no longer be cancelled.
+ *
+ * @return true if operation should be executed
+ */
+ public final boolean beginExecute() {
+ if (executePending.compareAndSet(true, false)) {
+ executed.set(true);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
* Gets called by the
* {@link Store#add(Operation, ISourceController, boolean)} method
* within a transactional context. If any exception is thrown including
@@ -616,7 +660,6 @@
* @param session
* provides you access to read and update the persistent
* data.
- * @return True if processed, false otherwise
* @throws Exception
* if an system error occured while executing the
* operations.
@@ -624,14 +667,9 @@
* if an system error occured while executing the
* operations.
*/
- public boolean execute(Session session) throws Exception, RuntimeException {
- if (executePending.compareAndSet(true, false)) {
- executed.set(true);
- doExcecute(session);
- return true;
- } else {
- return false;
- }
+ public void execute(Session session) throws Exception, RuntimeException {
+ if(DEBUG) System.out.println("Executing " + this);
+ doExcecute(session);
}
abstract protected void doExcecute(Session session);
@@ -644,6 +682,16 @@
return false;
}
+ /**
+ * Requests flush for this database operation (overriding a previous
+ * delay)
+ */
+ public void requestFlush() {
+ synchronized (this) {
+ updateFlushPointer(opSequenceNumber);
+ }
+ }
+
public void onCommit() {
}
@@ -662,10 +710,10 @@
private class QueueAddOperation extends OperationBase {
- private AsciiBuffer queue;
+ private QueueStore.QueueDescriptor qd;
- QueueAddOperation(AsciiBuffer queue) {
- this.queue = queue;
+ QueueAddOperation(QueueStore.QueueDescriptor queue) {
+ qd = queue;
}
@Override
@@ -676,7 +724,11 @@
@Override
protected void doExcecute(Session session) {
- session.queueAdd(queue);
+ try {
+ session.queueAdd(qd);
+ } catch (KeyNotFoundException e) {
+ throw new FatalStoreException(e);
+ }
}
@Override
@@ -685,16 +737,16 @@
}
public String toString() {
- return "QueueAdd: " + queue.toString();
+ return "QueueAdd: " + qd.getQueueName().toString();
}
}
private class QueueDeleteOperation extends OperationBase {
- private AsciiBuffer queue;
+ private QueueStore.QueueDescriptor qd;
- QueueDeleteOperation(AsciiBuffer queue) {
- this.queue = queue;
+ QueueDeleteOperation(QueueStore.QueueDescriptor queue) {
+ qd = queue;
}
@Override
@@ -705,7 +757,7 @@
@Override
protected void doExcecute(Session session) {
- session.queueRemove(queue);
+ session.queueRemove(qd);
}
@Override
@@ -714,23 +766,23 @@
}
public String toString() {
- return "QueueDelete: " + queue.toString();
+ return "QueueDelete: " + qd.getQueueName().toString();
}
}
private class DeleteMessageOperation extends OperationBase {
private final long storeTracking;
- private AsciiBuffer queue;
+ private QueueStore.QueueDescriptor queue;
- public DeleteMessageOperation(MessageDelivery delivery, AsciiBuffer queue) {
- this.storeTracking = delivery.getStoreTracking();
+ public DeleteMessageOperation(long tracking, QueueStore.QueueDescriptor queue) {
+ this.storeTracking = tracking;
this.queue = queue;
}
@Override
public int getLimiterSize() {
// Might consider bumping this up to avoid too much accumulation?
- return 0;
+ return 1;
}
@Override
@@ -757,16 +809,18 @@
}
private class RestoreMessageOperation extends OperationBase {
- private AsciiBuffer queue;
+ private QueueStore.QueueDescriptor queue;
private long firstKey;
private int maxRecords;
- private MessageRestoreListener listener;
- private Collection<RestoredMessage> msgs = null;
+ private long maxSequence;
+ private RestoreListener<MessageDelivery> listener;
+ private Collection<RestoredElement<MessageDelivery>> msgs = null;
- RestoreMessageOperation(AsciiBuffer queue, long firstKey, int maxRecords, MessageRestoreListener listener) {
+ RestoreMessageOperation(QueueStore.QueueDescriptor queue, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
this.queue = queue;
this.firstKey = firstKey;
this.maxRecords = maxRecords;
+ this.maxSequence = maxSequence;
this.listener = listener;
}
@@ -775,17 +829,47 @@
Iterator<QueueRecord> records = null;
try {
- records = session.queueListMessagesQueue(queue, firstKey, maxRecords);
-
+ records = session.queueListMessagesQueue(queue, firstKey, maxSequence, maxRecords);
+ msgs = new LinkedList<RestoredElement<MessageDelivery>>();
} catch (KeyNotFoundException e) {
- msgs = new ArrayList<RestoredMessage>(0);
+ msgs = new ArrayList<RestoredElement<MessageDelivery>>(0);
return;
}
- while (records.hasNext()) {
+ QueueRecord qRecord = null;
+ int count = 0;
+ if (records.hasNext()) {
+ qRecord = records.next();
+ }
+
+ while (qRecord != null) {
RestoredMessageImpl rm = new RestoredMessageImpl();
// TODO should update jms redelivery here.
- rm.qRecord = records.next();
+ rm.qRecord = qRecord;
+ count++;
+
+ // Set the next sequence number:
+ if (records.hasNext()) {
+ qRecord = records.next();
+ rm.nextSequence = qRecord.getQueueKey();
+ } else {
+ // Look up the next sequence number:
+ try {
+ records = session.queueListMessagesQueue(queue, qRecord.getQueueKey() + 1, -1L, 1);
+ if(!records.hasNext())
+ {
+ rm.nextSequence = -1;
+ }
+ else
+ {
+ rm.nextSequence = records.next().queueKey;
+ }
+ } catch (KeyNotFoundException e) {
+ rm.nextSequence = -1;
+ }
+ qRecord = null;
+ }
+
try {
rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
@@ -797,14 +881,19 @@
throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
}
}
+ msgs.add(rm);
} catch (KeyNotFoundException shouldNotHappen) {
+ shouldNotHappen.printStackTrace();
}
}
+
+ if(DEBUG)
+ System.out.println("Restored: " + count + " messages");
}
@Override
public void onCommit() {
- listener.messagesRestored(msgs);
+ listener.elementsRestored(msgs);
}
public String toString() {
@@ -817,7 +906,8 @@
private final BrokerMessageDelivery brokerDelivery;
private final MessageDelivery delivery;
- private final AsciiBuffer target;
+ private final long queueSequence;
+ private final QueueStore.QueueDescriptor target;
private MessageRecord record;
private final boolean delayable;
@@ -825,6 +915,7 @@
public AddMessageOperation(BrokerMessageDelivery delivery) throws IOException {
this.brokerDelivery = delivery;
this.delivery = delivery;
+ this.queueSequence = -1;
target = null;
this.delayable = delivery.isFlushDelayable();
if (!delayable) {
@@ -832,10 +923,11 @@
}
}
- public AddMessageOperation(MessageDelivery delivery, AsciiBuffer target) throws IOException {
+ public AddMessageOperation(MessageDelivery delivery, QueueStore.QueueDescriptor target, long queueSequence) throws IOException {
this.brokerDelivery = null;
this.delivery = delivery;
this.target = target;
+ this.queueSequence = queueSequence;
this.record = delivery.createMessageRecord();
delayable = false;
}
@@ -854,7 +946,7 @@
if (target == null) {
brokerDelivery.beginStore();
- Collection<AsciiBuffer> targets = brokerDelivery.getPersistentQueues();
+ Set<Entry<QueueDescriptor, Long>> targets = brokerDelivery.getPersistentQueues();
if (!targets.isEmpty()) {
if (record == null) {
@@ -867,12 +959,14 @@
record.setKey(delivery.getStoreTracking());
session.messageAdd(record);
- for (AsciiBuffer target : brokerDelivery.getPersistentQueues()) {
+ for (Entry<QueueDescriptor, Long> target : brokerDelivery.getPersistentQueues()) {
try {
QueueRecord queueRecord = new QueueRecord();
queueRecord.setAttachment(null);
queueRecord.setMessageKey(record.getKey());
- session.queueAddMessage(target, queueRecord);
+ queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
+ queueRecord.setQueueKey(target.getValue());
+ session.queueAddMessage(target.getKey(), queueRecord);
} catch (KeyNotFoundException e) {
e.printStackTrace();
@@ -890,6 +984,8 @@
QueueRecord queueRecord = new QueueRecord();
queueRecord.setAttachment(null);
queueRecord.setMessageKey(record.getKey());
+ queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
+ queueRecord.setQueueKey(queueSequence);
session.queueAddMessage(target, queueRecord);
} catch (KeyNotFoundException e) {
e.printStackTrace();
@@ -907,18 +1003,35 @@
}
}
- private class RestoredMessageImpl implements RestoredMessage {
+ private class RestoredMessageImpl implements RestoredElement<MessageDelivery> {
QueueRecord qRecord;
MessageRecord mRecord;
ProtocolHandler handler;
+ long nextSequence = -1;
+
+ public MessageDelivery getElement() throws IOException {
+ BrokerMessageDelivery delivery = handler.createMessageDelivery(mRecord);
+ delivery.setFromDatabase(BrokerDatabase.this, mRecord);
+ return delivery;
+ }
+
+ public long getSequenceNumber() {
+ return qRecord.getQueueKey();
+ }
+
+ public long getStoreTracking() {
+ return qRecord.getMessageKey();
+ }
- public MessageDelivery getMessageDelivery() throws IOException {
- return handler.createMessageDelivery(mRecord);
+ public long getNextSequenceNumber() {
+ return nextSequence;
}
+
}
public long allocateStoreTracking() {
// TODO Auto-generated method stub
return store.allocateStoreTracking();
}
+
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java Mon Apr 27 18:40:44 2009
@@ -1,124 +0,0 @@
-package org.apache.activemq.broker.store;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.QueueStoreHelper;
-import org.apache.activemq.queue.SingleFlowRelay;
-import org.apache.activemq.broker.MessageDelivery;
-import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage;
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.flow.Flow;
-import org.apache.activemq.flow.IFlowController;
-import org.apache.activemq.flow.IFlowRelay;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.flow.ISinkController;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.flow.ISinkController.FlowUnblockListener;
-
-
-public class MessageDeliveryStoreHelper implements QueueStoreHelper<MessageDelivery>, Dispatchable, BrokerDatabase.MessageRestoreListener {
-
- private final BrokerDatabase database;
- private final AsciiBuffer queue;
- private final DispatchContext dispatchContext;
- private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>();
- private final IFlowRelay<MessageDelivery> restoreRelay;
- private final SizeLimiter<MessageDelivery> restoreLimiter;
- private final IFlowController<MessageDelivery> controller;
- private final FlowUnblockListener<MessageDelivery> unblockListener;
- private final IFlowSink<MessageDelivery> targetSink;
-
- private int RESTORE_BATCH_SIZE = 50;
-
- private AtomicBoolean started = new AtomicBoolean(false);
- private AtomicBoolean restoreComplete = new AtomicBoolean(false);
- private AtomicBoolean storeLoaded = new AtomicBoolean(false);
-
- private static enum State {
- STOPPED, RESTORING, RESTORED
- };
-
- private State state = State.RESTORING;
-
- MessageDeliveryStoreHelper(BrokerDatabase database, AsciiBuffer queueName, IFlowSink<MessageDelivery> sink, IDispatcher dispatcher) {
- this.database = database;
- this.queue = queueName;
- this.targetSink = sink;
- Flow flow = new Flow("MessageRestorer-" + queue, false);
- restoreLimiter = new SizeLimiter<MessageDelivery>(1000, 500) {
- @Override
- public int getElementSize(MessageDelivery msg) {
- return msg.getFlowLimiterSize();
- }
- };
- restoreRelay = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), restoreLimiter);
- controller = restoreRelay.getFlowController(flow);
- dispatchContext = dispatcher.register(this, flow.getFlowName());
-
- unblockListener = new FlowUnblockListener<MessageDelivery>() {
- public void onFlowUnblocked(ISinkController<MessageDelivery> controller) {
- dispatchContext.requestDispatch();
- }
- };
- }
-
- public void delete(MessageDelivery elem, boolean flush) {
- elem.delete(queue);
- }
-
- public void save(MessageDelivery elem, boolean flush) throws IOException {
- elem.persist(queue, !flush);
- }
-
- public boolean hasStoredElements() {
- return !restoreComplete.get();
- }
-
- public void startLoadingQueue() {
- database.restoreMessages(queue, 0, RESTORE_BATCH_SIZE, this);
- }
-
- public void stopLoadingQueue() {
- // TODO Auto-generated method stub
- }
-
- public boolean dispatch() {
-
- RestoredMessage restored = restoredMsgs.poll();
- if (restored == null || restoreComplete.get()) {
- return true;
- }
-
- if (controller.isSinkBlocked()) {
- if (controller.addUnblockListener(unblockListener)) {
- return true;
- }
- } else {
- try {
- targetSink.add(restored.getMessageDelivery(), controller);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- return false;
- }
-
- public void messagesRestored(Collection<RestoredMessage> msgs) {
- if (!msgs.isEmpty()) {
- restoredMsgs.addAll(msgs);
- } else {
- storeLoaded.set(true);
- }
- dispatchContext.requestDispatch();
-
- dispatchContext.requestDispatch();
- }
-}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Mon Apr 27 18:40:44 2009
@@ -17,11 +17,14 @@
package org.apache.activemq.broker.store;
import java.io.File;
+import java.util.Collection;
import java.util.Iterator;
import org.apache.activemq.Service;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
/**
* Interface to persistently store and access data needed by the messaging
@@ -163,6 +166,33 @@
Long queueKey;
Long messageKey;
Buffer attachment;
+ int size;
+ boolean redelivered;
+ long tte;
+
+ public boolean isRedelivered() {
+ return redelivered;
+ }
+
+ public void setRedelivered(boolean redelivered) {
+ this.redelivered = redelivered;
+ }
+
+ public long getTte() {
+ return tte;
+ }
+
+ public void setTte(long tte) {
+ this.tte = tte;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
public Long getQueueKey() {
return queueKey;
@@ -194,6 +224,16 @@
Long key = (long) -1;
AsciiBuffer messageId;
AsciiBuffer encoding;
+ int size;
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
Buffer buffer;
Long streamKey;
@@ -239,6 +279,48 @@
}
/**
+ * Result Holder for queue related queries.
+ */
+ public interface QueueQueryResult {
+
+ /**
+ * @return the descriptor for the queue.
+ */
+ public QueueStore.QueueDescriptor getDescriptor();
+
+ /**
+ * Gets the count of elements in this queue. Note that this does not
+ * include counts for elements held in child partitions.
+ *
+ * @return the number of elements in the queue.
+ */
+ public int getCount();
+
+ /**
+ * Gets the size of elements in this queue. Note that this does not
+ * include size of elements held in child partitions.
+ *
+ * @return the total size of elements in the queue
+ */
+ public long getSize();
+
+ /**
+ * @return the first sequence number in the queue.
+ */
+ public long getFirstSequence();
+
+ /**
+ * @return the last sequence number in the queue.
+ */
+ public long getLastSequence();
+
+ /**
+ * @return The results for this queue's partitions
+ */
+ public Collection<QueueQueryResult> getPartitions();
+ }
+
+ /**
* Executes user supplied {@link Callback}. If the {@link Callback} does not
* throw any Exceptions, all updates to the store are committed to the store
* as a single unit of work, otherwise they are rolled back.
@@ -297,24 +379,81 @@
public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
- public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+ public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
public void transactionCommit(Buffer txid) throws KeyNotFoundException;
public void transactionRollback(Buffer txid) throws KeyNotFoundException;
- // Queue related methods.
- public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
+ /**
+ * Gets a list of queues. The returned iterator returns top-level queues
+ * (e.g. queues without a parent). The child queues are accessible via
+ * {@link QueueQueryResult#getPartitions()}.
+ *
+ * @param firstQueueName
+ * If null starts the query at the first queue.
+ * @param max
+ * The maximum number of queues to return
+ * @return The list of queues.
+ */
+ public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueueName, int max);
- public void queueAdd(AsciiBuffer queueName);
+ /**
+ * Gets a list of queues for which
+ * {@link QueueDescriptor#getQueueType()} matches the specified type.
+ * The returned iterator returns top-level queues (e.g. queues without a
+ * parent). The child queues are accessible via
+ * {@link QueueQueryResult#getPartitions()}.
+ *
+ * @param firstQueueName
+ * If null starts the query at the first queue.
+ * @param max
+ * The maximum number of queues to return
+ * @param type
+ * The type of queue to consider
+ * @return The list of queues.
+ */
+ public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueueName, int max);
- public void queueRemove(AsciiBuffer queueName);
+ /**
+ * Adds a queue. If {@link QueueDescriptor#getParent()} is specified
+ * then the parent queue must exist.
+ *
+ * @param queue
+ * The queue to add.
+ *
+ * @throws KeyNotFoundException
+ * if the descriptor specifies a non existent parent
+ */
+ public void queueAdd(QueueStore.QueueDescriptor queue) throws KeyNotFoundException;
- public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
+ /**
+ * Deletes a queue and all of it's messages. If it has any child
+ * partitions they are deleted as well.
+ *
+ * @param queue
+ * The queue to delete
+ */
+ public void queueRemove(QueueStore.QueueDescriptor queue);
+
+ /**
+ * Adds a reference to the message for the given queue. The associated
+ * queue record contains the sequence number of the message in this
+ * queue and the store tracking number of the associated message.
+ *
+ * @param queue
+ * The queue descriptor
+ * @param record
+ * The queue record
+ * @throws KeyNotFoundException
+ * If there is no message associated with
+ * {@link QueueRecord#getMessageKey()}
+ */
+ public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException;
- public void queueRemoveMessage(AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException;
+ public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException;
- public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException;
+ public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Mon Apr 27 18:40:44 2009
@@ -20,21 +20,28 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.DuplicateKeyException;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
+import org.apache.activemq.queue.QueueStore;
import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller;
public class DestinationEntity {
-
- public final static Marshaller<DestinationEntity> MARSHALLER = new Marshaller<DestinationEntity>() {
-
+
+ public final static Marshaller<DestinationEntity> MARSHALLER = new Marshaller<DestinationEntity>() {
+
public Class<DestinationEntity> getType() {
return DestinationEntity.class;
}
@@ -43,103 +50,229 @@
DestinationEntity value = new DestinationEntity();
value.queueIndex = new BTreeIndex<Long, QueueRecord>(dataIn.readLong());
value.trackingIndex = new BTreeIndex<Long, Long>(dataIn.readLong());
+ value.descriptor = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
+ value.metaData = new Page<DestinationMetaData>(dataIn.readLong());
return value;
}
public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
dataOut.writeLong(value.queueIndex.getPageId());
dataOut.writeLong(value.trackingIndex.getPageId());
+ Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.descriptor, dataOut);
+ dataOut.writeLong(value.metaData.getPageId());
}
};
- private long nextQueueKey;
+ public final static Marshaller<DestinationMetaData> META_DATA_MARSHALLER = new Marshaller<DestinationMetaData>() {
+
+ public Class<DestinationMetaData> getType() {
+ return DestinationMetaData.class;
+ }
+
+ public DestinationMetaData readPayload(DataInput dataIn) throws IOException {
+ DestinationMetaData value = new DestinationMetaData();
+ value.count = dataIn.readInt();
+ value.size = dataIn.readLong();
+ return value;
+ }
+
+ public void writePayload(DestinationMetaData value, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(value.count);
+ dataOut.writeLong(value.size);
+ }
+ };
+
+ public Class<DestinationEntity> getType() {
+ return DestinationEntity.class;
+ }
+
private BTreeIndex<Long, QueueRecord> queueIndex;
private BTreeIndex<Long, Long> trackingIndex;
-
- ///////////////////////////////////////////////////////////////////
+
+ // Descriptor for this queue:
+ private QueueStore.QueueDescriptor descriptor;
+
+ // Child Partitions:
+ private HashSet<DestinationEntity> partitions;
+
+ // Holds volatile queue meta data
+ private Page<DestinationMetaData> metaData;
+
+ // /////////////////////////////////////////////////////////////////
// Lifecycle Methods.
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
public void allocate(Transaction tx) throws IOException {
queueIndex = new BTreeIndex<Long, QueueRecord>(tx.allocate());
trackingIndex = new BTreeIndex<Long, Long>(tx.allocate());
+ metaData = tx.allocate();
+ metaData.set(new DestinationMetaData());
+ tx.store(metaData, META_DATA_MARSHALLER, true);
}
-
+
public void deallocate(Transaction tx) throws IOException {
queueIndex.clear(tx);
trackingIndex.clear(tx);
tx.free(trackingIndex.getPageId());
tx.free(queueIndex.getPageId());
- queueIndex=null;
- trackingIndex=null;
+ tx.free(metaData.getPageId());
+ queueIndex = null;
+ trackingIndex = null;
+ metaData = null;
}
-
+
public void load(Transaction tx) throws IOException {
- if( queueIndex.getPageFile()==null ) {
-
+ if (queueIndex.getPageFile() == null) {
+
queueIndex.setPageFile(tx.getPageFile());
queueIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
queueIndex.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
queueIndex.load(tx);
-
- // Figure out the next key using the last entry in the destination.
- Entry<Long, QueueRecord> lastEntry = queueIndex.getLast(tx);
- if( lastEntry!=null ) {
- nextQueueKey = lastEntry.getKey()+1;
- }
}
-
- if( trackingIndex.getPageFile()==null ) {
-
+
+ if (trackingIndex.getPageFile() == null) {
+
trackingIndex.setPageFile(tx.getPageFile());
trackingIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
trackingIndex.setValueMarshaller(LongMarshaller.INSTANCE);
trackingIndex.load(tx);
}
+
+ tx.load(metaData, META_DATA_MARSHALLER);
}
-
- ///////////////////////////////////////////////////////////////////
- // Message Methods.
- ///////////////////////////////////////////////////////////////////
- public Long nextQueueKey() {
- return nextQueueKey++;
+
+ private static final boolean unlimited(Number val) {
+ return val == null || val.longValue() < 0;
}
+
+ private DestinationMetaData getMetaData(Transaction tx) throws IOException {
+ tx.load(metaData, META_DATA_MARSHALLER);
+ return metaData.get();
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Message Methods.
+ // /////////////////////////////////////////////////////////////////
- public void add(Transaction tx, QueueAddMessage command) throws IOException {
- QueueRecord value = new QueueRecord();
- value.setAttachment(command.getAttachment());
- value.setMessageKey(command.getMessageKey());
- value.setQueueKey(command.getQueueKey());
- queueIndex.put(tx, value.getQueueKey(), value);
- trackingIndex.put(tx, command.getMessageKey(), command.getQueueKey());
+ public long getSize(Transaction tx) throws IOException {
+ return getMetaData(tx).size;
+ }
+
+ public int getCount(Transaction tx) throws IOException {
+ return getMetaData(tx).count;
+ }
+
+ public long getFirstSequence(Transaction tx) throws IOException {
+ return getMetaData(tx).count == 0 ? 0 : queueIndex.getFirst(tx).getValue().getQueueKey();
+ }
+
+ public long getLastSequence(Transaction tx) throws IOException {
+ return getMetaData(tx).count == 0 ? 0 : queueIndex.getLast(tx).getValue().getQueueKey();
+ }
+
+ public void setQueueDescriptor(QueueStore.QueueDescriptor queue) {
+ descriptor = queue;
+ }
+
+ public QueueStore.QueueDescriptor getDescriptor() {
+ return descriptor;
+ }
+
+ public void addPartition(DestinationEntity destination) {
+ if (partitions == null) {
+ partitions = new HashSet<DestinationEntity>();
+ }
+
+ partitions.add(destination);
+ }
+
+ public void removePartition(DestinationEntity queue) {
+ if (partitions == null) {
+ return;
+ }
+
+ partitions.remove(queue);
+ if (partitions.isEmpty()) {
+ partitions = null;
+ }
+ }
+
+ public Iterator<DestinationEntity> getPartitions() {
+ if (partitions == null) {
+ return null;
+ } else {
+ return partitions.iterator();
+ }
+ }
+
+ public void add(Transaction tx, QueueAddMessage command) throws IOException, DuplicateKeyException {
+
+ Long existing = trackingIndex.put(tx, command.getMessageKey(), command.getQueueKey());
+ if (existing == null) {
+ QueueRecord value = new QueueRecord();
+ value.setAttachment(command.getAttachment());
+ value.setMessageKey(command.getMessageKey());
+ value.setQueueKey(command.getQueueKey());
+ value.setSize(command.getMessageSize());
+
+ QueueRecord rc = queueIndex.put(tx, value.getQueueKey(), value);
+ if (rc == null) {
+ // TODO It seems a little inefficient to continually serialize
+ // the queue size. It might be better to update this only at
+ // commit
+ // timeespecially if we start doing multiple adds per
+ // transaction.
+ // It is also possible that we might want to remove this update
+ // altogether in favor of scanning the whole queue at recovery
+ // time (at the cost of startup time)
+ getMetaData(tx).update(1, command.getMessageSize());
+ tx.store(metaData, META_DATA_MARSHALLER, true);
+ } else {
+ throw new Store.FatalStoreException(new Store.DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + descriptor.getQueueName()));
+ }
+ } else {
+ throw new Store.DuplicateKeyException("Duplicate tracking " + command.getMessageKey() + " for " + descriptor.getQueueName());
+ }
}
public boolean remove(Transaction tx, long msgKey) throws IOException {
Long queueKey = trackingIndex.remove(tx, msgKey);
- if(queueKey != null)
- {
- queueIndex.remove(tx, queueKey);
+ if (queueKey != null) {
+ QueueRecord qr = queueIndex.remove(tx, queueKey);
+ getMetaData(tx).update(-1, -qr.getSize());
+ tx.store(metaData, META_DATA_MARSHALLER, true);
return true;
}
return false;
}
- public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, final int max) throws IOException {
- final ArrayList<QueueRecord> rc = new ArrayList<QueueRecord>(max);
+ public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, Long maxQueueKey, final int max) throws IOException {
+ Collection<QueueRecord> rc;
+ if (unlimited(max)) {
+ rc = new LinkedList<QueueRecord>();
+ } else {
+ rc = new ArrayList<QueueRecord>(max);
+ }
Iterator<Entry<Long, QueueRecord>> iterator;
- if( firstQueueKey!=null ) {
- iterator = queueIndex.iterator(tx, firstQueueKey);
- } else {
+ if (unlimited(firstQueueKey)) {
iterator = queueIndex.iterator(tx);
+
+ } else {
+ iterator = queueIndex.iterator(tx, firstQueueKey);
}
+ boolean sequenceLimited = !unlimited(maxQueueKey);
+ boolean countLimited = !unlimited(max);
while (iterator.hasNext()) {
- if( rc.size() >= max ) {
+ if (countLimited && rc.size() >= max) {
break;
}
Map.Entry<Long, QueueRecord> entry = iterator.next();
+ if (sequenceLimited && entry.getValue().getQueueKey() > maxQueueKey) {
+ break;
+ }
rc.add(entry.getValue());
}
-
+
return rc.iterator();
}
@@ -147,5 +280,19 @@
return trackingIndex.iterator(tx);
}
+ public static class DestinationMetaData {
+ int count;
+ long size;
+
+ public void update(int count, long size) {
+ this.count += count;
+ this.size += size;
+ }
+
+ public void set(int count, long size) {
+ this.count = count;
+ this.size = size;
+ }
+ }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Mon Apr 27 18:40:44 2009
@@ -49,6 +49,8 @@
import org.apache.activemq.protobuf.InvalidProtocolBufferException;
import org.apache.activemq.protobuf.MessageBuffer;
import org.apache.activemq.protobuf.PBMessage;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.Journal;
@@ -102,6 +104,7 @@
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+ private boolean recovering;
private static class UoWOperation {
public TypeCreatable bean;
@@ -114,7 +117,14 @@
// /////////////////////////////////////////////////////////////////
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
- load();
+ try
+ {
+ load();
+ }
+ catch (Exception e)
+ {
+ LOG.error("Error loading store", e);
+ }
}
}
@@ -216,6 +226,7 @@
};
checkpointThread.start();
recover();
+ trackingGen.set(rootEntity.getLastMessageTracking() + 1);
}
}
@@ -267,6 +278,11 @@
rootEntity.setState(CLOSED_STATE);
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
+ // Set the last update to the next update (otherwise
+ // we'll replay the last update
+ // since location marshaller doesn't marshal the
+ // location's size:
+ rootEntity.setLastUpdate(journal.getNextLocation(rootEntity.getLastUpdate()));
rootEntity.store(tx);
}
});
@@ -294,7 +310,7 @@
indexLock.writeLock().lock();
try {
long start = System.currentTimeMillis();
-
+ recovering = true;
ArrayList<UoWOperation> uow = null;
Location recoveryPosition = getRecoveryPosition();
if (recoveryPosition != null) {
@@ -352,6 +368,7 @@
}
});
} finally {
+ recovering = false;
indexLock.writeLock().unlock();
}
}
@@ -359,6 +376,7 @@
public void incrementalRecover() throws IOException {
indexLock.writeLock().lock();
try {
+ recovering = true;
if (nextRecoveryPosition == null) {
if (lastRecoveryPosition == null) {
nextRecoveryPosition = getRecoveryPosition();
@@ -381,6 +399,7 @@
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
}
} finally {
+ recovering = false;
indexLock.writeLock().unlock();
}
}
@@ -492,6 +511,10 @@
}
rootEntity.setState(OPEN_STATE);
+ // Set the last update to the next update (otherwise we'll replay the
+ // last update
+ // since location marshaller doesn't marshal the location's size:
+ rootEntity.setLastUpdate(journal.getNextLocation(rootEntity.getLastUpdate()));
rootEntity.store(tx);
pageFile.flush();
@@ -633,6 +656,7 @@
}
});
rootEntity.setLastUpdate(location);
+
} finally {
indexLock.writeLock().unlock();
}
@@ -736,6 +760,8 @@
@SuppressWarnings("unchecked")
public void updateIndex(Transaction tx, Type type, MessageBuffer command, Location location) throws IOException {
+ // System.out.println("Updating index" + type.toString() + " loc: " +
+ // location);
switch (type) {
case MESSAGE_ADD:
messageAdd(tx, (MessageAdd) command, location);
@@ -775,23 +801,44 @@
}
private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
- rootEntity.queueAdd(tx, command.getQueueName());
+ QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+ qd.setQueueName(command.getQueueName());
+ qd.setApplicationType((short) command.getApplicationType());
+ qd.setQueueType((short) command.getQueueType());
+ if (command.hasParentName()) {
+ qd.setParent(command.getParentName());
+ qd.setPartitionId(command.getPartitionId());
+ }
+
+ rootEntity.queueAdd(tx, qd);
}
private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
- rootEntity.queueRemove(tx, command.getQueueName());
+ QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+ qd.setQueueName(command.getQueueName());
+ rootEntity.queueRemove(tx, qd);
}
private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
- DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+ QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+ qd.setQueueName(command.getQueueName());
+ DestinationEntity destination = rootEntity.getDestination(qd);
if (destination != null) {
- destination.add(tx, command);
+ try {
+ destination.add(tx, command);
+ } catch (DuplicateKeyException e) {
+ if (!recovering) {
+ throw new FatalStoreException(e);
+ }
+ }
rootEntity.addMessageRef(tx, command.getQueueName(), command.getMessageKey());
}
}
private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
- DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+ QueueStore.QueueDescriptor qd = new QueueStore.QueueDescriptor();
+ qd.setQueueName(command.getQueueName());
+ DestinationEntity destination = rootEntity.getDestination(qd);
if (destination != null) {
if (destination.remove(tx, command.getMessageKey())) {
rootEntity.removeMessageRef(tx, command.getQueueName(), command.getMessageKey());
@@ -851,6 +898,7 @@
// /////////////////////////////////////////////////////////////
// Message related methods.
// /////////////////////////////////////////////////////////////
+
public void messageAdd(MessageRecord message) {
if (message.getKey() < 0) {
throw new IllegalArgumentException("Key not set");
@@ -859,6 +907,7 @@
bean.setMessageKey(message.getKey());
bean.setMessageId(message.getMessageId());
bean.setEncoding(message.getEncoding());
+ bean.setMessageSize(message.getSize());
Buffer buffer = message.getBuffer();
if (buffer != null) {
bean.setBuffer(buffer);
@@ -881,6 +930,7 @@
rc.setKey(bean.getMessageKey());
rc.setMessageId(bean.getMessageId());
rc.setEncoding(bean.getEncoding());
+ rc.setSize(bean.getMessageSize());
if (bean.hasBuffer()) {
rc.setBuffer(bean.getBuffer());
}
@@ -896,49 +946,65 @@
// /////////////////////////////////////////////////////////////
// Queue related methods.
// /////////////////////////////////////////////////////////////
- public void queueAdd(AsciiBuffer queueName) {
- updates.add(new QueueAddBean().setQueueName(queueName));
+ public void queueAdd(QueueStore.QueueDescriptor descriptor) {
+ QueueAddBean update = new QueueAddBean();
+ update.setQueueName(descriptor.getQueueName());
+ update.setQueueType(descriptor.getQueueType());
+ update.setApplicationType(descriptor.getApplicationType());
+ AsciiBuffer parent = descriptor.getParent();
+ if (parent != null) {
+ update.setParentName(parent);
+ update.setPartitionId(descriptor.getPartitionKey());
+ }
+ updates.add(update);
}
- public void queueRemove(AsciiBuffer queueName) {
- updates.add(new QueueRemoveBean().setQueueName(queueName));
+ public void queueRemove(QueueStore.QueueDescriptor descriptor) {
+ updates.add(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
}
- public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
- return rootEntity.queueList(tx(), firstQueueName, max);
+ public Iterator<QueueQueryResult> queueListByType(short type, QueueStore.QueueDescriptor firstQueue, int max) {
+ try {
+ return rootEntity.queueList(tx(), type, firstQueue, max);
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
+ }
}
- public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
- DestinationEntity destination = rootEntity.getDestination(queueName);
- if (destination == null) {
- throw new KeyNotFoundException("queue key: " + queueName);
+ public Iterator<QueueQueryResult> queueList(QueueStore.QueueDescriptor firstQueue, int max) {
+ try {
+ return rootEntity.queueList(tx(), (short) -1, firstQueue, max);
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
}
- Long queueKey = destination.nextQueueKey();
+ }
+
+ public void queueAddMessage(QueueStore.QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException {
QueueAddMessageBean bean = new QueueAddMessageBean();
- bean.setQueueName(queueName);
- bean.setQueueKey(queueKey);
+ bean.setQueueName(queue.getQueueName());
+ bean.setQueueKey(record.getQueueKey());
bean.setMessageKey(record.getMessageKey());
+ bean.setMessageSize(record.getSize());
if (record.getAttachment() != null) {
bean.setAttachment(record.getAttachment());
}
updates.add(bean);
- return queueKey;
}
- public void queueRemoveMessage(AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
+ public void queueRemoveMessage(QueueStore.QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
bean.setMessageKey(messageKey);
- bean.setQueueName(queueName);
+ bean.setQueueName(queue.getQueueName());
updates.add(bean);
}
- public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
- DestinationEntity destination = rootEntity.getDestination(queueName);
+ public Iterator<QueueRecord> queueListMessagesQueue(QueueStore.QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+ DestinationEntity destination = rootEntity.getDestination(queue);
if (destination == null) {
- throw new KeyNotFoundException("queue key: " + queueName);
+ throw new KeyNotFoundException("queue key: " + queue);
}
try {
- return destination.listMessages(tx(), firstQueueKey, max);
+ return destination.listMessages(tx(), firstQueueKey, maxQueueKey, max);
} catch (IOException e) {
throw new FatalStoreException(e);
}
@@ -1012,7 +1078,7 @@
return null;
}
- public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
+ public void transactionRemoveMessage(Buffer txid, QueueStore.QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException {
}
public void transactionRollback(Buffer txid) throws KeyNotFoundException {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Mon Apr 27 18:40:44 2009
@@ -23,31 +23,45 @@
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.util.Marshaller;
public class Marshallers {
-
+
public final static Marshaller<QueueRecord> QUEUE_RECORD_MARSHALLER = new Marshaller<QueueRecord>() {
-
+
public Class<QueueRecord> getType() {
return QueueRecord.class;
}
-
+
public QueueRecord readPayload(DataInput dataIn) throws IOException {
QueueRecord rc = new QueueRecord();
rc.setQueueKey(dataIn.readLong());
rc.setMessageKey(dataIn.readLong());
- if( dataIn.readBoolean() ) {
+ rc.setSize(dataIn.readInt());
+ if (dataIn.readBoolean()) {
+ rc.setTte(dataIn.readLong());
+ }
+ rc.setRedelivered(dataIn.readBoolean());
+ if (dataIn.readBoolean()) {
rc.setAttachment(BUFFER_MARSHALLER.readPayload(dataIn));
}
return rc;
}
-
+
public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
dataOut.writeLong(object.getQueueKey());
dataOut.writeLong(object.getMessageKey());
- if( object.getAttachment()!=null ) {
+ dataOut.writeInt(object.getSize());
+ if (object.getTte() >= 0) {
+ dataOut.writeBoolean(true);
+ dataOut.writeLong(object.getTte());
+ } else {
+ dataOut.writeBoolean(false);
+ }
+ dataOut.writeBoolean(object.isRedelivered());
+ if (object.getAttachment() != null) {
dataOut.writeBoolean(true);
BUFFER_MARSHALLER.writePayload(object.getAttachment(), dataOut);
} else {
@@ -57,58 +71,89 @@
};
public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
-
+
public Class<Location> getType() {
return Location.class;
}
-
+
public Location readPayload(DataInput dataIn) throws IOException {
Location rc = new Location();
rc.setDataFileId(dataIn.readInt());
rc.setOffset(dataIn.readInt());
return rc;
}
-
+
public void writePayload(Location object, DataOutput dataOut) throws IOException {
dataOut.writeInt(object.getDataFileId());
dataOut.writeInt(object.getOffset());
}
};
-
-
+
public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new Marshaller<AsciiBuffer>() {
-
+
public Class<AsciiBuffer> getType() {
return AsciiBuffer.class;
}
-
+
public AsciiBuffer readPayload(DataInput dataIn) throws IOException {
byte data[] = new byte[dataIn.readShort()];
dataIn.readFully(data);
return new AsciiBuffer(data);
}
-
+
public void writePayload(AsciiBuffer object, DataOutput dataOut) throws IOException {
dataOut.writeShort(object.length);
dataOut.write(object.data, object.offset, object.length);
}
};
-
+
public final static Marshaller<Buffer> BUFFER_MARSHALLER = new Marshaller<Buffer>() {
-
+
public Class<Buffer> getType() {
return Buffer.class;
}
-
+
public Buffer readPayload(DataInput dataIn) throws IOException {
byte data[] = new byte[dataIn.readShort()];
dataIn.readFully(data);
return new Buffer(data);
}
-
+
public void writePayload(Buffer object, DataOutput dataOut) throws IOException {
dataOut.writeShort(object.length);
dataOut.write(object.data, object.offset, object.length);
}
};
+
+ public final static Marshaller<QueueStore.QueueDescriptor> QUEUE_DESCRIPTOR_MARSHALLER = new Marshaller<QueueStore.QueueDescriptor>() {
+
+ public Class<QueueStore.QueueDescriptor> getType() {
+ return QueueStore.QueueDescriptor.class;
+ }
+
+ public QueueStore.QueueDescriptor readPayload(DataInput dataIn) throws IOException {
+ QueueStore.QueueDescriptor descriptor = new QueueStore.QueueDescriptor();
+ descriptor.setQueueType(dataIn.readShort());
+ descriptor.setApplicationType(dataIn.readShort());
+ descriptor.setQueueName(ASCII_BUFFER_MARSHALLER.readPayload(dataIn));
+ if (dataIn.readBoolean()) {
+ descriptor.setParent(ASCII_BUFFER_MARSHALLER.readPayload(dataIn));
+ descriptor.setPartitionId(dataIn.readInt());
+ }
+ return descriptor;
+ }
+
+ public void writePayload(QueueStore.QueueDescriptor object, DataOutput dataOut) throws IOException {
+ dataOut.writeShort(object.getQueueType());
+ dataOut.writeShort(object.getApplicationType());
+ ASCII_BUFFER_MARSHALLER.writePayload(object.getQueueName(), dataOut);
+ if (object.getParent() != null) {
+ dataOut.writeBoolean(true);
+ ASCII_BUFFER_MARSHALLER.writePayload(object.getParent(), dataOut);
+ dataOut.writeInt(object.getPartitionKey());
+ } else {
+ dataOut.writeBoolean(false);
+ }
+ }
+ };
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Mon Apr 27 18:40:44 2009
@@ -19,15 +19,18 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
-import java.util.Set;
+import java.util.LinkedList;
import java.util.TreeMap;
import java.util.Map.Entry;
import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.KeyNotFoundException;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
@@ -45,8 +48,9 @@
public RootEntity readPayload(DataInput is) throws IOException {
RootEntity rc = new RootEntity();
rc.state = is.readInt();
+ rc.maxMessageKey = is.readLong();
rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
- //rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
+ // rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
if (is.readBoolean()) {
@@ -59,8 +63,9 @@
public void writePayload(RootEntity object, DataOutput os) throws IOException {
os.writeInt(object.state);
+ os.writeLong(object.maxMessageKey);
os.writeLong(object.messageKeyIndex.getPageId());
- //os.writeLong(object.locationIndex.getPageId());
+ // os.writeLong(object.locationIndex.getPageId());
os.writeLong(object.destinationIndex.getPageId());
os.writeLong(object.messageRefsIndex.getPageId());
if (object.lastUpdate != null) {
@@ -79,12 +84,14 @@
private long pageId;
private int state;
private Location lastUpdate;
+ private boolean loaded;
// Message Indexes
- private long nextMessageKey;
+ private long maxMessageKey;
private BTreeIndex<Long, Location> messageKeyIndex;
- //private BTreeIndex<Location, Long> locationIndex;
- private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref count:
+ // private BTreeIndex<Location, Long> locationIndex;
+ private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref
+ // count:
// The destinations
private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
@@ -103,7 +110,8 @@
state = KahaDBStore.CLOSED_STATE;
messageKeyIndex = new BTreeIndex<Long, Location>(tx.getPageFile(), tx.allocate().getPageId());
- //locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(), tx.allocate().getPageId());
+ // locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(),
+ // tx.allocate().getPageId());
destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
@@ -116,11 +124,17 @@
messageKeyIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
messageKeyIndex.setValueMarshaller(Marshallers.LOCATION_MARSHALLER);
messageKeyIndex.load(tx);
-
- //locationIndex.setPageFile(tx.getPageFile());
- //locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
- //locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- //locationIndex.load(tx);
+ // Update max message key:
+ Entry<Long, Location> last = messageKeyIndex.getLast(tx);
+ if (last != null) {
+ if (last.getKey() > maxMessageKey) {
+ maxMessageKey = last.getKey();
+ }
+ }
+ // locationIndex.setPageFile(tx.getPageFile());
+ // locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
+ // locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+ // locationIndex.load(tx);
destinationIndex.setPageFile(tx.getPageFile());
destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
@@ -137,7 +151,75 @@
for (Iterator<Entry<AsciiBuffer, DestinationEntity>> iterator = destinationIndex.iterator(tx); iterator.hasNext();) {
Entry<AsciiBuffer, DestinationEntity> entry = iterator.next();
entry.getValue().load(tx);
- destinations.put(entry.getKey(), entry.getValue());
+ try {
+ addToDestinationCache(entry.getValue());
+ } catch (KeyNotFoundException e) {
+ //
+ }
+ }
+
+ // Build up the queue partition hierarchy:
+ try {
+ constructQueueHierarchy();
+ } catch (KeyNotFoundException e) {
+ throw new IOException("Inconsistent store", e);
+ }
+ }
+
+ /**
+ * Adds the destination to the destination cache
+ *
+ * @param entity
+ * The destination to cache.
+ * @throws KeyNotFoundException
+ * If the parent queue could not be found.
+ */
+ private void addToDestinationCache(DestinationEntity entity) throws KeyNotFoundException {
+ QueueStore.QueueDescriptor queue = entity.getDescriptor();
+
+ // If loaded add a reference to us from the parent:
+ if (loaded) {
+ if (queue.getParent() != null) {
+ DestinationEntity parent = destinations.get(queue.getParent());
+ if (parent == null) {
+ throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
+ }
+ parent.addPartition(entity);
+ }
+ }
+
+ destinations.put(queue.getQueueName(), entity);
+ }
+
+ private void removeFromDestinationCache(DestinationEntity entity) {
+ QueueStore.QueueDescriptor queue = entity.getDescriptor();
+
+ // If the queue is loaded remove the parent reference:
+ if (loaded) {
+ if (queue.getParent() != null) {
+ DestinationEntity parent = destinations.get(queue.getParent());
+ parent.removePartition(entity);
+ }
+ }
+ destinations.remove(queue.getQueueName());
+ }
+
+ /**
+ * Constructs the mapping of parent queues to child queues.
+ *
+ * @throws KeyNotFoundException
+ */
+ private void constructQueueHierarchy() throws KeyNotFoundException {
+ for (DestinationEntity destination : destinations.values()) {
+ QueueStore.QueueDescriptor queue = destination.getDescriptor();
+ if (queue.getParent() != null) {
+ DestinationEntity parent = destinations.get(queue.getParent());
+ if (parent == null) {
+ throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
+ } else {
+ parent.addPartition(destination);
+ }
+ }
}
}
@@ -150,12 +232,15 @@
// /////////////////////////////////////////////////////////////////
// Message Methods.
// /////////////////////////////////////////////////////////////////
- public Long nextMessageKey() {
- return nextMessageKey++;
+ public long getLastMessageTracking() {
+ return maxMessageKey;
}
public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
long id = command.getMessageKey();
+ if (id > maxMessageKey) {
+ maxMessageKey = id;
+ }
Location previous = messageKeyIndex.put(tx, id, location);
if (previous != null) {
// Message existed.. undo the index update we just did. Chances
@@ -163,13 +248,13 @@
messageKeyIndex.put(tx, id, previous);
}
}
-
+
public void messageRemove(Transaction tx, Long messageKey) throws IOException {
- //Location location = messageKeyIndex.remove(tx, messageKey);
+ // Location location = messageKeyIndex.remove(tx, messageKey);
messageKeyIndex.remove(tx, messageKey);
- //if (location != null) {
- // locationIndex.remove(tx, location);
- //}
+ // if (location != null) {
+ // locationIndex.remove(tx, location);
+ // }
}
public Location messageGetLocation(Transaction tx, Long messageKey) {
@@ -200,7 +285,7 @@
if (refs != null) {
if (refs.longValue() <= 1) {
messageRefsIndex.remove(tx, messageKey);
- //If this is the last record remove, the message
+ // If this is the last record remove, the message
messageRemove(tx, messageKey);
} else {
messageRefsIndex.put(tx, messageKey, new Long(refs.longValue() - 1));
@@ -214,51 +299,74 @@
// /////////////////////////////////////////////////////////////////
// Queue Methods.
// /////////////////////////////////////////////////////////////////
- public void queueAdd(Transaction tx, AsciiBuffer queueName) throws IOException {
- if (destinationIndex.get(tx, queueName) == null) {
+ public void queueAdd(Transaction tx, QueueStore.QueueDescriptor queue) throws IOException {
+ if (destinationIndex.get(tx, queue.getQueueName()) == null) {
DestinationEntity rc = new DestinationEntity();
+ rc.setQueueDescriptor(queue);
rc.allocate(tx);
- destinationIndex.put(tx, queueName, rc);
+ destinationIndex.put(tx, queue.getQueueName(), rc);
rc.load(tx);
- destinations.put(queueName, rc);
+ try {
+ addToDestinationCache(rc);
+ } catch (KeyNotFoundException e) {
+ throw new Store.FatalStoreException("Inconsistent QueueStore: " + e.getMessage(), e);
+ }
}
}
- public void queueRemove(Transaction tx, AsciiBuffer queueName) throws IOException {
- DestinationEntity destination = destinations.get(queueName);
+ public void queueRemove(Transaction tx, QueueStore.QueueDescriptor queue) throws IOException {
+ DestinationEntity destination = destinations.get(queue.getQueueName());
if (destination != null) {
- //Remove the message references.
- //TODO this should probably be optimized.
+ // Remove the message references.
+ // TODO this should probably be optimized.
Iterator<Entry<Long, Long>> messages = destination.listTrackingNums(tx);
- while(messages.hasNext())
- {
+ while (messages.hasNext()) {
Long messageKey = messages.next().getKey();
- removeMessageRef(tx, queueName, messageKey);
+ removeMessageRef(tx, queue.getQueueName(), messageKey);
}
- destinationIndex.remove(tx, queueName);
- destinations.remove(queueName);
+ destinationIndex.remove(tx, queue.getQueueName());
+ removeFromDestinationCache(destination);
destination.deallocate(tx);
}
}
- public DestinationEntity getDestination(AsciiBuffer queueName) {
- return destinations.get(queueName);
+ public DestinationEntity getDestination(QueueStore.QueueDescriptor queue) {
+ return destinations.get(queue.getQueueName());
}
- public Iterator<AsciiBuffer> queueList(Transaction tx, AsciiBuffer firstQueueName, int max) {
- return list(destinations, firstQueueName, max);
- }
+ public Iterator<QueueQueryResult> queueList(Transaction tx, short type, QueueStore.QueueDescriptor firstQueue, int max) throws IOException {
+ LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
+ Collection<DestinationEntity> values = (firstQueue == null ? destinations.values() : destinations.tailMap(firstQueue.getQueueName()).values());
- static private <Key, Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
- ArrayList<Key> rc = new ArrayList<Key>(max);
- Set<Key> keys = (first == null ? map : map.tailMap(first)).keySet();
- for (Key buffer : keys) {
- if (rc.size() >= max) {
+ for (DestinationEntity de : values) {
+ if (results.size() >= max) {
break;
}
- rc.add(buffer);
+
+ if (type == -1 || de.getDescriptor().getApplicationType() == type) {
+ results.add(queryQueue(tx, de));
+ }
}
- return rc.iterator();
+ return results.iterator();
+ }
+
+ private final QueueQueryResult queryQueue(Transaction tx, DestinationEntity de) throws IOException {
+
+ QueueQueryResultImpl result = new QueueQueryResultImpl();
+ result.count = de.getCount(tx);
+ result.size = de.getSize(tx);
+ result.firstSequence = de.getFirstSequence(tx);
+ result.lastSequence = de.getLastSequence(tx);
+ result.desc = de.getDescriptor().copy();
+ Iterator<DestinationEntity> partitions = de.getPartitions();
+ if (partitions != null && partitions.hasNext()) {
+ result.partitions = new LinkedList<QueueQueryResult>();
+ while (partitions.hasNext()) {
+ result.partitions.add(queryQueue(tx, destinations.get(partitions.next().getDescriptor().getQueueName())));
+ }
+ }
+
+ return result;
}
public long getPageId() {
@@ -285,4 +393,37 @@
this.lastUpdate = lastUpdate;
}
+ private static class QueueQueryResultImpl implements QueueQueryResult {
+
+ QueueStore.QueueDescriptor desc;
+ Collection<QueueQueryResult> partitions;
+ long size;
+ int count;
+ long firstSequence;
+ long lastSequence;
+
+ public QueueStore.QueueDescriptor getDescriptor() {
+ return desc;
+ }
+
+ public Collection<QueueQueryResult> getPartitions() {
+ return partitions;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public long getFirstSequence() {
+ return firstSequence;
+ }
+
+ public long getLastSequence() {
+ return lastSequence;
+ }
+ }
}
\ No newline at end of file
|