Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 35160 invoked from network); 7 Jul 2010 03:41:25 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 03:41:25 -0000 Received: (qmail 32136 invoked by uid 500); 7 Jul 2010 03:41:25 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 32112 invoked by uid 500); 7 Jul 2010 03:41:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 32105 invoked by uid 99); 7 Jul 2010 03:41:25 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:41:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:41:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2584F23889ED; Wed, 7 Jul 2010 03:40:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961068 [3/4] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache... 
Date: Wed, 07 Jul 2010 03:40:20 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707034021.2584F23889ED@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961068&view=auto ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (added) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:40:18 2010 @@ -0,0 +1,2000 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.broker; + +import _root_.java.util.{LinkedHashMap, ArrayList, HashMap} +import _root_.org.apache.activemq.broker.store.{Store} +import _root_.org.apache.activemq.Service +import _root_.java.lang.{String} +import _root_.org.apache.activemq.util.buffer.{AsciiBuffer} +import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue} +import _root_.scala.collection.JavaConversions._ +import _root_.scala.reflect.BeanProperty + +import path.{PathFilter} + +object VirtualHost extends Log + +class VirtualHost() extends Service with Logging { + + override protected def log = VirtualHost + + private val queueStore = new BrokerQueueStore() + private val queues = new HashMap[AsciiBuffer, Queue]() + private val durableSubs = new HashMap[String, DurableSubscription]() + private val q:DispatchQueue = ScalaDispatch.createQueue("virtual-host"); + val router = new Router(q) + + private var started = false; + + @BeanProperty + var broker: Broker = null + @BeanProperty + var names:List[String] = Nil; + def setNamesArray( names:ArrayList[String]) = { + this.names = names.toList + } + + @BeanProperty + var database:BrokerDatabase = null + @BeanProperty + var txnManager:TransactionManager = null + + + def start():Unit = { + if (started) { + return; + } + + database.start(); + +// router.setDatabase(database); + + //Recover queues: + queueStore.setDatabase(database); + queueStore.setDispatchQueue(q); + queueStore.loadQueues(); + + // Create Queue instances +// TODO: +// for (IQueue iQueue : queueStore.getSharedQueues()) { +// Queue queue = new Queue(iQueue); +// Domain domain = router.getDomain(Router.QUEUE_DOMAIN); +// Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName()); +// queue.setDestination(dest); +// domain.bind(dest.getName(), queue); +// queues.put(dest.getName(), queue); +// } +// for (Queue queue : queues.values()) { +// queue.start(); +// } + + //Recover transactions: + 
txnManager.loadTransactions(); + started = true; + } + + + + def stop():Unit = { + + if (!started) { + return; + } + +// TODO: +// val tmp = new ArrayList[Queue](queues.values()) +// for (queue <- tmp) { +// queue.shutdown +// } + +// TODO: +// ArrayList> durableQueues = new ArrayList>(queueStore.getDurableQueues()); +// done = new RunnableCountDownLatch(durableQueues.size()); +// for (IQueue queue : durableQueues) { +// queue.shutdown(done); +// } +// done.await(); + + database.stop(); + started = false; + } + + def createQueue(dest:Destination) :Queue = { + if (!started) { + //Queues from the store must be loaded before we can create new ones: + throw new IllegalStateException("Can't create queue on unstarted host"); + } + + val queue = queues.get(dest); +// TODO: +// // If the queue doesn't exist create it: +// if (queue == null) { +// IQueue iQueue = queueStore.createSharedQueue(dest.getName().toString()); +// queue = new Queue(iQueue); +// queue.setDestination(dest); +// Domain domain = router.getDomain(dest.getDomain()); +// domain.bind(dest.getName(), queue); +// queues.put(dest.getName(), queue); +// +// for (QueueLifecyleListener l : queueLifecyleListeners) { +// l.onCreate(queue); +// } +// } +// queue.start(); + queue; + } + + + def createSubscription(consumer:ConsumerContext):BrokerSubscription = { + createSubscription(consumer, consumer.getDestination()); + } + + def createSubscription(consumer:ConsumerContext, destination:Destination):BrokerSubscription = { + + // First handle composite destinations.. + var destinations = destination.getDestinations(); + if (destinations != null) { + var subs :List[BrokerSubscription] = Nil + for (childDest <- destinations) { + subs ::= createSubscription(consumer, childDest); + } + return new CompositeSubscription(destination, subs); + } + + // If it's a Topic... 
+// if ( destination.getDomain == TOPIC_DOMAIN || destination.getDomain == TEMP_TOPIC_DOMAIN ) { +// +// // It might be a durable subscription on the topic +// if (consumer.isDurable()) { +// var dsub = durableSubs.get(consumer.getSubscriptionName()); +// if (dsub == null) { +//// TODO: +//// IQueue queue = queueStore.createDurableQueue(consumer.getSubscriptionName()); +//// queue.start(); +//// dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue); +//// durableSubs.put(consumer.getSubscriptionName(), dsub); +// } +// return dsub; +// } +// +// // return a standard subscription +//// TODO: +//// return new TopicSubscription(this, destination, consumer.getSelectorExpression()); +// return null; +// } + + // It looks like a wild card subscription on a queue.. + if (PathFilter.containsWildCards(destination.getName())) { + return new WildcardQueueSubscription(this, destination, consumer); + } + + // It has to be a Queue subscription then.. + var queue = queues.get(destination.getName()); + if (queue == null) { + if (consumer.autoCreateDestination()) { + queue = createQueue(destination); + } else { + throw new IllegalStateException("The queue does not exist: " + destination.getName()); + } + } +// TODO: +// return new Queue.QueueSubscription(queue); + return null; + } + + + val queueLifecyleListeners = new ArrayList[QueueLifecyleListener](); + + def addDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= { + queueLifecyleListeners.add(listener); + } + + def removeDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= { + queueLifecyleListeners.add(listener); + } +} + +class BrokerDatabase() extends Service { + + @BeanProperty + var store:Store=null; + + @BeanProperty + var virtualHost:VirtualHost=null; + + def start() ={ + //To change body of implemented methods use File | Settings | File Templates. + } + + def stop() = { + //To change body of implemented methods use File | Settings | File Templates. 
+ } + +// TODO: re-implement. +// private static final boolean DEBUG = false; +// +// private final Flow databaseFlow = new Flow("database", false); +// +// private final SizeLimiter> storeLimiter; +// private final FlowController> storeController; +// private final int FLUSH_QUEUE_SIZE = 10000 * 1024; +// +// private DispatchQueue dispatcher; +// private Thread flushThread; +// private AtomicBoolean running = new AtomicBoolean(false); +// private DatabaseListener listener; +// +// private final LinkedNodeList> opQueue; +// private AtomicBoolean notify = new AtomicBoolean(false); +// private Semaphore opsReady = new Semaphore(0); +// private long opSequenceNumber; +// private long flushPointer = -1; // The last seq num for which flush was +// // requested +// private long requestedDelayedFlushPointer = -1; // Set to the last sequence +// // num scheduled for delay +// private long delayedFlushPointer = 0; // The last delayable sequence num +// // requested. +// private long flushDelay = 10; +// +// private final Runnable flushDelayCallback; +// private boolean storeBypass = true; +// +// public interface DatabaseListener { +// /** +// * Called if there is a catastrophic problem with the database. +// * +// * @param ioe +// * The causing exception. +// */ +// public void onDatabaseException(IOException ioe); +// } +// +// public static interface MessageRecordMarshaller { +// MessageRecord marshal(V element); +// +// /** +// * Called when a queue element is recovered from the store for a +// * particular queue. 
+// * +// * @param mRecord +// * The message record +// * @param queue +// * The queue that the element is being restored to (or null +// * if not being restored for a queue) +// * @return +// */ +// V unMarshall(MessageRecord mRecord, QueueDescriptor queue); +// } +// +// public BrokerDatabase(Store store) { +// this.store = store; +// this.opQueue = new LinkedNodeList>(); +// storeLimiter = new SizeLimiter>(FLUSH_QUEUE_SIZE, 0) { +// +// @Override +// public int getElementSize(OperationBase op) { +// return op.getLimiterSize(); +// } +// }; +// +// storeController = new FlowController>(new FlowControllable>() { +// +// public void flowElemAccepted(ISourceController> controller, OperationBase op) { +// addToOpQueue(op); +// } +// +// public IFlowResource getFlowResource() { +// return BrokerDatabase.this; +// } +// +// }, databaseFlow, storeLimiter, opQueue); +// storeController.useOverFlowQueue(false); +// super.onFlowOpened(storeController); +// +// flushDelayCallback = new Runnable() { +// public void run() { +// flushDelayCallback(); +// } +// }; +// } +// +// public synchronized void start() throws Exception { +// if (flushThread == null) { +// +// running.set(true); +// store.start(); +// flushThread = new Thread(new Runnable() { +// +// public void run() { +// processOps(); +// } +// +// }, "StoreThread"); +// flushThread.start(); +// } +// } +// +// public synchronized void stop() throws Exception { +// if (flushThread != null) { +// +// synchronized (opQueue) { +// updateFlushPointer(opSequenceNumber + 1); +// } +// +// running.set(false); +// boolean interrupted = false; +// while (true) { +// opsReady.release(); +// try { +// flushThread.join(); +// break; +// } catch (InterruptedException e) { +// interrupted = true; +// } +// } +// +// store.flush(); +// store.stop(); +// +// if (interrupted) { +// Thread.currentThread().interrupt(); +// } +// flushThread = null; +// } +// } +// +// /** +// * A blocking operation that lists all queues of a given type: 
+// * +// * @param type +// * The queue type +// * @return A list of queues. +// * +// * @throws Exception +// * If there was an error listing the queues. +// */ +// public Iterator listQueues(final short type) throws Exception { +// return store.execute(new Callback, Exception>() { +// +// public Iterator execute(Session session) throws Exception { +// return session.queueListByType(type, null, Integer.MAX_VALUE); +// } +// +// }, null); +// } +// +// /** +// * A blocking operation that lists all entries in the specified map +// * +// * @param map +// * The map to list +// * @return A list of map entries +// * +// * @throws Exception +// * If there was an error listing the queues. +// */ +// public Map listMapEntries(final AsciiBuffer map) throws Exception { +// return store.execute(new Callback, Exception>() { +// +// public Map execute(Session session) throws Exception { +// HashMap ret = new HashMap(); +// try { +// Iterator keys = session.mapEntryListKeys(map, null, -1); +// while (keys.hasNext()) { +// AsciiBuffer key = keys.next(); +// ret.put(key, session.mapEntryGet(map, key)); +// } +// } catch (Store.KeyNotFoundException knfe) { +// //No keys then: +// } +// +// return ret; +// } +// +// }, null); +// } +// +// /** +// * @param map +// * The name of the map to update. +// * @param key +// * The key in the map to update. +// * @param value +// * The value to insert. +// */ +// public OperationContext updateMapEntry(AsciiBuffer map, AsciiBuffer key, Buffer value) { +// return add(new MapUpdateOperation(map, key, value), null, false); +// } +// +// /** +// * Executes user supplied {@link Operation}. If the {@link Operation} does +// * not throw any Exceptions, all updates to the store are committed, +// * otherwise they are rolled back. Any exceptions thrown by the +// * {@link Operation} are propagated by this method. +// * +// * If limiter space on the store processing queue is exceeded, the +// * controller will be blocked. 
+// * +// * If this method is called with flush set to +// * false there is no +// * guarantee made about when the operation will be executed. If flush +// * is true and {@link Operation#isDelayable()} is also +// * true then an attempt will be made to execute the event at +// * the {@link Store}'s configured delay interval. +// * +// * @param op +// * The operation to execute +// * @param flush +// * Whether or not this operation needs immediate processing. +// * @param controller +// * the source of the operation. +// * @return the {@link OperationContext} associated with the operation +// */ +// private OperationContext add(OperationBase op, ISourceController controller, boolean flush) { +// +// op.flushRequested = flush; +// storeController.add(op, controller); +// return op; +// } +// +// private final void addToOpQueue(OperationBase op) { +// if (!running.get()) { +// throw new IllegalStateException("BrokerDatabase not started"); +// } +// +// synchronized (opQueue) { +// op.opSequenceNumber = opSequenceNumber++; +// opQueue.addLast(op); +// if (op.flushRequested || storeLimiter.getThrottled()) { +// if (op.isDelayable() && flushDelay > 0) { +// scheduleDelayedFlush(op.opSequenceNumber); +// } else { +// updateFlushPointer(op.opSequenceNumber); +// } +// } +// } +// } +// +// private void updateFlushPointer(long seqNumber) { +// if (seqNumber > flushPointer) { +// flushPointer = seqNumber; +// OperationBase op = opQueue.getHead(); +// if (op != null && op.opSequenceNumber <= flushPointer && notify.get()) { +// opsReady.release(); +// } +// } +// } +// +// private void scheduleDelayedFlush(long seqNumber) { +// if (seqNumber < flushPointer) { +// return; +// } +// +// if (seqNumber > delayedFlushPointer) { +// delayedFlushPointer = seqNumber; +// } +// +// if (requestedDelayedFlushPointer == -1) { +// requestedDelayedFlushPointer = delayedFlushPointer; +// Dispatch.getGlobalQueue().dispatchAfter(flushDelay, TimeUnit.MILLISECONDS, flushDelayCallback); +// } +// 
+// } +// +// private final void flushDelayCallback() { +// synchronized (opQueue) { +// if (flushPointer < requestedDelayedFlushPointer) { +// updateFlushPointer(requestedDelayedFlushPointer); +// +// } +// +// // If another delayed flush has been scheduled schedule it: +// requestedDelayedFlushPointer = -1; +// // Schedule next delay if needed: +// if (delayedFlushPointer > flushPointer) { +// scheduleDelayedFlush(delayedFlushPointer); +// } else { +// delayedFlushPointer = -1; +// } +// +// } +// } +// +// private final OperationBase getNextOp(boolean wait) { +// if (!wait) { +// synchronized (opQueue) { +// OperationBase op = opQueue.getHead(); +// if (op != null && (op.opSequenceNumber <= flushPointer || !op.isDelayable())) { +// op.unlink(); +// return op; +// } +// } +// return null; +// } else { +// OperationBase op = getNextOp(false); +// if (op == null) { +// notify.set(true); +// op = getNextOp(false); +// try { +// while (running.get() && op == null) { +// opsReady.acquireUninterruptibly(); +// op = getNextOp(false); +// } +// } finally { +// notify.set(false); +// opsReady.drainPermits(); +// } +// } +// return op; +// } +// } +// +// private final void processOps() { +// int count = 0; +// Session session = store.getSession(); +// while (running.get()) { +// final OperationBase firstOp = getNextOp(true); +// if (firstOp == null) { +// continue; +// } +// count = 0; +// +// // The first operation we get, triggers a store transaction. 
+// if (firstOp != null) { +// final LinkedList> processedQueue = new LinkedList>(); +// boolean locked = false; +// try { +// +// Operation op = firstOp; +// while (op != null) { +// final Operation toExec = op; +// if (toExec.beginExecute()) { +// if (!locked) { +// session.acquireLock(); +// locked = true; +// } +// count++; +// op.execute(session); +// processedQueue.add(op); +// /* +// * store.execute(new Store.VoidCallback() +// * { +// * +// * @Override public void run(Session session) throws +// * Exception { +// * +// * // Try to execute the operation against the // +// * session... try { toExec.execute(session); +// * processedQueue.add(toExec); } catch +// * (CancellationException ignore) { // +// * System.out.println("Cancelled" + // toExec); } } +// * }, null); +// */ +// } +// +// if (count < 1000) { +// op = getNextOp(false); +// } else { +// op = null; +// } +// } +// // executeOps(firstOp, processedQueue, counter); +// +// // If we procecessed some ops, flush and post process: +// if (!processedQueue.isEmpty()) { +// +// if (locked) { +// session.commit(); +// session.releaseLock(); +// locked = false; +// } +// if (DEBUG) +// System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue); +// // Sync the store: +// store.flush(); +// +// // Post process operations +// long release = 0; +// for (Operation processed : processedQueue) { +// processed.onCommit(); +// // System.out.println("Processed" + processed); +// release += processed.getLimiterSize(); +// } +// +// synchronized (opQueue) { +// this.storeLimiter.remove(1, release); +// } +// } +// +// } catch (IOException e) { +// for (Operation processed : processedQueue) { +// processed.onRollback(e); +// } +// onDatabaseException(e); +// } catch (RuntimeException e) { +// for (Operation processed : processedQueue) { +// processed.onRollback(e); +// } +// IOException ioe = new IOException(e.getMessage()); +// ioe.initCause(e); +// 
onDatabaseException(ioe); +// } catch (Exception e) { +// for (Operation processed : processedQueue) { +// processed.onRollback(e); +// } +// IOException ioe = new IOException(e.getMessage()); +// ioe.initCause(e); +// onDatabaseException(ioe); +// } finally { +// if (locked) { +// try { +// session.releaseLock(); +// } catch (Exception e) { +// IOException ioe = new IOException(e.getMessage()); +// ioe.initCause(e); +// onDatabaseException(ioe); +// } +// } +// } +// } +// } +// } +// +// /* +// * private final void executeOps(final OperationBase op, final +// * LinkedList processedQueue, final OpCounter counter) throws +// * FatalStoreException, Exception { store.execute(new +// * Store.VoidCallback() { +// * +// * @Override public void run(Session session) throws Exception { +// * +// * // Try to execute the operation against the // session... try { if +// * (op.execute(session)) { processedQueue.add(op); } else { counter.count--; +// * } } catch (CancellationException ignore) { System.out.println("Cancelled" +// * + op); } +// * +// * // See if we can batch up some additional operations // in this +// * transaction. if (counter.count < 100) { OperationBase next = +// * getNextOp(false); if (next != null) { counter.count++; executeOps(next, +// * processedQueue, counter); } } } }, null); } +// */ +// +// /** +// * Adds a queue to the database +// * +// * @param queue +// * The queue to add. +// */ +// public void addQueue(QueueDescriptor queue) { +// add(new QueueAddOperation(queue), null, false); +// } +// +// /** +// * Deletes a queue and all of its messages from the database +// * +// * @param queue +// * The queue to delete. +// */ +// public void deleteQueue(QueueDescriptor queue) { +// add(new QueueDeleteOperation(queue), null, false); +// } +// +// /** +// * Saves a message for all of the recipients in the +// * {@link BrokerMessageDelivery}. +// * +// * @param delivery +// * The delivery. +// * @param source +// * The source's controller. 
+// * @throws IOException +// * If there is an error marshalling the message. +// * @return The {@link OperationContext} associated with the operation +// */ +// public OperationContext persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController source) { +// return add(new AddMessageOperation(delivery), source, true); +// } +// +// /** +// * Saves a Message for a single queue. +// * +// * @param queueElement +// * The element to save. +// * @param source +// * The source initiating the save or null, if there isn't one. +// * @throws IOException +// * If there is an error marshalling the message. +// * +// * @return The {@link OperationContext} associated with the operation +// */ +// public OperationContext saveMessage(SaveableQueueElement queueElement, ISourceController source, boolean delayable) { +// return add(new AddMessageOperation(queueElement), source, !delayable); +// } +// +// /** +// * Deletes the given message from the store for the given queue. +// * +// * @param queueElement +// * @return The {@link OperationContext} associated with the operation +// */ +// public OperationContext deleteQueueElement(SaveableQueueElement queueElement) { +// return add(new DeleteOperation(queueElement.getSequenceNumber(), queueElement.getQueueDescriptor()), null, false); +// } +// +// /** +// * Loads a batch of messages for the specified queue. The loaded messages +// * are given the provided {@link RestoreListener}. +// *

+// * NOTE: This method uses the queue sequence number for the +// * message not the store tracking number. +// * +// * @param queue +// * The queue for which to load messages +// * @param recordsOnly +// * True if message body shouldn't be restored +// * @param first +// * The first queue sequence number to load (-1 starts at +// * begining) +// * @param maxSequence +// * The maximum sequence number to load (-1 if no limit) +// * @param maxCount +// * The maximum number of messages to load (-1 if no limit) +// * @param listener +// * The listener to which messags should be passed. +// * @return The {@link OperationContext} associated with the operation +// */ +// public OperationContext restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener listener, +// MessageRecordMarshaller marshaller) { +// return add(new RestoreElementsOperation(queue, recordsOnly, first, maxCount, maxSequence, listener, marshaller), null, true); +// } +// +// private void onDatabaseException(IOException ioe) { +// if (listener != null) { +// listener.onDatabaseException(ioe); +// } else { +// ioe.printStackTrace(); +// } +// } +// +// public interface OperationContext extends ListenableFuture { +// +// /** +// * Attempts to cancel the store operation. Returns true if the operation +// * could be canceled or false if the operation was already executed by +// * the store. +// * +// * @return true if the operation could be canceled +// */ +// public boolean cancel(); +// +// /** +// * Requests flush for this database operation (overriding a previous +// * delay) +// */ +// public void requestFlush(); +// } +// +// /** +// * This interface is used to execute transacted code. +// * +// * It is used by the {@link Store#execute(org.apache.activemq.broker.store.Store.Callback, Runnable)} method, often as +// * anonymous class. 
+// */ +// public interface Operation extends OperationContext { +// +// /** +// * Called when the saver is about to execute the operation. If true is +// * returned the operation can no longer be canceled. +// * +// * @return false if the operation has been canceled. +// */ +// public boolean beginExecute(); +// +// /** +// * Gets called by the {@link Store} +// * within a transactional context. If any exception is thrown including +// * Runtime exception, the transaction is rolled back. +// * +// * @param session +// * provides you access to read and update the persistent +// * data. +// * @throws Exception +// * if an system error occured while executing the +// * operations. +// * @throws RuntimeException +// * if an system error occured while executing the +// * operations. +// */ +// public void execute(Session session) throws CancellationException, Exception, RuntimeException; +// +// /** +// * Returns true if this operation can be delayed. This is useful in +// * cases where external events can negate the need to execute the +// * operation. The delay interval is not guaranteed to be honored, if +// * subsequent events or other store flush policy/criteria requires a +// * flush of subsequent events. +// * +// * @return True if the operation can be delayed. +// */ +// public boolean isDelayable(); +// +// /** +// * Returns the size to be used when calculating how much space this +// * operation takes on the store processing queue. +// * +// * @return The limiter size to be used. +// */ +// public int getLimiterSize(); +// +// /** +// * Called after {@link #execute(Session)} is called and the the +// * operation has been committed. +// */ +// public void onCommit(); +// +// /** +// * Called after {@link #execute(Session)} is called and the the +// * operation has been rolled back. +// */ +// public void onRollback(Throwable error); +// } +// +// /** +// * This is a convenience base class that can be used to implement +// * Operations. 
It handles operation cancellation for you. +// */ +// abstract class OperationBase extends LinkedNode> implements Operation { +// public boolean flushRequested = false; +// public long opSequenceNumber = -1; +// +// final protected AtomicBoolean executePending = new AtomicBoolean(true); +// final protected AtomicBoolean cancelled = new AtomicBoolean(false); +// final protected AtomicBoolean executed = new AtomicBoolean(false); +// final protected AtomicReference> listener = new AtomicReference>(); +// +// protected Throwable error; +// +// public static final int BASE_MEM_SIZE = 20; +// +// public boolean cancel(boolean interrupt) { +// return cancel(); +// } +// +// public boolean cancel() { +// if (storeBypass) { +// if (executePending.compareAndSet(true, false)) { +// cancelled.set(true); +// // System.out.println("Cancelled: " + this); +// synchronized (opQueue) { +// unlink(); +// storeController.elementDispatched(this); +// } +// fireListener(); +// return true; +// } +// } +// return cancelled.get(); +// } +// +// public final boolean isCancelled() { +// return cancelled.get(); +// } +// +// public final boolean isExecuted() { +// return executed.get(); +// } +// +// public final boolean isDone() { +// return isCancelled() || isExecuted(); +// } +// +// /** +// * Called when the saver is about to execute the operation. If true is +// * returned the operation can no longer be cancelled. +// * +// * @return true if operation should be executed +// */ +// public final boolean beginExecute() { +// if (executePending.compareAndSet(true, false)) { +// return true; +// } else { +// return false; +// } +// } +// +// /** +// * Gets called by the +// * {@link Store} method +// * within a transactional context. If any exception is thrown including +// * Runtime exception, the transaction is rolled back. +// * +// * @param session +// * provides you access to read and update the persistent +// * data. 
+// * @throws Exception +// * if an system error occured while executing the +// * operations. +// * @throws RuntimeException +// * if an system error occured while executing the +// * operations. +// */ +// public void execute(Session session) throws Exception, RuntimeException { +// if (DEBUG) +// System.out.println("Executing " + this); +// doExcecute(session); +// } +// +// abstract protected void doExcecute(Session session); +// +// public int getLimiterSize() { +// return BASE_MEM_SIZE; +// } +// +// public boolean isDelayable() { +// return false; +// } +// +// /** +// * Requests flush for this database operation (overriding a previous +// * delay) +// */ +// public void requestFlush() { +// synchronized (opQueue) { +// updateFlushPointer(opSequenceNumber); +// } +// } +// +// public void onCommit() { +// executed.set(true); +// fireListener(); +// } +// +// /** +// * Called after {@link #execute(Session)} is called and the the +// * operation has been rolled back. +// */ +// public void onRollback(Throwable error) { +// executed.set(true); +// if (!fireListener()) { +// error.printStackTrace(); +// } +// } +// +// private final boolean fireListener() { +// FutureListener l = this.listener.getAndSet(null); +// if (l != null) { +// l.onFutureComplete(this); +// return true; +// } +// return false; +// } +// +// public void setFutureListener(FutureListener listener) { +// this.listener.set(listener); +// if (isDone()) { +// fireListener(); +// } +// } +// +// /** +// * Subclasses the return a result should override this +// * @return The result. +// */ +// protected final V getResult() { +// return null; +// } +// +// /** +// * Waits if necessary for the computation to complete, and then +// * retrieves its result. 
+// * +// * @return the computed result +// * @throws CancellationException +// * if the computation was cancelled +// * @throws ExecutionException +// * if the computation threw an exception +// * @throws InterruptedException +// * if the current thread was interrupted while waiting +// */ +// public final V get() throws ExecutionException, InterruptedException { +// +// try { +// return get(-1, TimeUnit.MILLISECONDS); +// } catch (TimeoutException e) { +// //Can't happen. +// throw new AssertionError(e); +// } +// } +// +// /** +// * Waits if necessary for at most the given time for the computation +// * to complete, and then retrieves its result, if available. +// * +// * @param timeout the maximum time to wait +// * @param tu the time unit of the timeout argument +// * @return the computed result +// * @throws CancellationException if the computation was cancelled +// * @throws ExecutionException if the computation threw an +// * exception +// * @throws InterruptedException if the current thread was interrupted +// * while waiting +// * @throws TimeoutException if the wait timed out +// */ +// public final V get(long timeout, TimeUnit tu) throws ExecutionException, InterruptedException, TimeoutException { +// if (isCancelled()) { +// throw new CancellationException(); +// } +// if (error != null) { +// throw new ExecutionException(error); +// } +// +// //TODO implement blocking? 
+// if(!isDone()) +// { +// throw new UnsupportedOperationException("Blocking result retrieval not yet implemented"); +// } +// +// return getResult(); +// } +// +// public String toString() { +// return "DBOp seq: " + opSequenceNumber + "P/C/E: " + executePending.get() + "/" + isCancelled() + "/" + isExecuted(); +// } +// } +// +// private class QueueAddOperation extends OperationBase { +// +// private QueueDescriptor qd; +// +// QueueAddOperation(QueueDescriptor queue) { +// qd = queue; +// } +// +// @Override +// protected void doExcecute(Session session) { +// try { +// session.queueAdd(qd); +// } catch (KeyNotFoundException e) { +// throw new FatalStoreException(e); +// } +// } +// +// public String toString() { +// return "QueueAdd: " + qd.getQueueName().toString(); +// } +// } +// +// private class QueueDeleteOperation extends OperationBase { +// +// private QueueDescriptor qd; +// +// QueueDeleteOperation(QueueDescriptor queue) { +// qd = queue; +// } +// +// @Override +// protected void doExcecute(Session session) { +// session.queueRemove(qd); +// } +// +// public String toString() { +// return "QueueDelete: " + qd.getQueueName().toString(); +// } +// } +// +// private class DeleteOperation extends OperationBase { +// private final long queueKey; +// private QueueDescriptor queue; +// +// public DeleteOperation(long queueKey, QueueDescriptor queue) { +// this.queueKey = queueKey; +// this.queue = queue; +// } +// +// @Override +// public int getLimiterSize() { +// return BASE_MEM_SIZE + 8; +// } +// +// @Override +// protected void doExcecute(Session session) { +// try { +// session.queueRemoveMessage(queue, queueKey); +// } catch (KeyNotFoundException e) { +// // TODO Probably doesn't always mean an error, it is possible +// // that +// // the queue has been deleted, in which case its messages will +// // have been deleted, too. 
+// e.printStackTrace(); +// } +// } +// +// public String toString() { +// return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + queueKey + " " + super.toString(); +// } +// } +// +// private class RestoreElementsOperation extends OperationBase { +// private QueueDescriptor queue; +// private long firstKey; +// private int maxRecords; +// private long maxSequence; +// private boolean recordsOnly; +// private RestoreListener listener; +// private Collection> msgs = null; +// private MessageRecordMarshaller marshaller; +// +// RestoreElementsOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener listener, MessageRecordMarshaller marshaller) { +// this.queue = queue; +// this.recordsOnly = recordsOnly; +// this.firstKey = firstKey; +// this.maxRecords = maxRecords; +// this.maxSequence = maxSequence; +// this.listener = listener; +// this.marshaller = marshaller; +// } +// +// @Override +// public int getLimiterSize() { +// return BASE_MEM_SIZE + 44; +// } +// +// @Override +// protected void doExcecute(Session session) { +// +// Iterator records = null; +// try { +// records = session.queueListMessagesQueue(queue, firstKey, maxSequence, maxRecords); +// msgs = new LinkedList>(); +// } catch (KeyNotFoundException e) { +// msgs = new ArrayList>(0); +// return; +// } +// +// QueueRecord qRecord = null; +// int count = 0; +// if (records.hasNext()) { +// qRecord = records.next(); +// } +// +// while (qRecord != null) { +// RestoredElementImpl rm = new RestoredElementImpl(); +// // TODO should update jms redelivery here. 
+// rm.qRecord = qRecord; +// rm.queue = queue; +// count++; +// +// // Set the next sequence number: +// if (records.hasNext()) { +// qRecord = records.next(); +// rm.nextSequence = qRecord.getQueueKey(); +// } else { +// // Look up the next sequence number: +// try { +// records = session.queueListMessagesQueue(queue, qRecord.getQueueKey() + 1, -1L, 1); +// if (!records.hasNext()) { +// rm.nextSequence = -1; +// } else { +// rm.nextSequence = records.next().getQueueKey(); +// } +// } catch (KeyNotFoundException e) { +// rm.nextSequence = -1; +// } +// qRecord = null; +// } +// +// if (!recordsOnly) { +// try { +// rm.mRecord = session.messageGetRecord(rm.qRecord.getMessageKey()); +// rm.marshaller = marshaller; +// msgs.add(rm); +// } catch (KeyNotFoundException shouldNotHappen) { +// shouldNotHappen.printStackTrace(); +// } +// } else { +// msgs.add(rm); +// } +// } +// +// if (DEBUG) +// System.out.println("Restored: " + count + " messages"); +// } +// +// @Override +// public void onCommit() { +// listener.elementsRestored(msgs); +// super.onCommit(); +// } +// +// public String toString() { +// return "MessageRestore: " + queue.getQueueName().toString() + " first: " + firstKey + " max: " + maxRecords; +// } +// } +// +// private class AddMessageOperation extends OperationBase { +// +// private final BrokerMessageDelivery brokerDelivery; +// private final SaveableQueueElement singleElement; +// private final MessageDelivery delivery; +// private MessageRecord record; +// private LinkedList> notifyTargets; +// private final boolean delayable; +// +// public AddMessageOperation(BrokerMessageDelivery delivery) { +// this.brokerDelivery = delivery; +// this.singleElement = null; +// this.delivery = delivery; +// this.delayable = delivery.isFlushDelayable(); +// if (!delayable) { +// this.record = delivery.createMessageRecord(); +// } +// } +// +// public AddMessageOperation(SaveableQueueElement queueElement) { +// this.brokerDelivery = null; +// singleElement = 
queueElement; +// delivery = queueElement.getElement(); +// this.record = singleElement.getElement().createMessageRecord(); +// delayable = false; +// } +// +// public boolean isDelayable() { +// return delayable; +// } +// +// @Override +// public int getLimiterSize() { +// return delivery.getFlowLimiterSize() + BASE_MEM_SIZE + 40; +// } +// +// @Override +// protected void doExcecute(Session session) { +// +// if (singleElement == null) { +// brokerDelivery.beginStore(); +// Collection> targets = brokerDelivery.getPersistentQueues(); +// +// if (targets != null && !targets.isEmpty()) { +// if (record == null) { +// record = brokerDelivery.createMessageRecord(); +// if (record == null) { +// throw new RuntimeException("Error creating message record for " + brokerDelivery.getMsgId()); +// } +// } +// record.setKey(brokerDelivery.getStoreTracking()); +// session.messageAdd(record); +// +// for (SaveableQueueElement target : targets) { +// try { +// QueueRecord queueRecord = new QueueRecord(); +// queueRecord.setAttachment(null); +// queueRecord.setMessageKey(record.getKey()); +// queueRecord.setSize(brokerDelivery.getFlowLimiterSize()); +// queueRecord.setQueueKey(target.getSequenceNumber()); +// session.queueAddMessage(target.getQueueDescriptor(), queueRecord); +// +// } catch (KeyNotFoundException e) { +// e.printStackTrace(); +// } +// +// if (target.requestSaveNotify()) { +// if (notifyTargets == null) { +// notifyTargets = new LinkedList>(); +// } +// notifyTargets.add(target); +// } +// } +// } else { +// // Save with no targets must have been cancelled: +// // System.out.println("Skipping save for " + +// // delivery.getStoreTracking()); +// } +// } else { +// +// session.messageAdd(record); +// try { +// QueueRecord queueRecord = new QueueRecord(); +// queueRecord.setAttachment(null); +// queueRecord.setMessageKey(record.getKey()); +// queueRecord.setSize(brokerDelivery.getFlowLimiterSize()); +// queueRecord.setQueueKey(singleElement.getSequenceNumber()); 
+// session.queueAddMessage(singleElement.getQueueDescriptor(), queueRecord); +// } catch (KeyNotFoundException e) { +// e.printStackTrace(); +// } +// } +// } +// +// @Override +// public void onCommit() { +// +// // Notify that the message was persisted. +// delivery.onMessagePersisted(); +// +// // Notify any of the targets that requested notify on save: +// if (singleElement != null && singleElement.requestSaveNotify()) { +// singleElement.notifySave(); +// } else if (notifyTargets != null) { +// for (SaveableQueueElement notify : notifyTargets) { +// notify.notifySave(); +// } +// } +// +// super.onCommit(); +// } +// +// public String toString() { +// return "AddOperation " + delivery.getStoreTracking() + super.toString(); +// } +// } +// +// private class MapUpdateOperation extends OperationBase { +// final AsciiBuffer map; +// final AsciiBuffer key; +// final Buffer value; +// +// MapUpdateOperation(AsciiBuffer mapName, AsciiBuffer key, Buffer value) { +// this.map = mapName; +// this.key = key; +// this.value = value; +// } +// +// @Override +// public int getLimiterSize() { +// return BASE_MEM_SIZE + map.length + key.length + value.length; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.apollo.broker.BrokerDatabase.OperationBase#doExcecute +// * (org.apache.activemq.broker.store.Store.Session) +// */ +// @Override +// protected void doExcecute(Session session) { +// try { +// session.mapEntryPut(map, key, value); +// } catch (KeyNotFoundException e) { +// throw new Store.FatalStoreException(e); +// } +// } +// } +// +// private class RestoredElementImpl implements RestoredElement { +// QueueRecord qRecord; +// QueueDescriptor queue; +// MessageRecord mRecord; +// MessageRecordMarshaller marshaller; +// long nextSequence; +// +// public T getElement() throws IOException { +// if (mRecord == null) { +// return null; +// } +// return marshaller.unMarshall(mRecord, queue); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see 
+// * org.apache.activemq.queue.QueueStore.RestoredElement#getSequenceNumber +// * () +// */ +// public long getSequenceNumber() { +// return qRecord.getQueueKey(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore.RestoredElement#getStoreTracking +// * () +// */ +// public long getStoreTracking() { +// return qRecord.getMessageKey(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @seeorg.apache.activemq.queue.QueueStore.RestoredElement# +// * getNextSequenceNumber() +// */ +// public long getNextSequenceNumber() { +// return nextSequence; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore.RestoredElement#getElementSize() +// */ +// public int getElementSize() { +// return qRecord.getSize(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore.RestoredElement#getExpiration() +// */ +// public long getExpiration() { +// return qRecord.getTte(); +// } +// } +// +// public long allocateStoreTracking() { +// return store.allocateStoreTracking(); +// } +// +// public void setDispatchQueue(DispatchQueue queue) { +// this.dispatcher = queue; +// } +// +// +// /** +// * @param sqe +// * @param source +// * @param delayable +// */ +// public OperationContext saveQeueuElement(SaveableQueueElement sqe, ISourceController source, boolean delayable, MessageRecordMarshaller marshaller) { +// return add(new AddElementOperation(sqe, delayable, marshaller), source, !delayable); +// } +// +// private class AddElementOperation extends OperationBase { +// +// private final SaveableQueueElement op; +// private MessageRecord record; +// private boolean delayable; +// private final MessageRecordMarshaller marshaller; +// +// public AddElementOperation(SaveableQueueElement op, boolean delayable, MessageRecordMarshaller marshaller) { +// this.op = op; +// this.delayable = delayable; +// if (!delayable) { +// record = marshaller.marshal(op.getElement()); 
+// this.marshaller = null; +// } else { +// this.marshaller = marshaller; +// } +// } +// +// public boolean isDelayable() { +// return delayable; +// } +// +// @Override +// public int getLimiterSize() { +// return op.getLimiterSize() + BASE_MEM_SIZE + 32; +// } +// +// @Override +// protected void doExcecute(Session session) { +// +// if (record == null) { +// record = marshaller.marshal(op.getElement()); +// } +// +// session.messageAdd(record); +// try { +// QueueRecord queueRecord = new QueueRecord(); +// queueRecord.setAttachment(null); +// queueRecord.setMessageKey(record.getKey()); +// queueRecord.setSize(record.getSize()); +// queueRecord.setQueueKey(op.getSequenceNumber()); +// session.queueAddMessage(op.getQueueDescriptor(), queueRecord); +// } catch (KeyNotFoundException e) { +// e.printStackTrace(); +// } +// } +// +// public String toString() { +// return "AddTxOpOperation " + record.getKey() + super.toString(); +// } +// } +// +// public long getFlushDelay() { +// return flushDelay; +// } +// +// public void setFlushDelay(long flushDelay) { +// this.flushDelay = flushDelay; +// } +// +// /** +// * @return true if operations are allowed to bypass the store. +// */ +// public boolean isStoreBypass() { +// return storeBypass; +// } +// +// /** +// * Sets if persistent operations should be allowed to bypass the store. +// * Defaults to true, as this will give you the best performance. In some +// * cases, you want to disable this as the store being used will double as an +// * audit log and you do not want any persistent operations to bypass the +// * store. +// * +// * When store bypass is disabled, all {@link Operation#cancel()} requests +// * will return false. 
+// * +// * @param enable +// * if true will enable store bypass +// */ +// public void setStoreBypass(boolean enable) { +// this.storeBypass = enable; +// } + +} + + + +class UserAlreadyConnectedException extends Exception + + +class BrokerQueueStore { // implements QueueStore { +// TODO: +// private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class); +// private static final boolean USE_OLD_QUEUE = false; +// private static final boolean USE_PRIORITY_QUEUES = true; +// +// private BrokerDatabase database; +// private DispatchQueue dispatchQueue; +// +// private static HashMap protocolHandlers = new HashMap(); +// private static final BrokerDatabase.MessageRecordMarshaller MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller() { +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller +// * #marshal(java.lang.Object) +// */ +// public MessageRecord marshal(MessageDelivery element) { +// return element.createMessageRecord(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller +// * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord) +// */ +// public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) { +// ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString()); +// if (handler == null) { +// try { +// handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString()); +// protocolHandlers.put(record.getEncoding().toString(), handler); +// } catch (Throwable thrown) { +// throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown); +// } +// } +// try { +// return handler.createMessageDelivery(record); +// } catch (IOException ioe) { +// throw new RuntimeException(ioe); +// } +// } +// }; +// +// final BrokerDatabase.MessageRecordMarshaller getMessageMarshaller() { +// return 
MESSAGE_MARSHALLER; +// } +// +// private static final Mapper EXPIRATION_MAPPER = new Mapper() { +// public Long map(MessageDelivery element) { +// return element.getExpiration(); +// } +// }; +// +// private static final Mapper SIZE_MAPPER = new Mapper() { +// public Integer map(MessageDelivery element) { +// return element.getFlowLimiterSize(); +// } +// }; +// +// public static final Mapper PRIORITY_MAPPER = new Mapper() { +// public Integer map(MessageDelivery element) { +// return element.getPriority(); +// } +// }; +// +// static public final Mapper KEY_MAPPER = new Mapper() { +// public Long map(MessageDelivery element) { +// return element.getStoreTracking(); +// } +// }; +// +// static public final Mapper PARTITION_MAPPER = new Mapper() { +// public Integer map(MessageDelivery element) { +// // we modulo 10 to have at most 10 partitions which the producers +// // get split across. +// return (int) (element.getProducerId().hashCode() % 10); +// } +// }; +// +// public static final short SUBPARTITION_TYPE = 0; +// public static final short SHARED_QUEUE_TYPE = 1; +// public static final short DURABLE_QUEUE_TYPE = 2; +// public static short TRANSACTION_QUEUE_TYPE = 3; +// +// private final HashMap> sharedQueues = new HashMap>(); +// private final HashMap> durableQueues = new HashMap>(); +// +// private Mapper partitionMapper; +// +// private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1; +// private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1; +// // By default we don't page out elements to disk. 
+// private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD; +// //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10; +// +// private static long dynamicQueueCounter = 0; +// +// private static final PersistencePolicy SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy() { +// +// private static final boolean PAGING_ENABLED = DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD; +// +// public boolean isPersistent(MessageDelivery elem) { +// return elem.isPersistent(); +// } +// +// public boolean isPageOutPlaceHolders() { +// return true; +// } +// +// public boolean isPagingEnabled() { +// return PAGING_ENABLED; +// } +// +// public int getPagingInMemorySize() { +// return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD; +// } +// +// public boolean isThrottleSourcesToMemoryLimit() { +// // Keep the queue in memory. +// return true; +// } +// +// public int getDisconnectedThrottleRate() { +// // By default don't throttle consumers when disconnected. +// return 0; +// } +// +// public int getRecoveryBias() { +// return 8; +// } +// }; +// +// private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1; +// private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1; +// // By default we don't page out elements to disk. 
+// //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD; +// private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10; +// +// private static final PersistencePolicy DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy() { +// +// private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD; +// +// public boolean isPersistent(MessageDelivery elem) { +// return elem.isPersistent(); +// } +// +// public boolean isPageOutPlaceHolders() { +// return true; +// } +// +// public boolean isPagingEnabled() { +// return PAGING_ENABLED; +// } +// +// public int getPagingInMemorySize() { +// return DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD; +// } +// +// public boolean isThrottleSourcesToMemoryLimit() { +// // Keep the queue in memory. +// return true; +// } +// +// public int getDisconnectedThrottleRate() { +// // By default don't throttle consumers when disconnected. +// return 0; +// } +// +// public int getRecoveryBias() { +// return 8; +// } +// }; +// +// public void setDatabase(BrokerDatabase database) { +// this.database = database; +// } +// +// public void setDispatchQueue(DispatchQueue dispatchQueue) { +// this.dispatchQueue = dispatchQueue; +// } +// +// public void loadQueues() throws Exception { +// +// // Load shared queues +// Iterator results = database.listQueues(SHARED_QUEUE_TYPE); +// while (results.hasNext()) { +// QueueQueryResult loaded = results.next(); +// IQueue queue = createRestoredQueue(null, loaded); +// sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue); +// LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize()); +// } +// +// // Load durable queues +// results = database.listQueues(DURABLE_QUEUE_TYPE); +// while (results.hasNext()) { +// QueueQueryResult loaded = results.next(); +// IQueue queue = createRestoredDurableQueue(loaded); +// 
durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue); +// LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize()); +// +// } +// } +// +// private IQueue createRestoredQueue(IPartitionedQueue parent, QueueQueryResult loaded) throws IOException { +// +// IQueue queue; +// if (parent != null) { +// queue = parent.createPartition(loaded.getDescriptor().getPartitionKey()); +// } else { +// queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType()); +// } +// +// queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize()); +// +// // Creat the child queues +// Collection children = loaded.getPartitions(); +// if (children != null) { +// try { +// IPartitionedQueue pQueue = (IPartitionedQueue) queue; +// for (QueueQueryResult child : children) { +// createRestoredQueue(pQueue, child); +// } +// } catch (ClassCastException cce) { +// LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName()); +// throw cce; +// } +// } +// +// return queue; +// +// } +// +// private IQueue createRestoredDurableQueue(QueueQueryResult loaded) throws IOException { +// +// ExclusivePersistentQueue queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType()); +// queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize()); +// +// //TODO implement this for priority queue: +// // Create the child queues +// /* +// * Collection children = loaded.getPartitions(); if +// * (children != null) { try { IPartitionedQueue +// * pQueue = (IPartitionedQueue) queue; for +// * (QueueQueryResult child : children) { createRestoredQueue(pQueue, +// * child); } } catch (ClassCastException cce) { +// * LOG.error("Loaded partition for unpartitionable queue: " + +// * 
queue.getResourceName()); throw cce; } } +// */ +// +// return queue; +// +// } +// +// public IQueue getQueue(AsciiBuffer queueName) { +// //TODO +// return null; +// } +// +// public Collection> getSharedQueues() { +// synchronized (this) { +// Collection> c = sharedQueues.values(); +// ArrayList> ret = new ArrayList>(c.size()); +// ret.addAll(c); +// return ret; +// } +// } +// +// public IQueue createDurableQueue(String name) { +// IQueue queue = null; +// synchronized (this) { +// queue = durableQueues.get(name); +// if (queue == null) { +// queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED); +// queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE); +// queue.initialize(0, 0, 0, 0); +// durableQueues.put(name, queue); +// addQueue(queue.getDescriptor()); +// } +// } +// +// return queue; +// } +// +// public ExclusivePersistentQueue createExclusivePersistentQueue() { +// ExclusivePersistentQueue queue = null; +// synchronized (this) { +// String name = "temp:" + (dynamicQueueCounter++); +// queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED); +// queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE); +// queue.initialize(0, 0, 0, 0); +// addQueue(queue.getDescriptor()); +// } +// return queue; +// } +// +// public Collection> getDurableQueues() { +// synchronized (this) { +// Collection> c = durableQueues.values(); +// ArrayList> ret = new ArrayList>(c.size()); +// ret.addAll(c); +// return ret; +// } +// } +// +// public IQueue createSharedQueue(String name) { +// +// IQueue queue = null; +// synchronized (this) { +// queue = sharedQueues.get(name); +// if (queue == null) { +// queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? 
QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED); +// queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE); +// queue.initialize(0, 0, 0, 0); +// sharedQueues.put(name, queue); +// addQueue(queue.getDescriptor()); +// } +// } +// +// return queue; +// } +// +// private ExclusivePersistentQueue createDurableQueueInternal(final String name, short type) { +// ExclusivePersistentQueue queue; +// +// SizeLimiter limiter = new SizeLimiter(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) { +// @Override +// public int getElementSize(MessageDelivery elem) { +// return elem.getFlowLimiterSize(); +// } +// }; +// queue = new ExclusivePersistentQueue(name, limiter); +// queue.setStore(this); +// queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY); +// queue.setExpirationMapper(EXPIRATION_MAPPER); +// return queue; +// } +// +// private IQueue createSharedQueueInternal(final String name, short type) { +// +// IQueue ret; +// +// switch (type) { +// case QueueDescriptor.PARTITIONED: { +// PartitionedQueue queue = new PartitionedQueue(name) { +// @Override +// public IQueue createPartition(int partitionKey) { +// IQueue queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? 
QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED); +// queue.getDescriptor().setPartitionId(partitionKey); +// queue.getDescriptor().setParent(this.getDescriptor().getQueueName()); +// return queue; +// } +// +// }; +// queue.setPartitionMapper(partitionMapper); +// +// ret = queue; +// break; +// } +// case QueueDescriptor.SHARED_PRIORITY: { +// PrioritySizeLimiter limiter = new PrioritySizeLimiter(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10); +// limiter.setPriorityMapper(PRIORITY_MAPPER); +// limiter.setSizeMapper(SIZE_MAPPER); +// SharedPriorityQueue queue = new SharedPriorityQueue(name, limiter); +// ret = queue; +// queue.setKeyMapper(KEY_MAPPER); +// queue.setAutoRelease(true); +// break; +// } +// case QueueDescriptor.SHARED: { +// SizeLimiter limiter = new SizeLimiter(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD) { +// @Override +// public int getElementSize(MessageDelivery elem) { +// return elem.getFlowLimiterSize(); +// } +// }; +// +// if (!USE_OLD_QUEUE) { +// SharedQueue sQueue = new SharedQueue(name, limiter); +// sQueue.setKeyMapper(KEY_MAPPER); +// sQueue.setAutoRelease(true); +// ret = sQueue; +// } else { +// SharedQueueOld sQueue = new SharedQueueOld(name, limiter); +// sQueue.setKeyMapper(KEY_MAPPER); +// sQueue.setAutoRelease(true); +// ret = sQueue; +// } +// break; +// } +// default: { +// throw new IllegalArgumentException("Unknown queue type" + type); +// } +// } +// ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE); +// ret.setStore(this); +// ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY); +// ret.setExpirationMapper(EXPIRATION_MAPPER); +// +// return ret; +// } +// +// public final void deleteQueueElement(SaveableQueueElement sqe) { +// MessageDelivery md = sqe.getElement(); +// //If the message delivery isn't null, funnel through it +// //since the message may not yet be in the store: +// if (md != null) { +// md.acknowledge(sqe); +// } else { +// 
database.deleteQueueElement(sqe); +// } +// +// } +// +// public final boolean isFromStore(MessageDelivery elem) { +// return elem.isFromStore(); +// } +// +// public final void persistQueueElement(SaveableQueueElement elem, ISourceController controller, boolean delayable) { +// elem.getElement().persist(elem, controller, delayable); +// } +// +// public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener listener) { +// database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER); +// } +// +// public final void addQueue(QueueDescriptor queue) { +// database.addQueue(queue); +// } +// +// public final void deleteQueue(QueueDescriptor queue) { +// database.deleteQueue(queue); +// } + + def setDatabase(database:BrokerDatabase ) = { + } + + def setDispatchQueue(dispatchQueue:DispatchQueue )= { + } + + def loadQueues() ={ + } +} \ No newline at end of file Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java (original) +++ 
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java Wed Jul 7 03:40:18 2010 @@ -61,7 +61,7 @@ public class XidImpl implements Xid, Clo /** * Initialize an XID using another XID as the source of data. - * + * * @param from * the XID to initialize this XID from */ @@ -87,10 +87,10 @@ public class XidImpl implements Xid, Clo /** * Determine whether or not two objects of this type are equal. - * + * * @param o * the other XID object to be compared with this XID. - * + * * @return Returns true of the supplied object represents the same global * transaction as this, otherwise returns false. */ @@ -102,14 +102,14 @@ public class XidImpl implements Xid, Clo if (formatId == -1 && other.formatId == -1) return true; - return formatId == other.formatId + return formatId == other.formatId && globalTransactionId.equals(other.globalTransactionId) && branchQualifier.equals(other.branchQualifier); } /** * Compute the hash code. - * + * * @return the computed hashcode */ public int hashCode() { @@ -122,7 +122,7 @@ public class XidImpl implements Xid, Clo * Return a string representing this XID. *

* This is normally used to display the XID when debugging. - * + * * @return the string representation of this XID */ @@ -135,7 +135,7 @@ public class XidImpl implements Xid, Clo /** * Obtain the format identifier part of the XID. - * + * * @return Format identifier. -1 indicates a null XID */ public int getFormatId() { @@ -144,7 +144,7 @@ public class XidImpl implements Xid, Clo /** * Returns the global transaction identifier for this XID. - * + * * @return the global transaction identifier */ public byte[] getGlobalTransactionId() { @@ -154,7 +154,7 @@ public class XidImpl implements Xid, Clo /** * Returns the branch qualifier for this XID. - * + * * @return the branch qualifier */ public byte[] getBranchQualifier() { @@ -167,8 +167,8 @@ public class XidImpl implements Xid, Clo /** * Set the branch qualifier for this XID. Note that the branch qualifier has * a maximum size. - * - * @param qual + * + * @param branchID * a Byte array containing the branch qualifier to be set. If the * size of the array exceeds MAXBQUALSIZE, only the first * MAXBQUALSIZE elements of qual will be used. @@ -205,16 +205,8 @@ public class XidImpl implements Xid, Clo /** * Writes this XidImpl's data to the DataOutput destination - * - * @param out - * The DataOutput destination - * @param maxbytes - * Maximum number of bytes that may be written to the destination - * - * @exception ELogEventTooLong - * The data could not be written without exceeding the - * maxbytes parameter. The data may have been partially - * written. + * + * @param out The DataOutput destination */ public void writebody(DataOutput out) throws IOException { out.writeInt(formatId); // format ID @@ -227,7 +219,7 @@ public class XidImpl implements Xid, Clo /** * read xid from an Array and set each fields. 
- * + * * @param in * the data input array * @throws IOException @@ -247,7 +239,7 @@ public class XidImpl implements Xid, Clo } /** - * @param tid + * @param xid * @return */ public static Buffer toBuffer(Xid xid) { @@ -260,6 +252,6 @@ public class XidImpl implements Xid, Clo throw new RuntimeException(e); } return baos.toBuffer(); - + } } // class XidImpl Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java&r1=961067&r2=961068&rev=961068&view=diff 
============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java) URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java) URL: 
http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty) Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java&r1=961067&r2=961068&rev=961068&view=diff ============================================================================== (empty)