Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 34635 invoked from network); 7 Jul 2010 03:40:11 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 03:40:11 -0000 Received: (qmail 29792 invoked by uid 500); 7 Jul 2010 03:40:10 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 29664 invoked by uid 500); 7 Jul 2010 03:40:10 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 29648 invoked by uid 99); 7 Jul 2010 03:40:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:40:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 03:39:59 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B1D5B2388A68; Wed, 7 Jul 2010 03:39:04 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961067 [4/5] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/... Date: Wed, 07 Jul 2010 03:39:04 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707033904.B1D5B2388A68@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java?rev=961067&r1=961066&r2=961067&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Queue.java Wed Jul 7 03:39:03 2010 @@ -17,120 +17,137 @@ package org.apache.activemq.apollo.broker; import org.apache.activemq.apollo.broker.ProtocolHandler.ConsumerContext; -import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.queue.IQueue; -import org.apache.activemq.queue.Subscription; +import org.fusesource.hawtdispatch.internal.util.RunnableCountDownLatch; public class Queue implements DeliveryTarget { - private Destination destination; - private final IQueue queue; - private VirtualHost virtualHost; - - Queue(IQueue queue) { - this.queue = queue; - } - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq - * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController) - */ - public void deliver(MessageDelivery message, ISourceController source) { - queue.add(message, source); - } - - public final void addSubscription(final Subscription sub) { - queue.addSubscription(sub); - } +// TODO: +// private Destination destination; +// private VirtualHost virtualHost; +// +// Queue() { +// this.queue = queue; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq +// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController) +// */ +// public void deliver(MessageDelivery message, ISourceController source) { +// queue.add(message, source); +// } +// +// public final void addSubscription(final Subscription sub) { +// queue.addSubscription(sub); +// } +// +// public boolean removeSubscription(final Subscription sub) { +// return queue.removeSubscription(sub); +// } +// +// public void start() throws Exception { +// queue.start(); +// } +// +// public void stop() throws Exception { +// if (queue != null) { +// queue.stop(); +// } +// } +// +// public void shutdown(Runnable onShutdown) throws Exception { +// if (queue != null) { +// queue.shutdown(onShutdown); +// } +// } +// +// public boolean hasSelector() { +// return false; +// } +// +// public boolean matches(MessageDelivery message) { +// return true; +// } +// +// public VirtualHost getBroker() { +// return virtualHost; +// } +// +// public void setVirtualHost(VirtualHost virtualHost) { +// this.virtualHost = virtualHost; +// } +// +// public void setDestination(Destination destination) { +// this.destination = destination; +// } +// +// public final Destination getDestination() { +// return destination; +// } +// +// public boolean isDurable() { +// return true; +// } +// +// public static class QueueSubscription implements BrokerSubscription { +// Subscription subscription; +// final Queue queue; +// +// public QueueSubscription(Queue queue) { +// this.queue = queue; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.BrokerSubscription#connect(org.apache. +// * activemq.broker.protocol.ProtocolHandler.ConsumerContext) +// */ +// public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException { +// this.subscription = subscription; +// queue.addSubscription(subscription); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache +// * .activemq.broker.protocol.ProtocolHandler.ConsumerContext) +// */ +// public void disconnect(ConsumerContext context) { +// queue.removeSubscription(subscription); +// } +// +// /* (non-Javadoc) +// * @see org.apache.activemq.broker.BrokerSubscription#getDestination() +// */ +// public Destination getDestination() { +// return queue.getDestination(); +// } +// } - public boolean removeSubscription(final Subscription sub) { - return queue.removeSubscription(sub); - } - - public void start() throws Exception { - queue.start(); - } - - public void stop() throws Exception { - if (queue != null) { - queue.stop(); - } - } - - public void shutdown(Runnable onShutdown) throws Exception { - if (queue != null) { - queue.shutdown(onShutdown); - } + public void deliver(MessageDelivery message) { + //To change body of implemented methods use File | Settings | File Templates. } public boolean hasSelector() { - return false; + return false; //To change body of implemented methods use File | Settings | File Templates. } public boolean matches(MessageDelivery message) { - return true; - } - - public VirtualHost getBroker() { - return virtualHost; - } - - public void setVirtualHost(VirtualHost virtualHost) { - this.virtualHost = virtualHost; + return false; //To change body of implemented methods use File | Settings | File Templates. } - public void setDestination(Destination destination) { - this.destination = destination; + public void shutdown(RunnableCountDownLatch done) { } - public final Destination getDestination() { + public Destination getDestination() { return destination; } - - public boolean isDurable() { - return true; - } - - public static class QueueSubscription implements BrokerSubscription { - Subscription subscription; - final Queue queue; - - public QueueSubscription(Queue queue) { - this.queue = queue; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.broker.BrokerSubscription#connect(org.apache. - * activemq.broker.protocol.ProtocolHandler.ConsumerContext) - */ - public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException { - this.subscription = subscription; - queue.addSubscription(subscription); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache - * .activemq.broker.protocol.ProtocolHandler.ConsumerContext) - */ - public void disconnect(ConsumerContext context) { - queue.removeSubscription(subscription); - } - - /* (non-Javadoc) - * @see org.apache.activemq.broker.BrokerSubscription#getDestination() - */ - public Destination getDestination() { - return queue.getDestination(); - } - } - } \ No newline at end of file Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=961067&r1=961066&r2=961067&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Wed Jul 7 03:39:03 2010 @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import org.apache.activemq.flow.ISourceController; import org.apache.activemq.util.buffer.AsciiBuffer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,43 +86,43 @@ final public class Router { } } - public void route(final MessageDelivery msg, ISourceController controller, boolean autoCreate) { - - //If the message is part of transaction send it to the transaction manager - if(msg.getTransactionId() >= 0) - { - virtualHost.getTransactionManager().newMessage(msg, controller); - return; - } - - Collection targets = route(msg.getDestination(), msg, autoCreate); - - //Set up the delivery for persistence: - msg.beginDispatch(database); - - try { - // TODO: - // Consider doing some caching of this sub list. Most producers - // always send to the same destination. - if (targets != null) { - // The sinks will request persistence via MessageDelivery.persist() - // if they require persistence: - for (DeliveryTarget target : targets) { - target.deliver(msg, controller); - } - } - } finally { - try { - msg.finishDispatch(controller); - } catch (IOException ioe) { - //TODO: Error serializing the message, this should trigger an error - //This is a pretty severe error as we've already delivered - //the message to the recipients. If we send an error response - //back it could result in a duplicate. Does this mean that we - //should persist the message prior to sending to the recips? - ioe.printStackTrace(); - } - } + public void route(final MessageDelivery msg, boolean autoCreate) { +// TODO: +// //If the message is part of transaction send it to the transaction manager +// if(msg.getTransactionId() >= 0) +// { +// virtualHost.getTransactionManager().newMessage(msg, controller); +// return; +// } +// +// Collection targets = route(msg.getDestination(), msg, autoCreate); +// +// //Set up the delivery for persistence: +// msg.beginDispatch(database); +// +// try { +// // TODO: +// // Consider doing some caching of this sub list. Most producers +// // always send to the same destination. +// if (targets != null) { +// // The sinks will request persistence via MessageDelivery.persist() +// // if they require persistence: +// for (DeliveryTarget target : targets) { +// target.deliver(msg, controller); +// } +// } +// } finally { +// try { +// msg.finishDispatch(controller); +// } catch (IOException ioe) { +// //TODO: Error serializing the message, this should trigger an error +// //This is a pretty severe error as we've already delivered +// //the message to the recipients. If we send an error response +// //back it could result in a duplicate. Does this mean that we +// //should persist the message prior to sending to the recips? +// ioe.printStackTrace(); +// } +// } } private Collection route(Destination destination, MessageDelivery msg, boolean autoCreate) { Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java?rev=961067&r1=961066&r2=961067&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TopicSubscription.java Wed Jul 7 03:39:03 2010 @@ -20,140 +20,155 @@ import org.apache.activemq.apollo.broker import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.FilterException; import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.flow.Flow; -import org.apache.activemq.flow.IFlowLimiter; -import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.flow.SizeLimiter; -import org.apache.activemq.queue.ExclusivePersistentQueue; -import org.apache.activemq.queue.ExclusiveQueue; -import org.apache.activemq.queue.IFlowQueue; -import org.apache.activemq.queue.QueueDispatchTarget; -import org.apache.activemq.queue.Subscription; import org.apache.activemq.util.IntrospectionSupport; class TopicSubscription implements BrokerSubscription, DeliveryTarget { - static final boolean USE_PERSISTENT_QUEUES = true; - - protected final BooleanExpression selector; - protected final Destination destination; - protected Subscription connectedSub; - private final VirtualHost host; - - //TODO: replace this with a base interface for queue which also support non persistent use case. - private IFlowQueue queue; - - TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) { - this.host = host; - this.selector = selector; - this.destination = destination; - } - - @Override - public String toString() { - return IntrospectionSupport.toString(this); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq - * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController) - */ - public final void deliver(MessageDelivery message, ISourceController source) { - if (matches(message)) { - queue.add(message, source); - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.broker.DeliveryTarget#hasSelector() - */ - public boolean hasSelector() { - return selector != null; +// static final boolean USE_PERSISTENT_QUEUES = true; +// +// protected final BooleanExpression selector; +// protected final Destination destination; +// protected Subscription connectedSub; +// private final VirtualHost host; +// +// //TODO: replace this with a base interface for queue which also support non persistent use case. +// private IFlowQueue queue; +// +// TopicSubscription(VirtualHost host, Destination destination, BooleanExpression selector) { +// this.host = host; +// this.selector = selector; +// this.destination = destination; +// } +// +// @Override +// public String toString() { +// return IntrospectionSupport.toString(this); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq +// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController) +// */ +// public final void deliver(MessageDelivery message, ISourceController source) { +// if (matches(message)) { +// queue.add(message, source); +// } +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.broker.DeliveryTarget#hasSelector() +// */ +// public boolean hasSelector() { +// return selector != null; +// } +// +// public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException { +// if (this.connectedSub == null) { +// if( subscription.isPersistent() ) { +// queue = createPersistentQueue(subscription); +// } else { +// queue = createNonPersistentQueue(subscription); +// } +// queue.start(); +// +// this.connectedSub = subscription; +// this.queue.addSubscription(connectedSub); +// this.host.getRouter().bind(destination, this); +// } else if (connectedSub != subscription) { +// throw new UserAlreadyConnectedException(); +// } +// } +// +// private IFlowQueue createNonPersistentQueue(final ConsumerContext subscription) { +// Flow flow = new Flow(subscription.getResourceName(), false); +// String name = subscription.getResourceName(); +// IFlowLimiter limiter = new SizeLimiter(100, 50); +// ExclusiveQueue queue = new ExclusiveQueue(flow, name, limiter); +// queue.setDrain( new QueueDispatchTarget() { +// public void drain(MessageDelivery elem, ISourceController controller) { +// subscription.add(elem, controller); +// } +// }); +// return queue; +// } +// +// private IFlowQueue createPersistentQueue(ConsumerContext subscription) { +// ExclusivePersistentQueue queue = host.getQueueStore().createExclusivePersistentQueue(); +// return queue; +// } +// +// @SuppressWarnings("unchecked") +// private void destroyPersistentQueue(IFlowQueue queue) { +// ExclusivePersistentQueue pq = (ExclusivePersistentQueue) queue; +// host.getQueueStore().deleteQueue(pq.getDescriptor()); +// } +// +// public synchronized void disconnect(final ConsumerContext subscription) { +// if (connectedSub != null && connectedSub == subscription) { +// this.host.getRouter().unbind(destination, this); +// this.queue.removeSubscription(connectedSub); +// this.connectedSub = null; +// +// queue.stop(); +// if( USE_PERSISTENT_QUEUES ) { +// destroyPersistentQueue(queue); +// } +// queue=null; +// } +// } +// +// +// +// public boolean matches(MessageDelivery message) { +// if (selector == null) { +// return true; +// } +// +// MessageEvaluationContext selectorContext = message.createMessageEvaluationContext(); +// selectorContext.setDestination(destination); +// try { +// return (selector.matches(selectorContext)); +// } catch (FilterException e) { +// e.printStackTrace(); +// return false; +// } +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.broker.BrokerSubscription#getDestination() +// */ +// public Destination getDestination() { +// return destination; +// } + + public void connect(ConsumerContext subscription) throws Exception { + //To change body of implemented methods use File | Settings | File Templates. + } + + public void disconnect(ConsumerContext subscription) { + //To change body of implemented methods use File | Settings | File Templates. } - public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException { - if (this.connectedSub == null) { - if( subscription.isPersistent() ) { - queue = createPersistentQueue(subscription); - } else { - queue = createNonPersistentQueue(subscription); - } - queue.start(); - - this.connectedSub = subscription; - this.queue.addSubscription(connectedSub); - this.host.getRouter().bind(destination, this); - } else if (connectedSub != subscription) { - throw new UserAlreadyConnectedException(); - } - } - - private IFlowQueue createNonPersistentQueue(final ConsumerContext subscription) { - Flow flow = new Flow(subscription.getResourceName(), false); - String name = subscription.getResourceName(); - IFlowLimiter limiter = new SizeLimiter(100, 50); - ExclusiveQueue queue = new ExclusiveQueue(flow, name, limiter); - queue.setDrain( new QueueDispatchTarget() { - public void drain(MessageDelivery elem, ISourceController controller) { - subscription.add(elem, controller); - } - }); - return queue; - } - - private IFlowQueue createPersistentQueue(ConsumerContext subscription) { - ExclusivePersistentQueue queue = host.getQueueStore().createExclusivePersistentQueue(); - return queue; - } - - @SuppressWarnings("unchecked") - private void destroyPersistentQueue(IFlowQueue queue) { - ExclusivePersistentQueue pq = (ExclusivePersistentQueue) queue; - host.getQueueStore().deleteQueue(pq.getDescriptor()); - } - - public synchronized void disconnect(final ConsumerContext subscription) { - if (connectedSub != null && connectedSub == subscription) { - this.host.getRouter().unbind(destination, this); - this.queue.removeSubscription(connectedSub); - this.connectedSub = null; - - queue.stop(); - if( USE_PERSISTENT_QUEUES ) { - destroyPersistentQueue(queue); - } - queue=null; - } - } - - - - public boolean matches(MessageDelivery message) { - if (selector == null) { - return true; - } - - MessageEvaluationContext selectorContext = message.createMessageEvaluationContext(); - selectorContext.setDestination(destination); - try { - return (selector.matches(selectorContext)); - } catch (FilterException e) { - e.printStackTrace(); - return false; - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.broker.BrokerSubscription#getDestination() - */ public Destination getDestination() { - return destination; + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public void deliver(MessageDelivery message) { + //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean hasSelector() { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean matches(MessageDelivery message) { + return false; //To change body of implemented methods use File | Settings | File Templates. } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=961067&r1=961066&r2=961067&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Wed Jul 7 03:39:03 2010 @@ -22,10 +22,6 @@ import java.util.HashSet; import javax.transaction.xa.XAException; import org.apache.activemq.broker.store.Store.MessageRecord; -import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.queue.IQueue; -import org.apache.activemq.queue.Subscription; -import org.apache.activemq.queue.Subscription.SubscriptionDelivery; import org.apache.activemq.util.FutureListener; import org.apache.activemq.util.ListenableFuture; import org.apache.activemq.util.buffer.AsciiBuffer; @@ -41,526 +37,527 @@ import org.apache.commons.logging.LogFac */ public abstract class Transaction { - private static final Log LOG = LogFactory.getLog(Transaction.class); - - public static final byte START_STATE = 0; // can go to: 1,2,3 - public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4 - public static final byte PREPARED_STATE = 2; // can go to: 3, 4 - public static final byte COMMITED_STATE = 3; - public static final byte ROLLBACK_STATE = 4; - - static final byte TYPE_LOCAL = 0; - static final byte TYPE_XA = 1; - - private byte state = START_STATE; - private final TransactionManager manager; - private final long tid; - private final IQueue opQueue; - protected HashSet listeners; - - private TxProcessor processor; - - Transaction(TransactionManager manager, long tid, IQueue opQueue) { - this.manager = manager; - this.opQueue = opQueue; - opQueue.start(); - this.tid = tid; - } - - /** - * @return the unique identifier used by the {@link TransactionManager} to - * identify this {@link Transaction} - * - */ - public long getTid() { - return tid; - } - - public AsciiBuffer getBackingQueueName() { - return opQueue.getDescriptor().getQueueName(); - } - - /** - * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL} - */ - public abstract byte getType(); - - public void addMessage(MessageDelivery m, ISourceController source) { - - synchronized (this) { - switch (state) { - case START_STATE: - case IN_USE_STATE: - opQueue.add(new TxMessage(m, this), source); - break; - default: { - throw new IllegalStateException("Can't add message to finished or prepared transaction"); - } - } - } - } - - public void addAck(SubscriptionDelivery toAck) { - synchronized (this) { - switch (state) { - case START_STATE: - case IN_USE_STATE: - IQueue target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName()); - //Queue could be null if it was just deleted: - if (target != null) { - long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking(); - opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null); - } - break; - default: { - throw new IllegalStateException("Can't add message to finished or prepared transaction"); - } - } - } - } - - public byte getState() { - return state; - } - - public void setState(byte state, FutureListener listener) { - this.state = state; - ListenableFuture future = manager.persistTransaction(this); - future.setFutureListener(listener); - } - - public void prePrepare() throws Exception { - - // Is it ok to call prepare now given the state of the - // transaction? - switch (state) { - case START_STATE: - case IN_USE_STATE: - break; - default: - XAException xae = new XAException("Prepare cannot be called now."); - xae.errorCode = XAException.XAER_PROTO; - throw xae; - } - } - - protected void fireAfterCommit() throws Exception { - - synchronized (this) { - for (TransactionListener listener : listeners) { - listener.onCommit(this); - } - } - } - - public void fireAfterRollback() throws Exception { - synchronized (this) { - for (TransactionListener listener : listeners) { - listener.onRollback(this); - } - } - } - - public void fireAfterPrepare() throws Exception { - synchronized (this) { - for (TransactionListener listener : listeners) { - listener.onPrepared(this); - } - } - } - - public String toString() { - return super.toString() + "[queue=" + opQueue + "]"; - } - - public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException; - - public abstract void rollback(TransactionListener listener) throws XAException, IOException; - - public abstract int prepare(TransactionListener listener) throws XAException, IOException; - - public boolean isPrepared() { - return getState() == PREPARED_STATE; - } - - public long size() { - return opQueue.getEnqueuedCount(); - } - - public static abstract class TransactionListener { - public void onRollback(Transaction t) { - - } - - public void onCommit(Transaction t) { - - } - - public void onPrepared(Transaction t) { - - } - } - - interface TxOp { - public static final short TYPE_MESSAGE = 0; - public static final short TYPE_ACK = 1; - - public short getType(); - - public T asType(Class type); - - public void onRollback(ISourceController controller); - - public void onCommit(ISourceController controller); - - public int getLimiterSize(); - - public boolean isFromStore(); - - public long getStoreTracking(); - - public MessageRecord createMessageRecord(); - - /** - * @return - */ - public boolean isPersistent(); - - /** - * @return - */ - public Long getExpiration(); - - public int getPriority(); - } - - static class TxMessage implements TxOp { - MessageDelivery message; - Transaction tx; - private boolean fromStore; - - /** - * @param m - * @param transaction - */ - public TxMessage(MessageDelivery m, Transaction tx) { - message = m; - this.tx = tx; - } - - public T asType(Class type) { - if (type == TxMessage.class) { - return type.cast(this); - } else { - return null; - } - } - - public final short getType() { - return TYPE_MESSAGE; - } - - public final int getLimiterSize() { - return message.getFlowLimiterSize(); - } - - public final void onCommit(ISourceController controller) { - message.clearTransactionId(); - tx.manager.getVirtualHost().getRouter().route(message, controller, true); - } - - public final void onRollback(ISourceController controller) { - //Nothing to do here, message just gets dropped: - return; - } - - public final boolean isFromStore() { - return fromStore; - } - - public final MessageRecord createMessageRecord() { - return message.createMessageRecord(); - } - - public final long getStoreTracking() { - return message.getStoreTracking(); - } - - public final boolean isPersistent() { - return message.isPersistent(); - } - - public final Long getExpiration() { - return message.getExpiration(); - } - - public final int getPriority() { - return message.getPriority(); - } - } - - static class TxAck implements TxOp { - public static AsciiBuffer ENCODING = new AsciiBuffer("txack"); - Transaction tx; - IQueue queue; //Desriptor of the queue on which to delete. - long queueSequence; //Sequence number of the element on the queue from which to delete. - final long storeTracking; //Store tracking of this delete op. - private boolean fromStore; - private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1; - - TxAck(IQueue queue, long removalKey, long storeTracking, Transaction tx) { - this.queue = queue; - this.queueSequence = removalKey; - this.tx = tx; - this.storeTracking = storeTracking; - } - - public final short getType() { - return TYPE_ACK; - } - - public T asType(Class type) { - if (type == TxAck.class) { - return type.cast(this); - } else { - return null; - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit() - */ - public final void onCommit(ISourceController controller) { - queue.remove(queueSequence); - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback() - */ - public final void onRollback(ISourceController controller) { - //No-Op for now, it is possible that we'd want to unaquire these - //in the queue if the client weren't to keep these - //around - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize() - */ - public final int getLimiterSize() { - return MEM_SIZE; - } - - public final boolean isFromStore() { - return fromStore; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking() - */ - public final long getStoreTracking() { - return storeTracking; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord - * () - */ - public final MessageRecord createMessageRecord() { - MessageRecord ret = new MessageRecord(); - ret.setEncoding(TxAck.ENCODING); - ret.setKey(storeTracking); - ret.setSize(MEM_SIZE); - ret.setBuffer(new Buffer(toBytes().getData())); - return null; - } - - private final Buffer toBytes() { - AsciiBuffer queueName = queue.getDescriptor().getQueueName(); - DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8); - try { - baos.writeShort(queueName.length); - baos.write(queueName.data, queueName.offset, queueName.length); - baos.writeLong(queueSequence); - } catch (IOException shouldNotHappen) { - throw new RuntimeException(shouldNotHappen); - } - return baos.toBuffer(); - } - - private final void fromBytes(byte[] bytes) { - DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes); - byte[] queueBytes = new byte[baos.readShort()]; - baos.readFully(queueBytes); - AsciiBuffer queueName = new AsciiBuffer(queueBytes); - queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName); - queueSequence = baos.readLong(); - - } - - public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) { - TxAck ret = new TxAck(null, -1, record.getKey(), tx); - ret.fromBytes(record.getBuffer().getData()); - return ret; - } - - public final boolean isPersistent() { - //TODO This could probably be relaxed when the ack is for non persistent - //elements - return true; - } - - public final Long getExpiration() { - return -1L; - } - - public final int getPriority() { - return 0; - } - } - - /** - * @param record - * @return - */ - public static TxOp createTxOp(MessageRecord record, Transaction tx) { - if (record.getEncoding().equals(TxAck.ENCODING)) { - return TxAck.createFromMessageRecord(record, tx); - } else { - MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor()); - return new TxMessage(delivery, tx); - } - } - - protected void startTransactionProcessor() - { - synchronized(this) - { - if(processor == null) - { - processor = new TxProcessor(); - opQueue.addSubscription(processor); - } - } - } - - - /** - * TxProcessor - *

- * Description: The tx processor processes the transaction queue after - * commit or rollback. - *

- * - * @author cmacnaug - * @version 1.0 - */ - private class TxProcessor implements Subscription { - /* - * (non-Javadoc) - * - * @see org.apache.activemq.queue.Subscription#add(java.lang.Object, - * org.apache.activemq.flow.ISourceController, - * org.apache.activemq.queue.Subscription.SubscriptionDelivery) - */ - public void add(TxOp element, ISourceController controller, SubscriptionDelivery callback) { - - switch (state) { - case COMMITED_STATE: { - element.onCommit(controller); - if (callback != null) { - callback.acknowledge(); - } - break; - } - case ROLLBACK_STATE: { - element.onRollback(controller); - if (callback != null) { - callback.acknowledge(); - } - break; - } - default: { - LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state); - } - } - - //If we've reached the end of the op queue - if (opQueue.getEnqueuedCount() == 0) { - opQueue.shutdown(null); - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.queue.Subscription#hasSelector() - */ - public boolean hasSelector() { - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.queue.Subscription#isBrowser() - */ - public boolean isBrowser() { - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.queue.Subscription#isExclusive() - */ - public boolean isExclusive() { - return true; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang - * .Object) - */ - public boolean isRemoveOnDispatch(TxOp elem) { - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object) - */ - public boolean matches(TxOp elem) { - return true; - } - - /* - * (non-Javadoc) - * - * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object, - * org.apache.activemq.flow.ISourceController, - * org.apache.activemq.queue.Subscription.SubscriptionDelivery) - */ - public boolean offer(TxOp element, ISourceController controller, SubscriptionDelivery callback) { - add(element, controller, callback); - return true; - } - - } +// TODO: +// private static final Log LOG = LogFactory.getLog(Transaction.class); +// +// public static final byte START_STATE = 0; // can go to: 1,2,3 +// public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4 +// public static final byte PREPARED_STATE = 2; // can go to: 3, 4 +// public static final byte COMMITED_STATE = 3; +// public static final byte ROLLBACK_STATE = 4; +// +// static final byte TYPE_LOCAL = 0; +// static final byte TYPE_XA = 1; +// +// private byte state = START_STATE; +// private final TransactionManager manager; +// private final long tid; +// private final IQueue opQueue; +// protected HashSet listeners; +// +// private TxProcessor processor; +// +// Transaction(TransactionManager manager, long tid, IQueue opQueue) { +// this.manager = manager; +// this.opQueue = opQueue; +// opQueue.start(); +// this.tid = tid; +// } +// +// /** +// * @return the unique identifier used by the {@link TransactionManager} to +// * identify this {@link Transaction} +// * +// */ +// public long getTid() { +// return tid; +// } +// +// public AsciiBuffer getBackingQueueName() { +// return opQueue.getDescriptor().getQueueName(); +// } +// +// /** +// * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL} +// */ +// public abstract byte getType(); +// +// public void addMessage(MessageDelivery m, ISourceController source) { +// +// synchronized (this) { +// switch (state) { +// case START_STATE: +// case IN_USE_STATE: +// opQueue.add(new TxMessage(m, this), source); +// break; +// default: { +// throw new IllegalStateException("Can't add message to finished or prepared transaction"); +// } +// } +// } +// } +// +// public void addAck(SubscriptionDelivery toAck) { +// synchronized (this) { +// switch (state) { +// case START_STATE: +// case IN_USE_STATE: +// IQueue target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName()); +// //Queue could be null if it was just deleted: +// if (target != null) { +// long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking(); +// opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null); +// } +// break; +// default: { +// throw new IllegalStateException("Can't add message to finished or prepared transaction"); +// } +// } +// } +// } +// +// public byte getState() { +// return state; +// } +// +// public void setState(byte state, FutureListener listener) { +// this.state = state; +// ListenableFuture future = manager.persistTransaction(this); +// future.setFutureListener(listener); +// } +// +// public void prePrepare() throws Exception { +// +// // Is it ok to call prepare now given the state of the +// // transaction? +// switch (state) { +// case START_STATE: +// case IN_USE_STATE: +// break; +// default: +// XAException xae = new XAException("Prepare cannot be called now."); +// xae.errorCode = XAException.XAER_PROTO; +// throw xae; +// } +// } +// +// protected void fireAfterCommit() throws Exception { +// +// synchronized (this) { +// for (TransactionListener listener : listeners) { +// listener.onCommit(this); +// } +// } +// } +// +// public void fireAfterRollback() throws Exception { +// synchronized (this) { +// for (TransactionListener listener : listeners) { +// listener.onRollback(this); +// } +// } +// } +// +// public void fireAfterPrepare() throws Exception { +// synchronized (this) { +// for (TransactionListener listener : listeners) { +// listener.onPrepared(this); +// } +// } +// } +// +// public String toString() { +// return super.toString() + "[queue=" + opQueue + "]"; +// } +// +// public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException; +// +// public abstract void rollback(TransactionListener listener) throws XAException, IOException; +// +// public abstract int prepare(TransactionListener listener) throws XAException, IOException; +// +// public boolean isPrepared() { +// return getState() == PREPARED_STATE; +// } +// +// public long size() { +// return opQueue.getEnqueuedCount(); +// } +// +// public static abstract class TransactionListener { +// public void onRollback(Transaction t) { +// +// } +// +// public void onCommit(Transaction t) { +// +// } +// +// public void onPrepared(Transaction t) { +// +// } +// } +// +// interface TxOp { +// public static final short TYPE_MESSAGE = 0; +// public static final short TYPE_ACK = 1; +// +// public short getType(); +// +// public T asType(Class type); +// +// public void onRollback(ISourceController controller); +// +// public void onCommit(ISourceController controller); +// +// public int getLimiterSize(); +// +// public boolean isFromStore(); +// +// public long getStoreTracking(); +// +// public MessageRecord createMessageRecord(); +// +// /** +// * @return +// */ +// public boolean isPersistent(); +// +// /** +// * @return +// */ +// public Long getExpiration(); +// +// public int getPriority(); +// } +// +// static class TxMessage implements TxOp { +// MessageDelivery message; +// Transaction tx; +// private boolean fromStore; +// +// /** +// * @param m +// * @param transaction +// */ +// public TxMessage(MessageDelivery m, Transaction tx) { +// message = m; +// this.tx = tx; +// } +// +// public T asType(Class type) { +// if (type == TxMessage.class) { +// return type.cast(this); +// } else { +// return null; +// } +// } +// +// public final short getType() { +// return TYPE_MESSAGE; +// } +// +// public final int getLimiterSize() { +// return message.getFlowLimiterSize(); +// } +// +// public final void onCommit(ISourceController controller) { +// message.clearTransactionId(); +// tx.manager.getVirtualHost().getRouter().route(message, controller, true); +// } +// +// public final void onRollback(ISourceController controller) { +// //Nothing to do here, message just gets dropped: +// return; +// } +// +// public final boolean isFromStore() { +// return fromStore; +// } +// +// public final MessageRecord createMessageRecord() { +// return message.createMessageRecord(); +// } +// +// public final long getStoreTracking() { +// return message.getStoreTracking(); +// } +// +// public final boolean isPersistent() { +// return message.isPersistent(); +// } +// +// public final Long getExpiration() { +// return message.getExpiration(); +// } +// +// public final int getPriority() { +// return message.getPriority(); +// } +// } +// +// static class TxAck implements TxOp { +// public static AsciiBuffer ENCODING = new AsciiBuffer("txack"); +// Transaction tx; +// IQueue queue; //Desriptor of the queue on which to delete. +// long queueSequence; //Sequence number of the element on the queue from which to delete. +// final long storeTracking; //Store tracking of this delete op. +// private boolean fromStore; +// private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1; +// +// TxAck(IQueue queue, long removalKey, long storeTracking, Transaction tx) { +// this.queue = queue; +// this.queueSequence = removalKey; +// this.tx = tx; +// this.storeTracking = storeTracking; +// } +// +// public final short getType() { +// return TYPE_ACK; +// } +// +// public T asType(Class type) { +// if (type == TxAck.class) { +// return type.cast(this); +// } else { +// return null; +// } +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit() +// */ +// public final void onCommit(ISourceController controller) { +// queue.remove(queueSequence); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback() +// */ +// public final void onRollback(ISourceController controller) { +// //No-Op for now, it is possible that we'd want to unaquire these +// //in the queue if the client weren't to keep these +// //around +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize() +// */ +// public final int getLimiterSize() { +// return MEM_SIZE; +// } +// +// public final boolean isFromStore() { +// return fromStore; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking() +// */ +// public final long getStoreTracking() { +// return storeTracking; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord +// * () +// */ +// public final MessageRecord createMessageRecord() { +// MessageRecord ret = new MessageRecord(); +// ret.setEncoding(TxAck.ENCODING); +// ret.setKey(storeTracking); +// ret.setSize(MEM_SIZE); +// ret.setBuffer(new Buffer(toBytes().getData())); +// return null; +// } +// +// private final Buffer toBytes() { +// AsciiBuffer queueName = queue.getDescriptor().getQueueName(); +// DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8); +// try { +// baos.writeShort(queueName.length); +// baos.write(queueName.data, queueName.offset, queueName.length); +// baos.writeLong(queueSequence); +// } catch (IOException shouldNotHappen) { +// throw new RuntimeException(shouldNotHappen); +// } +// return baos.toBuffer(); +// } +// +// private final void fromBytes(byte[] bytes) { +// DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes); +// byte[] queueBytes = new byte[baos.readShort()]; +// baos.readFully(queueBytes); +// AsciiBuffer queueName = new AsciiBuffer(queueBytes); +// queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName); +// queueSequence = baos.readLong(); +// +// } +// +// public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) { +// TxAck ret = new TxAck(null, -1, record.getKey(), tx); +// ret.fromBytes(record.getBuffer().getData()); +// return ret; +// } +// +// public final boolean isPersistent() { +// //TODO This could probably be relaxed when the ack is for non persistent +// //elements +// return true; +// } +// +// public final Long getExpiration() { +// return -1L; +// } +// +// public final int getPriority() { +// return 0; +// } +// } +// +// /** +// * @param record +// * @return +// */ +// public static TxOp createTxOp(MessageRecord record, Transaction tx) { +// if (record.getEncoding().equals(TxAck.ENCODING)) { +// return TxAck.createFromMessageRecord(record, tx); +// } else { +// MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor()); +// return new TxMessage(delivery, tx); +// } +// } +// +// protected void startTransactionProcessor() +// { +// synchronized(this) +// { +// if(processor == null) +// { +// processor = new TxProcessor(); +// opQueue.addSubscription(processor); +// } +// } +// } +// +// +// /** +// * TxProcessor +// *

+// * Description: The tx processor processes the transaction queue after +// * commit or rollback. +// *

+// * +// * @author cmacnaug +// * @version 1.0 +// */ +// private class TxProcessor implements Subscription { +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.queue.Subscription#add(java.lang.Object, +// * org.apache.activemq.flow.ISourceController, +// * org.apache.activemq.queue.Subscription.SubscriptionDelivery) +// */ +// public void add(TxOp element, ISourceController controller, SubscriptionDelivery callback) { +// +// switch (state) { +// case COMMITED_STATE: { +// element.onCommit(controller); +// if (callback != null) { +// callback.acknowledge(); +// } +// break; +// } +// case ROLLBACK_STATE: { +// element.onRollback(controller); +// if (callback != null) { +// callback.acknowledge(); +// } +// break; +// } +// default: { +// LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state); +// } +// } +// +// //If we've reached the end of the op queue +// if (opQueue.getEnqueuedCount() == 0) { +// opQueue.shutdown(null); +// } +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.queue.Subscription#hasSelector() +// */ +// public boolean hasSelector() { +// return false; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.queue.Subscription#isBrowser() +// */ +// public boolean isBrowser() { +// return false; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.queue.Subscription#isExclusive() +// */ +// public boolean isExclusive() { +// return true; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang +// * .Object) +// */ +// public boolean isRemoveOnDispatch(TxOp elem) { +// return false; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object) +// */ +// public boolean matches(TxOp elem) { +// return true; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object, +// * org.apache.activemq.flow.ISourceController, +// * org.apache.activemq.queue.Subscription.SubscriptionDelivery) +// */ +// public boolean offer(TxOp element, ISourceController controller, SubscriptionDelivery callback) { +// add(element, controller, callback); +// return true; +// } +// +// } } Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=961067&r1=961066&r2=961067&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Wed Jul 7 03:39:03 2010 @@ -22,19 +22,9 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext; -import org.apache.activemq.apollo.broker.Transaction.TxOp; import org.apache.activemq.broker.store.QueueDescriptor; import org.apache.activemq.broker.store.Store.MessageRecord; import org.apache.activemq.broker.store.Store.QueueQueryResult; -import org.apache.activemq.flow.ISourceController; -import org.apache.activemq.flow.SizeLimiter; -import org.apache.activemq.queue.ExclusivePersistentQueue; -import org.apache.activemq.queue.IQueue; -import org.apache.activemq.queue.PersistencePolicy; -import org.apache.activemq.queue.QueueStore; -import org.apache.activemq.queue.RestoreListener; -import org.apache.activemq.queue.SaveableQueueElement; import org.apache.activemq.util.ListenableFuture; import org.apache.activemq.util.Mapper; import org.apache.activemq.util.buffer.AsciiBuffer; @@ -53,362 +43,368 @@ import org.apache.commons.logging.LogFac * @version 1.0 */ public class TransactionManager { - private static final Log LOG = LogFactory.getLog(TransactionManager.class); - private static final String TX_QUEUE_PREFIX = "TX-"; - private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP"); +// TODO: +// private static final Log LOG = LogFactory.getLog(TransactionManager.class); +// private static final String TX_QUEUE_PREFIX = "TX-"; +// private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP"); +// +// private final HashMap transactions = new HashMap(); +// private final HashMap transactionsByQueue = new HashMap(); +// private final HashMap xaTransactions = new HashMap(); +// +// private final VirtualHost host; +// private BrokerDatabase database; +// +// private final AtomicLong tidGen = new AtomicLong(0); +// private final TransactionStore txStore; +// +// private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64; +// private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1; +// // Be default we don't page out elements to disk. +// private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD; +// //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE; +// +// private static final PersistencePolicy DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy() { +// +// private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD; +// +// public boolean isPersistent(TxOp elem) { +// return elem.isPersistent(); +// } +// +// public boolean isPageOutPlaceHolders() { +// return false; +// } +// +// public boolean isPagingEnabled() { +// return PAGING_ENABLED; +// } +// +// public int getPagingInMemorySize() { +// return DEFAULT_TX_QUEUE_PAGING_THRESHOLD; +// } +// +// public boolean isThrottleSourcesToMemoryLimit() { +// // Keep the queue in memory. +// return true; +// } +// +// public int getDisconnectedThrottleRate() { +// // By default don't throttle consumers when disconnected. +// return 0; +// } +// +// public int getRecoveryBias() { +// return 8; +// } +// }; +// +// private static final Mapper EXPIRATION_MAPPER = new Mapper() { +// public Long map(TxOp element) { +// return element.getExpiration(); +// } +// }; +// +// private static final Mapper SIZE_MAPPER = new Mapper() { +// public Integer map(TxOp element) { +// return element.getLimiterSize(); +// } +// }; +// +// private static final Mapper PRIORITY_MAPPER = new Mapper() { +// public Integer map(TxOp element) { +// return element.getPriority(); +// } +// }; +// +// private static final Mapper KEY_MAPPER = new Mapper() { +// public Long map(TxOp element) { +// return element.getStoreTracking(); +// } +// }; +// +// private static final Mapper PARTITION_MAPPER = new Mapper() { +// public Integer map(TxOp element) { +// return 1; +// } +// }; +// +// TransactionManager(VirtualHost host) { +// this.host = host; +// txStore = new TransactionStore(host.getDatabase()); +// database = host.getDatabase(); +// } +// +// /** +// * @return The TM's virtual host +// */ +// public final VirtualHost getVirtualHost() { +// return host; +// } +// +// /** +// * @param msg +// * @param controller +// */ +// public void newMessage(MessageDelivery msg, ISourceController controller) { +// if (msg.getStoreTracking() == -1) { +// msg.setStoreTracking(host.getDatabase().allocateStoreTracking()); +// } +// transactions.get(msg.getTransactionId()).addMessage(msg, controller); +// } +// +// /** +// * Creates a transaction. +// * +// * @param xid +// * @return +// */ +// public synchronized final Transaction createTransaction(Buffer xid) { +// Transaction ret; +// +// long tid = tidGen.incrementAndGet(); +// IQueue opQueue = createTransactionQueue(tid); +// +// if (xid == null) { +// ret = new LocalTransaction(this, tid, opQueue); +// } else { +// XATransaction xat = new XATransaction(this, tid, xid, opQueue); +// ret = xat; +// xaTransactions.put(xid, xat); +// } +// +// transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret); +// transactions.put(ret.getTid(), ret); +// +// return ret; +// } +// +// /** +// * @param buffer +// * @return +// */ +// public synchronized Transaction getXATransaction(Buffer buffer) { +// return xaTransactions.get(buffer); +// } +// +// /** +// * +// * @throws Exception +// */ +// public synchronized void loadTransactions() throws Exception { +// +// tidGen.set(database.allocateStoreTracking()); +// +// Map txns = database.listMapEntries(TXN_MAP); +// +// // Load shared queues +// Iterator results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE); +// while (results.hasNext()) { +// QueueQueryResult loaded = results.next(); +// +// Buffer b = txns.remove(loaded.getDescriptor().getQueueName()); +// if (b == null) { +// LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount()); +// database.deleteQueue(loaded.getDescriptor()); +// } +// +// IQueue queue = createRestoredTxQueue(loaded); +// Transaction tx = loadTransaction(b, queue); +// +// //TODO if we recover a tx that isn't committed then, we should discard it. +// if (tx.getState() < Transaction.COMMITED_STATE) { +// LOG.warn("Recovered unfinished transaction: " + tx); +// } +// transactions.put(tx.getTid(), tx); +// if (tx instanceof XATransaction) { +// XATransaction xat = XATransaction.class.cast(tx); +// xaTransactions.put(xat.getXid(), xat); +// } +// +// LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize()); +// } +// +// if (!txns.isEmpty()) { +// //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be +// //deleted: +// LOG.warn("Recovered transactions without backing queues: " + txns.keySet()); +// } +// } +// +// private Transaction loadTransaction(Buffer b, IQueue queue) throws IOException { +// //TODO move the serialization into the transaction itself: +// DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData()); +// byte type = bais.readByte(); +// byte state = bais.readByte(); +// long tid = bais.readLong(); +// +// Transaction tx = null; +// switch (type) { +// case Transaction.TYPE_LOCAL: +// tx = new LocalTransaction(this, tid, queue); +// break; +// case Transaction.TYPE_XA: +// int length = bais.readByte() & 0xFF; +// Buffer xid = new Buffer(new byte[length]); +// bais.readFully(xid.data); +// tx = new XATransaction(this, tid, xid, queue); +// break; +// default: +// throw new IOException("Invalid transaction type: " + type); +// +// } +// tx.setState(state, null); +// return tx; +// +// } +// +// public ListenableFuture persistTransaction(Transaction tx) { +// +// //TODO move the serialization into the transaction itself: +// DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); +// try { +// baos.writeByte(tx.getType()); +// baos.writeByte(tx.getState()); +// baos.writeLong(tx.getTid()); +// if (tx.getType() == Transaction.TYPE_XA) { +// Buffer xid = ((XATransaction) tx).getXid(); +// // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it. +// baos.writeByte(xid.length & 0xFF); +// baos.write(xid.data, xid.offset, xid.length); +// } +// OperationContext ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size())); +// ctx.requestFlush(); +// return ctx; +// } catch (IOException ioe) { +// //Shouldn't happen +// throw new RuntimeException(ioe); +// } +// } +// +// private IQueue createRestoredTxQueue(QueueQueryResult loaded) throws IOException { +// +// IQueue queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType()); +// queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize()); +// return queue; +// } +// +// private final IQueue createTransactionQueue(long tid) { +// IQueue queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE); +// queue.initialize(0, 0, 0, 0); +// txStore.addQueue(queue.getDescriptor()); +// return queue; +// } +// +// private IQueue createTxQueueInternal(final String name, short type) { +// ExclusivePersistentQueue queue; +// +// SizeLimiter limiter = new SizeLimiter(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) { +// @Override +// public int getElementSize(TxOp elem) { +// return elem.getLimiterSize(); +// } +// }; +// queue = new ExclusivePersistentQueue(name, limiter); +// queue.setStore(txStore); +// queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY); +// queue.setExpirationMapper(EXPIRATION_MAPPER); +// queue.getDescriptor().setApplicationType(type); +// return queue; +// } +// +// final QueueStore getTxnStore() { +// return txStore; +// } +// +// private class TransactionStore implements QueueStore { +// private final BrokerDatabase database; +// +// private final BrokerDatabase.MessageRecordMarshaller TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller() { +// public MessageRecord marshal(TxOp element) { +// return element.createMessageRecord(); +// } +// +// public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) { +// Transaction t = transactionsByQueue.get(queue.getQueueName()); +// return Transaction.createTxOp(record, t); +// } +// }; +// +// TransactionStore(BrokerDatabase database) { +// this.database = database; +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq +// * .queue.QueueDescriptor) +// */ +// public void addQueue(QueueDescriptor queue) { +// database.addQueue(queue); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq +// * .queue.QueueDescriptor) +// */ +// public void deleteQueue(QueueDescriptor queue) { +// database.deleteQueue(queue); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache +// * .activemq.queue.QueueDescriptor, java.lang.Object) +// */ +// public void deleteQueueElement(SaveableQueueElement sqe) { +// database.deleteQueueElement(sqe); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object) +// */ +// public boolean isFromStore(TxOp elem) { +// return elem.isFromStore(); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache +// * .activemq.queue.SaveableQueueElement, +// * org.apache.activemq.flow.ISourceController, boolean) +// */ +// public void persistQueueElement(SaveableQueueElement sqe, ISourceController source, boolean delayable) { +// database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER); +// } +// +// /* +// * (non-Javadoc) +// * +// * @see +// * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache +// * .activemq.queue.QueueDescriptor, boolean, long, long, int, +// * org.apache.activemq.queue.RestoreListener) +// */ +// public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener listener) { +// database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER); +// } +// } - private final HashMap transactions = new HashMap(); - private final HashMap transactionsByQueue = new HashMap(); - private final HashMap xaTransactions = new HashMap(); - - private final VirtualHost host; - private BrokerDatabase database; - - private final AtomicLong tidGen = new AtomicLong(0); - private final TransactionStore txStore; - - private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64; - private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1; - // Be default we don't page out elements to disk. - private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD; - //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE; - - private static final PersistencePolicy DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy() { - - private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD; - - public boolean isPersistent(TxOp elem) { - return elem.isPersistent(); - } - - public boolean isPageOutPlaceHolders() { - return false; - } - - public boolean isPagingEnabled() { - return PAGING_ENABLED; - } - - public int getPagingInMemorySize() { - return DEFAULT_TX_QUEUE_PAGING_THRESHOLD; - } - - public boolean isThrottleSourcesToMemoryLimit() { - // Keep the queue in memory. - return true; - } - - public int getDisconnectedThrottleRate() { - // By default don't throttle consumers when disconnected. - return 0; - } - - public int getRecoveryBias() { - return 8; - } - }; - - private static final Mapper EXPIRATION_MAPPER = new Mapper() { - public Long map(TxOp element) { - return element.getExpiration(); - } - }; - - private static final Mapper SIZE_MAPPER = new Mapper() { - public Integer map(TxOp element) { - return element.getLimiterSize(); - } - }; - - private static final Mapper PRIORITY_MAPPER = new Mapper() { - public Integer map(TxOp element) { - return element.getPriority(); - } - }; - - private static final Mapper KEY_MAPPER = new Mapper() { - public Long map(TxOp element) { - return element.getStoreTracking(); - } - }; - - private static final Mapper PARTITION_MAPPER = new Mapper() { - public Integer map(TxOp element) { - return 1; - } - }; - - TransactionManager(VirtualHost host) { - this.host = host; - txStore = new TransactionStore(host.getDatabase()); - database = host.getDatabase(); - } - - /** - * @return The TM's virtual host - */ - public final VirtualHost getVirtualHost() { - return host; - } - - /** - * @param msg - * @param controller - */ - public void newMessage(MessageDelivery msg, ISourceController controller) { - if (msg.getStoreTracking() == -1) { - msg.setStoreTracking(host.getDatabase().allocateStoreTracking()); - } - transactions.get(msg.getTransactionId()).addMessage(msg, controller); - } - - /** - * Creates a transaction. - * - * @param xid - * @return - */ - public synchronized final Transaction createTransaction(Buffer xid) { - Transaction ret; - - long tid = tidGen.incrementAndGet(); - IQueue opQueue = createTransactionQueue(tid); - - if (xid == null) { - ret = new LocalTransaction(this, tid, opQueue); - } else { - XATransaction xat = new XATransaction(this, tid, xid, opQueue); - ret = xat; - xaTransactions.put(xid, xat); - } - - transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret); - transactions.put(ret.getTid(), ret); - - return ret; - } - - /** - * @param buffer - * @return - */ - public synchronized Transaction getXATransaction(Buffer buffer) { - return xaTransactions.get(buffer); - } - - /** - * - * @throws Exception - */ - public synchronized void loadTransactions() throws Exception { - - tidGen.set(database.allocateStoreTracking()); - - Map txns = database.listMapEntries(TXN_MAP); - - // Load shared queues - Iterator results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE); - while (results.hasNext()) { - QueueQueryResult loaded = results.next(); - - Buffer b = txns.remove(loaded.getDescriptor().getQueueName()); - if (b == null) { - LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount()); - database.deleteQueue(loaded.getDescriptor()); - } - - IQueue queue = createRestoredTxQueue(loaded); - Transaction tx = loadTransaction(b, queue); - - //TODO if we recover a tx that isn't committed then, we should discard it. - if (tx.getState() < Transaction.COMMITED_STATE) { - LOG.warn("Recovered unfinished transaction: " + tx); - } - transactions.put(tx.getTid(), tx); - if (tx instanceof XATransaction) { - XATransaction xat = XATransaction.class.cast(tx); - xaTransactions.put(xat.getXid(), xat); - } - - LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize()); - } - - if (!txns.isEmpty()) { - //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be - //deleted: - LOG.warn("Recovered transactions without backing queues: " + txns.keySet()); - } - } - - private Transaction loadTransaction(Buffer b, IQueue queue) throws IOException { - //TODO move the serialization into the transaction itself: - DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData()); - byte type = bais.readByte(); - byte state = bais.readByte(); - long tid = bais.readLong(); - - Transaction tx = null; - switch (type) { - case Transaction.TYPE_LOCAL: - tx = new LocalTransaction(this, tid, queue); - break; - case Transaction.TYPE_XA: - int length = bais.readByte() & 0xFF; - Buffer xid = new Buffer(new byte[length]); - bais.readFully(xid.data); - tx = new XATransaction(this, tid, xid, queue); - break; - default: - throw new IOException("Invalid transaction type: " + type); - - } - tx.setState(state, null); - return tx; - - } - - public ListenableFuture persistTransaction(Transaction tx) { - - //TODO move the serialization into the transaction itself: - DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); - try { - baos.writeByte(tx.getType()); - baos.writeByte(tx.getState()); - baos.writeLong(tx.getTid()); - if (tx.getType() == Transaction.TYPE_XA) { - Buffer xid = ((XATransaction) tx).getXid(); - // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it. - baos.writeByte(xid.length & 0xFF); - baos.write(xid.data, xid.offset, xid.length); - } - OperationContext ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size())); - ctx.requestFlush(); - return ctx; - } catch (IOException ioe) { - //Shouldn't happen - throw new RuntimeException(ioe); - } - } - - private IQueue createRestoredTxQueue(QueueQueryResult loaded) throws IOException { - - IQueue queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType()); - queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize()); - return queue; + public TransactionManager(VirtualHost virtualHost) { } - private final IQueue createTransactionQueue(long tid) { - IQueue queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE); - queue.initialize(0, 0, 0, 0); - txStore.addQueue(queue.getDescriptor()); - return queue; + public void loadTransactions() { } - - private IQueue createTxQueueInternal(final String name, short type) { - ExclusivePersistentQueue queue; - - SizeLimiter limiter = new SizeLimiter(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) { - @Override - public int getElementSize(TxOp elem) { - return elem.getLimiterSize(); - } - }; - queue = new ExclusivePersistentQueue(name, limiter); - queue.setStore(txStore); - queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY); - queue.setExpirationMapper(EXPIRATION_MAPPER); - queue.getDescriptor().setApplicationType(type); - return queue; - } - - final QueueStore getTxnStore() { - return txStore; - } - - private class TransactionStore implements QueueStore { - private final BrokerDatabase database; - - private final BrokerDatabase.MessageRecordMarshaller TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller() { - public MessageRecord marshal(TxOp element) { - return element.createMessageRecord(); - } - - public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) { - Transaction t = transactionsByQueue.get(queue.getQueueName()); - return Transaction.createTxOp(record, t); - } - }; - - TransactionStore(BrokerDatabase database) { - this.database = database; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq - * .queue.QueueDescriptor) - */ - public void addQueue(QueueDescriptor queue) { - database.addQueue(queue); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq - * .queue.QueueDescriptor) - */ - public void deleteQueue(QueueDescriptor queue) { - database.deleteQueue(queue); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache - * .activemq.queue.QueueDescriptor, java.lang.Object) - */ - public void deleteQueueElement(SaveableQueueElement sqe) { - database.deleteQueueElement(sqe); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object) - */ - public boolean isFromStore(TxOp elem) { - return elem.isFromStore(); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache - * .activemq.queue.SaveableQueueElement, - * org.apache.activemq.flow.ISourceController, boolean) - */ - public void persistQueueElement(SaveableQueueElement sqe, ISourceController source, boolean delayable) { - database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache - * .activemq.queue.QueueDescriptor, boolean, long, long, int, - * org.apache.activemq.queue.RestoreListener) - */ - public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener listener) { - database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER); - } - } - }