activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [31/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:01 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java
new file mode 100644
index 0000000..0eb84a2
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscription.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.jms.InvalidSelectorException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+public interface AMQSubscription extends AMQSubscriptionRecovery
+{
+
+   /**
+    * Used to add messages that match the subscription.
+    * @param node the message reference to add
+    * @throws Exception
+    * @throws InterruptedException
+    * @throws IOException
+    */
+   void add(MessageReference node) throws Exception;
+
+   /**
+    * Used when the client acknowledges receipt of a dispatched message.
+    * @throws IOException
+    * @throws Exception
+    */
+   void acknowledge(AMQConnectionContext context, final MessageAck ack) throws Exception;
+
+   /**
+    * Allows a consumer to pull a message on demand.
+    */
+   Response pullMessage(AMQConnectionContext context, MessagePull pull) throws Exception;
+
+   /**
+    * Returns true if this subscription is a wildcard subscription.
+    * @return true if wildcard subscription.
+    */
+   boolean isWildcard();
+
+   /**
+    * Is the subscription interested in the message?
+    * @param node the message reference to test
+    * @param context the evaluation context for the test
+    * @return true if the subscription is interested in the message
+    * @throws IOException
+    */
+   boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException;
+
+   /**
+    * Is the subscription interested in messages in the destination?
+    * @param destination the destination to test
+    * @return true if the subscription is interested in messages in the destination
+    */
+   boolean matches(ActiveMQDestination destination);
+
+   /**
+    * The subscription will be receiving messages from the destination.
+    * @param context the connection context
+    * @param destination the destination being added
+    * @throws Exception
+    */
+   void add(AMQConnectionContext context, AMQDestination destination) throws Exception;
+
+   /**
+    * The subscription will no longer be receiving messages from the destination.
+    * @param context the connection context
+    * @param destination the destination being removed
+    * @return a list of un-acked messages that were added to the subscription.
+    */
+   List<MessageReference> remove(AMQConnectionContext context, AMQDestination destination) throws Exception;
+
+   /**
+    * The ConsumerInfo object that created the subscription.
+    */
+   ConsumerInfo getConsumerInfo();
+
+   /**
+    * The subscription should release as many references as it can to help the garbage collector
+    * reclaim memory.
+    */
+   void gc();
+
+   /**
+    * Used by a Slave Broker to update dispatch information.
+    * @param mdn the dispatch notification to process
+    * @throws Exception
+    */
+   void processMessageDispatchNotification(MessageDispatchNotification  mdn) throws Exception;
+
+   /**
+    * @return number of messages pending delivery
+    */
+   int getPendingQueueSize();
+
+   /**
+    * @return number of messages dispatched to the client
+    */
+   int getDispatchedQueueSize();
+
+   /**
+    * @return number of messages dispatched to the client
+    */
+   long getDispatchedCounter();
+
+   /**
+    * @return number of messages that matched the subscription
+    */
+   long getEnqueueCounter();
+
+   /**
+    * @return number of messages queued by the client
+    */
+   long getDequeueCounter();
+
+   /**
+    * @return the JMS selector on the current subscription
+    */
+   String getSelector();
+
+   /**
+    * Attempts to change the current active selector on the subscription.
+    * This operation is not supported for persistent topics.
+    */
+   void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException;
+
+   /**
+    * @return the JMX object name that this subscription was registered as, if applicable
+    */
+   ObjectName getObjectName();
+
+   /**
+    * Set when the subscription is registered in JMX.
+    */
+   void setObjectName(ObjectName objectName);
+
+   /**
+    * @return true when 60% or more room is left for dispatching messages
+    */
+   boolean isLowWaterMark();
+
+   /**
+    * @return true when 10% or less room is left for dispatching messages
+    */
+   boolean isHighWaterMark();
+
+   /**
+    * @return true if there is no space to dispatch messages
+    */
+   boolean isFull();
+
+   /**
+    * Informs the MessageConsumer on the client to change its prefetch.
+    * @param newPrefetch the new prefetch value
+    */
+   void updateConsumerPrefetch(int newPrefetch);
+
+   /**
+    * Called when the subscription is destroyed.
+    */
+   void destroy();
+
+   /**
+    * @return the prefetch size that is configured for the subscription
+    */
+   int getPrefetchSize();
+
+   /**
+    * @return the number of messages awaiting acknowledgement
+    */
+   int getInFlightSize();
+
+   /**
+    * @return the in flight messages as a percentage of the prefetch size
+    */
+   int getInFlightUsage();
+
+   /**
+    * Informs the Broker if the subscription needs intervention to recover its state,
+    * e.g. a DurableTopicSubscriber may do.
+    * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
+    * @return true if recovery required
+    */
+   boolean isRecoveryRequired();
+
+   /**
+    * @return true if a browser
+    */
+   boolean isBrowser();
+
+   /**
+    * @return the number of messages this subscription can accept before it is full
+    */
+   int countBeforeFull();
+
+   AMQConnectionContext getContext();
+
+   int getCursorMemoryHighWaterMark();
+
+   void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+
+   boolean isSlowConsumer();
+
+   void unmatched(MessageReference node) throws IOException;
+
+   /**
+    * Returns the time since the last Ack message was received by this subscription.
+    *
+    * If there has never been an ack this value should be set to the creation time of the
+    * subscription.
+    *
+    * @return time of last received Ack message or Subscription create time if no Acks.
+    */
+   long getTimeOfLastMessageAck();
+
+   long  getConsumedCount();
+
+   void incrementConsumedCount();
+
+   void resetConsumedCount();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java
new file mode 100644
index 0000000..7cf1e84
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQSubscriptionRecovery.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * An interface for recovering transient messages held by the broker for
+ * retroactive recovery for subscribers
+ *
+ *
+ */
+public interface AMQSubscriptionRecovery
+{
+
+   /**
+    * Add a message to the SubscriptionRecovery
+    *
+    * @param context the connection context
+    * @param message the message reference to add
+    * @return true if the message is accepted
+    * @throws Exception
+    */
+   boolean addRecoveredMessage(AMQConnectionContext context, MessageReference message) throws Exception;
+
+   /**
+    * @return the Destination associated with this Subscription
+    */
+   ActiveMQDestination getActiveMQDestination();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java
new file mode 100644
index 0000000..1a477b9
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransaction.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.transaction.Synchronization;
+import org.slf4j.Logger;
+
+/** Base class for OpenWire transactions: holds the state and the Synchronization callbacks run on commit/rollback. */
+public abstract class AMQTransaction
+{
+   public static final byte START_STATE = 0; // can go to: 1,2,3
+   public static final byte IN_USE_STATE = 1; // can go to: 2,3
+   public static final byte PREPARED_STATE = 2; // can go to: 3
+   public static final byte FINISHED_STATE = 3;
+   boolean committed = false;
+
+   private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
+   private byte state = START_STATE;
+   protected FutureTask<?> preCommitTask = new FutureTask<Object>(
+         new Callable<Object>()
+         {
+            public Object call() throws Exception
+            {
+               doPreCommit();
+               return null;
+            }
+         });
+   protected FutureTask<?> postCommitTask = new FutureTask<Object>(
+         new Callable<Object>()
+         {
+            public Object call() throws Exception
+            {
+               doPostCommit();
+               return null;
+            }
+         });
+
+   public byte getState()
+   {
+      return state;
+   }
+
+   public void setState(byte state)
+   {
+      this.state = state;
+   }
+
+   public boolean isCommitted()
+   {
+      return committed;
+   }
+
+   public void setCommitted(boolean committed)
+   {
+      this.committed = committed;
+   }
+
+   /** Registers a callback; a transaction still in START_STATE moves to IN_USE_STATE. */
+   public void addSynchronization(Synchronization r)
+   {
+      synchronizations.add(r);
+      if (state == START_STATE)
+      {
+         state = IN_USE_STATE;
+      }
+   }
+
+   /** @return the registered synchronization equal to {@code r}, or null if there is none */
+   public Synchronization findMatching(Synchronization r)
+   {
+      int existing = synchronizations.indexOf(r);
+      return existing == -1 ? null : synchronizations.get(existing);
+   }
+
+   public void removeSynchronization(Synchronization r)
+   {
+      synchronizations.remove(r);
+   }
+
+   /** Rejects prepare with an XAER_PROTO XAException unless the state is START_STATE or IN_USE_STATE. */
+   public void prePrepare() throws Exception
+   {
+      switch (state)
+      {
+         case START_STATE:
+         case IN_USE_STATE:
+            break;
+         default:
+            XAException xae = new XAException("Prepare cannot be called now.");
+            xae.errorCode = XAException.XAER_PROTO;
+            throw xae;
+      }
+   }
+
+   protected void fireBeforeCommit() throws Exception
+   {
+      for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();)
+      {
+         Synchronization s = iter.next();
+         s.beforeCommit();
+      }
+   }
+
+   protected void fireAfterCommit() throws Exception
+   {
+      for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();)
+      {
+         Synchronization s = iter.next();
+         s.afterCommit();
+      }
+   }
+
+   /**
+    * Fires afterRollback on every synchronization, most recently added first.
+    * Iterates a reversed copy so the registered order is left untouched by the call.
+    */
+   public void fireAfterRollback() throws Exception
+   {
+      ArrayList<Synchronization> reversed = new ArrayList<Synchronization>(synchronizations);
+      Collections.reverse(reversed);
+      for (Iterator<Synchronization> iter = reversed.iterator(); iter.hasNext();)
+      {
+         iter.next().afterRollback();
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return "Local-" + getTransactionId() + "[synchronizations=" + synchronizations + "]";
+   }
+
+   public abstract void commit(boolean onePhase) throws XAException, IOException;
+
+   public abstract void rollback() throws XAException, IOException;
+
+   public abstract int prepare() throws XAException, IOException;
+
+   public abstract TransactionId getTransactionId();
+
+   public abstract Logger getLog();
+
+   public boolean isPrepared()
+   {
+      return getState() == PREPARED_STATE;
+   }
+
+   public int size()
+   {
+      return synchronizations.size();
+   }
+
+   /**
+    * Blocks until the given task completes and rethrows its failure, if any.
+    * @throws XAException if the task failed with one, or with any cause that is not an IOException
+    * @throws IOException if the task failed with one, or the waiting thread was interrupted
+    */
+   protected void waitPostCommitDone(FutureTask<?> postCommitTask) throws XAException, IOException
+   {
+      try
+      {
+         postCommitTask.get();
+      }
+      catch (InterruptedException e)
+      {
+         // Restore the interrupt flag so callers further up can still observe it.
+         Thread.currentThread().interrupt();
+         throw new InterruptedIOException(e.toString());
+      }
+      catch (ExecutionException e)
+      {
+         Throwable t = e.getCause();
+         if (t instanceof XAException)
+         {
+            throw (XAException) t;
+         }
+         else if (t instanceof IOException)
+         {
+            throw (IOException) t;
+         }
+         else
+         {
+            // Keep the original failure attached instead of flattening it to a string.
+            XAException xae = new XAException(e.toString());
+            xae.initCause(e);
+            throw xae;
+         }
+      }
+   }
+
+   /** Runs the beforeCommit callbacks; any failure is logged and rethrown as an XAER_RMERR XAException. */
+   protected void doPreCommit() throws XAException
+   {
+      try
+      {
+         fireBeforeCommit();
+      }
+      catch (Throwable e)
+      {
+         // A beforeCommit callback failed; surface it as a resource manager error.
+         getLog().warn("PRE COMMIT FAILED: ", e);
+         XAException xae = new XAException("PRE COMMIT FAILED");
+         xae.errorCode = XAException.XAER_RMERR;
+         xae.initCause(e);
+         throw xae;
+      }
+   }
+
+   /** Marks the transaction committed, then runs the afterCommit callbacks; failures become XAER_RMERR. */
+   protected void doPostCommit() throws XAException
+   {
+      try
+      {
+         setCommitted(true);
+         fireAfterCommit();
+      }
+      catch (Throwable e)
+      {
+         // An afterCommit callback failed; surface it as a resource manager error.
+         getLog().warn("POST COMMIT FAILED: ", e);
+         XAException xae = new XAException("POST COMMIT FAILED");
+         xae.errorCode = XAException.XAER_RMERR;
+         xae.initCause(e);
+         throw xae;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java
new file mode 100644
index 0000000..5632a63
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransactionFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.protocol.openwire.AMQTransactionImpl;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionFactory;
+
+public class AMQTransactionFactory implements TransactionFactory
+{ // TransactionFactory whose newTransaction always returns a new AMQTransactionImpl.
+   @Override
+   public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds)
+   {
+      return new AMQTransactionImpl(xid, storageManager, timeoutSeconds);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java
new file mode 100644
index 0000000..be11e5b
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionState.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.state.ConnectionState;
+import org.hornetq.core.protocol.openwire.OpenWireConnection;
+
+/**
+ * ConnectionState that also holds the AMQConnectionContext, the OpenWireConnection, a reference counter and a mutex.
+ * @see org.apache.activemq.broker.TransportConnectionState
+ * @author howard
+ */
+public class AMQTransportConnectionState extends ConnectionState
+{
+
+   private AMQConnectionContext context;
+   private OpenWireConnection connection;
+   private AtomicInteger referenceCounter = new AtomicInteger();
+   private final Object connectionMutex = new Object();
+
+   public AMQTransportConnectionState(ConnectionInfo info,
+         OpenWireConnection transportConnection)
+   {
+      super(info);
+      connection = transportConnection;
+   }
+
+   public AMQConnectionContext getContext()
+   {
+      return context;
+   }
+
+   public OpenWireConnection getConnection()
+   {
+      return connection;
+   }
+
+   public void setContext(AMQConnectionContext context)
+   {
+      this.context = context;
+   }
+
+   public void setConnection(OpenWireConnection connection)
+   {
+      this.connection = connection;
+   }
+
+   public int incrementReference()
+   {
+      return referenceCounter.incrementAndGet();
+   }
+
+   public int decrementReference()
+   {
+      return referenceCounter.decrementAndGet();
+   }
+
+   public AtomicInteger getReferenceCounter()
+   {
+      return referenceCounter;
+   }
+
+   public void setReferenceCounter(AtomicInteger referenceCounter)
+   {
+      this.referenceCounter = referenceCounter;
+   }
+
+   public Object getConnectionMutex()
+   {
+      return connectionMutex;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java
new file mode 100644
index 0000000..d0e68f5
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/AMQTransportConnectionStateRegister.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.SessionId;
+
+/**
+ * Registry of {@link AMQTransportConnectionState} objects: register, unregister, list and
+ * look up states by connection, session, producer or consumer id.
+ * NOTE(review): the original author questioned the purpose of this interface - confirm it is still needed.
+ * @author howard
+ */
+public interface AMQTransportConnectionStateRegister
+{
+   AMQTransportConnectionState registerConnectionState(ConnectionId connectionId,
+         AMQTransportConnectionState state);
+
+   AMQTransportConnectionState unregisterConnectionState(ConnectionId connectionId);
+
+   List<AMQTransportConnectionState> listConnectionStates();
+
+   Map<ConnectionId, AMQTransportConnectionState> mapStates();
+
+   AMQTransportConnectionState lookupConnectionState(String connectionId);
+
+   AMQTransportConnectionState lookupConnectionState(ConsumerId id);
+
+   AMQTransportConnectionState lookupConnectionState(ProducerId id);
+
+   AMQTransportConnectionState lookupConnectionState(SessionId id);
+
+   AMQTransportConnectionState lookupConnectionState(ConnectionId connectionId);
+
+   boolean isEmpty();
+
+   boolean doesHandleMultipleConnectionStates();
+
+   void intialize(AMQTransportConnectionStateRegister other); // NOTE: name is misspelled ("initialize"); it is part of the interface as declared
+
+   void clear();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java
new file mode 100644
index 0000000..12eaf23
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/BrowserListener.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+interface BrowserListener
+{ // Package-private callback interface with a single notification method.
+   void browseFinished(); // NOTE(review): presumably invoked once a browse has completed - confirm with implementers
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java
new file mode 100644
index 0000000..6315d86
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/amq/MessageInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.openwire.amq;
+
+import org.apache.activemq.command.MessageId;
+
+public class MessageInfo
+{
+   public MessageId amqId; // ActiveMQ MessageId of the message
+   public long nativeId; // printed as "native mid" by toString(); presumably the broker's own message id - confirm
+   public int size;
+   // marks a message that has been acked within a local transaction
+   public boolean localAcked;
+
+   public MessageInfo(MessageId amqId, long nativeId, int size)
+   {
+      this.amqId = amqId;
+      this.nativeId = nativeId;
+      this.size = size;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "native mid: " + this.nativeId + " amqId: " + amqId + " local acked: " + localAcked;
+   }
+
+   public void setLocalAcked(boolean ack)
+   {
+      localAcked = ack;
+   }
+
+   public boolean isLocalAcked()
+   {
+      return localAcked;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory b/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory
new file mode 100644
index 0000000..fd06ac6
--- /dev/null
+++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory
@@ -0,0 +1 @@
+org.hornetq.core.protocol.openwire.OpenWireProtocolManagerFactory

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/pom.xml
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/pom.xml b/hornetq-protocols/hornetq-proton-plug/pom.xml
new file mode 100644
index 0000000..98ece12
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/pom.xml
@@ -0,0 +1,80 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>hornetq-protocols</artifactId>
+      <groupId>org.hornetq</groupId>
+      <version>2.5.0-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>hornetq-proton-plug</artifactId>
+
+   <properties>
+      <hornetq.basedir>${project.basedir}/../..</hornetq.basedir>
+   </properties>
+
+   <dependencies>
+      <!-- JMS Client because of some Conversions that are done -->
+      <dependency>
+         <groupId>org.hornetq</groupId>
+         <artifactId>hornetq-jms-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.hornetq</groupId>
+         <artifactId>hornetq-core-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging-processor</artifactId>
+      </dependency>
+
+      <!--
+          JBoss Logging
+      -->
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.hornetq</groupId>
+         <artifactId>hornetq-server</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>proton-j</artifactId>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-client</artifactId>
+         <version>0.24</version>
+         <scope>test</scope>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+         <version>0.24</version>
+         <scope>test</scope>
+      </dependency>
+
+
+
+      <dependency>
+         <groupId>org.jboss.spec.javax.jms</groupId>
+         <artifactId>jboss-jms-api_2.0_spec</artifactId>
+         <scope>provided</scope>
+      </dependency>
+      <dependency>
+         <groupId>junit</groupId>
+         <artifactId>junit</artifactId>
+         <scope>test</scope>
+      </dependency>
+
+
+   </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
new file mode 100644
index 0000000..5678a38
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientConnectionContext.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+/**
+ * Client-side view of an AMQP connection; these operations are valid only on a client connection.
+ *
+ * @author Clebert Suconic
+ */
+
+public interface AMQPClientConnectionContext extends AMQPConnectionContext
+{
+   /**
+    * Sends an AMQP open and blocks until the reply to it comes back.
+    * @param sasl the client SASL data to open with (presumably used to authenticate; confirm in implementations)
+    * @throws Exception if the open fails; the concrete failure types are not specified by this interface
+    */
+   void clientOpen(ClientSASL sasl) throws Exception;
+
+   AMQPClientSessionContext createClientSession() throws HornetQAMQPException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java
new file mode 100644
index 0000000..4e94a73
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientReceiverContext.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+/** Receiver handle, as returned by {@link AMQPClientSessionContext#createReceiver(String)}.
+ * @author Clebert Suconic
+ */
+
+public interface AMQPClientReceiverContext
+{
+   ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception; // presumably waits at most time/unit; TODO confirm the result on timeout
+
+   void flow(int credits);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
new file mode 100644
index 0000000..1fb5375
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSenderContext.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+/** Sender handle, as returned by {@link AMQPClientSessionContext#createSender(String, boolean)}.
+ * @author Clebert Suconic
+ */
+
+public interface AMQPClientSenderContext
+{
+   void send(ProtonJMessage message);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
new file mode 100644
index 0000000..45cbaad
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPClientSessionContext.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+/** Client-side session, as returned by {@link AMQPClientConnectionContext#createClientSession()}; creates senders and receivers by address.
+ * @author Clebert Suconic
+ */
+
+public interface AMQPClientSessionContext extends AMQPSessionContext
+{
+   AMQPClientSenderContext createSender(String address, boolean preSettled) throws HornetQAMQPException;
+
+   AMQPClientReceiverContext createReceiver(String address) throws HornetQAMQPException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
new file mode 100644
index 0000000..d6a1b5a
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import io.netty.buffer.ByteBuf;
+
+/** Connection-level callbacks that the Proton Plug connection context invokes.
+ * @author Clebert Suconic
+ */
+
+public interface AMQPConnectionCallback
+{
+   void close();
+
+   /**
+    * Called when bytes are available to be sent to the client.
+    * You must call {@link org.proton.plug.AMQPConnectionContext#outputDone(int)} once you are done with this buffer.
+    * @param bytes      the outgoing bytes to write
+    * @param connection the connection context that produced the bytes
+    */
+   void onTransport(ByteBuf bytes, AMQPConnectionContext connection);
+
+   AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection);
+
+   void setConnection(AMQPConnectionContext connection);
+
+   AMQPConnectionContext getConnection();
+
+   ServerSASL[] getSASLMechnisms(); // NOTE: "Mechnisms" is misspelled, but the name is part of the interface
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
new file mode 100644
index 0000000..4969b10
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import io.netty.buffer.ByteBuf;
+
+/** Operations shared by the client and server connection contexts, which both extend this interface.
+ * @author Clebert Suconic
+ */
+
+public interface AMQPConnectionContext
+{
+
+   void close();
+
+   Object getLock();
+
+   boolean checkDataReceived();
+
+   long getCreationTime();
+
+   SASLResult getSASLResult();
+
+   /**
+    * Even though we are currently always sending packets asynchronously,
+    * we have the possibility to start relying on the network flow control
+    * and always sync on the send of the packet.
+    *
+    * This is for future use and should be kept returning false.
+    *
+    * We will have to do some testing before we make this return true.
+    * @return whether sends should be synced on flush; expected to be {@code false} for now
+    */
+   boolean isSyncOnFlush();
+
+   int capacity();
+
+   /**
+    * This is for the Remoting layer to push bytes on the AMQP Connection.
+    * The buffer readerIndex should be at the latest read byte after this method is called.
+    * (The method is void, so there is no return value to document.)
+    *
+    * @param buffer the bytes received from the remoting layer
+    */
+   void inputBuffer(ByteBuf buffer);
+
+   void flush();
+
+   /**
+    * To be called when the bytes were sent down the stream (flushed on the socket for example).
+    *
+    * @param numberOfBytes the number of bytes that were sent
+    */
+   void outputDone(int numberOfBytes);
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
new file mode 100644
index 0000000..4f0b127
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+/** Abstract factory for {@link AMQPConnectionContext} instances.
+ * @author Clebert Suconic
+ */
+
+public abstract class AMQPConnectionContextFactory
+{
+   /**
+    * @return the connection context created for {@code connectionCallback}
+    */
+   public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java
new file mode 100644
index 0000000..62846a8
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+/** Server-side connection type; declares no operations beyond those of {@link AMQPConnectionContext}.
+ * @author Clebert Suconic
+ */
+
+public interface AMQPServerConnectionContext extends AMQPConnectionContext
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
new file mode 100644
index 0000000..10933a5
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.proton.plug.context.ProtonPlugSender;
+
+/**
+ * These are the methods through which the Proton Plug component calls into your server.
+ *
+ * @author Clebert Suconic
+ */
+
+public interface AMQPSessionCallback
+{
+
+   void init(AMQPSessionContext session, SASLResult saslResult) throws Exception;
+
+   void start();
+
+   void onFlowConsumer(Object consumer, int credits);
+
+   Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception;
+
+   void startSender(Object brokerConsumer) throws Exception;
+
+   void createTemporaryQueue(String queueName) throws Exception;
+
+   boolean queueQuery(String queueName) throws Exception;
+
+   void closeSender(Object brokerConsumer) throws Exception;
+
+   // TODO: this one can be improved a lot
+   ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception;
+
+   Binary getCurrentTXID();
+
+   String tempQueueName();
+
+   void commitCurrentTX() throws Exception;
+
+   void rollbackCurrentTX() throws Exception;
+
+   void close() throws Exception;
+
+
+   void ack(Object brokerConsumer, Object message) throws Exception;
+
+   /**
+    * @param brokerConsumer the broker's consumer object (opaque {@code Object} at this level)
+    * @param message        the message being cancelled
+    * @param updateCounts   this identifies whether the cancel was because of a failure or just cleaning up the
+    *                       client's cache.
+    *                       In some implementations you could call this "failed".
+    */
+   void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception;
+
+
+   void resumeDelivery(Object consumer);
+
+
+   /**
+    * @param delivery       the Proton delivery (presumably one received on {@code receiver}; confirm in callers)
+    * @param address        the address for the message
+    * @param messageFormat  the message format
+    * @param messageEncoded a heap-backed {@link ByteBuf} (safe to convert into byte[])
+    */
+   void serverSend(Receiver receiver, Delivery delivery, String address, int messageFormat, ByteBuf messageEncoded) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java
new file mode 100644
index 0000000..2cc725d
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+/** Base session operations; {@link AMQPClientSessionContext} extends this interface.
+ * @author Clebert Suconic
+ */
+
+public interface AMQPSessionContext
+{
+   byte[] getTag();
+
+   void replaceTag(byte[] tag);
+
+   void close();
+
+   void removeSender(Sender sender) throws HornetQAMQPException;
+
+   void removeReceiver(Receiver receiver);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java
new file mode 100644
index 0000000..ef3af34
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ClientSASL.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+/** Client-side SASL input, passed to {@link AMQPClientConnectionContext#clientOpen(ClientSASL)}.
+ * @author Clebert Suconic
+ */
+public interface ClientSASL
+{
+   byte[] getBytes();
+
+   String getName();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/SASLResult.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/SASLResult.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/SASLResult.java
new file mode 100644
index 0000000..e4b697e
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/SASLResult.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+/** Outcome of SASL processing, as returned by {@link ServerSASL#processSASL(byte[])}.
+ * @author Clebert Suconic
+ */
+
+public interface SASLResult
+{
+   String getUser();
+
+   boolean isSuccess();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ServerSASL.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ServerSASL.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ServerSASL.java
new file mode 100644
index 0000000..dbadaca
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/ServerSASL.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug;
+
+/** A server-side SASL mechanism, as listed by {@link AMQPConnectionCallback#getSASLMechnisms()}.
+ * @author Clebert Suconic
+ */
+
+public interface ServerSASL
+{
+   String getName();
+
+   SASLResult processSASL(byte[] bytes);
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
new file mode 100644
index 0000000..45a3ef0
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPConnectionContext;
+import org.proton.plug.SASLResult;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.handler.ProtonHandler;
+import org.proton.plug.handler.impl.DefaultEventHandler;
+import org.proton.plug.util.ByteUtil;
+import org.proton.plug.util.DebugInfo;
+
+/** Base connection context: forwards the {@link AMQPConnectionContext} operations to a {@link ProtonHandler}.
+ * @author Clebert Suconic
+ */
+public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext
+{
+
+
+   protected ProtonHandler handler = ProtonHandler.Factory.create();
+
+   protected AMQPConnectionCallback connectionCallback;
+
+   private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>();
+
+
+   public AbstractConnectionContext(AMQPConnectionCallback connectionCallback)
+   {
+      this.connectionCallback = connectionCallback;
+      connectionCallback.setConnection(this);
+      handler.addEventHandler(new LocalListener());
+   }
+
+   public SASLResult getSASLResult()
+   {
+      return handler.getSASLResult();
+   }
+
+   @Override
+   public void inputBuffer(ByteBuf buffer)
+   {
+      if (DebugInfo.debug)
+      {
+         ByteUtil.debugFrame("Buffer Received ", buffer);
+      }
+
+      handler.inputBuffer(buffer);
+   }
+
+   public void destroy()
+   {
+      connectionCallback.close();
+   }
+
+   /**
+    * See comment at {@link org.proton.plug.AMQPConnectionContext#isSyncOnFlush()}
+    * @return always {@code false} in this implementation
+    * @see org.proton.plug.AMQPConnectionContext#isSyncOnFlush()
+    */
+   public boolean isSyncOnFlush()
+   {
+      return false;
+   }
+
+
+   public Object getLock()
+   {
+      return handler.getLock();
+   }
+
+   @Override
+   public int capacity()
+   {
+      return handler.capacity();
+   }
+
+   @Override
+   public void outputDone(int bytes)
+   {
+      handler.outputDone(bytes);
+   }
+
+   public void flush()
+   {
+      handler.flush();
+   }
+
+   public void close()
+   {
+      handler.close();
+   }
+
+   protected AbstractProtonSessionContext getSessionExtension(Session realSession) throws HornetQAMQPException
+   {
+      AbstractProtonSessionContext sessionExtension = sessions.get(realSession);
+      if (sessionExtension == null)
+      {
+         // No context is registered for this session yet: create one, attach it and remember it. TODO (original note): how is this possible? Log a warning here
+         sessionExtension = newSessionExtension(realSession);
+         realSession.setContext(sessionExtension);
+         sessions.put(realSession, sessionExtension);
+      }
+      return sessionExtension;
+   }
+
+   protected abstract void remoteLinkOpened(Link link) throws Exception;
+
+
+   protected abstract AbstractProtonSessionContext newSessionExtension(Session realSession) throws HornetQAMQPException;
+
+   @Override
+   public boolean checkDataReceived()
+   {
+      return handler.checkDataReceived();
+   }
+
+   @Override
+   public long getCreationTime()
+   {
+      return handler.getCreationTime();
+   }
+
+   protected void flushBytes()
+   {
+      ByteBuf bytes;
+      // handler.outputBuffer has the lock (original author's note); keep handing buffers to the callback until it returns null
+      while ((bytes = handler.outputBuffer()) != null)
+      {
+         connectionCallback.onTransport(bytes, AbstractConnectionContext.this);
+      }
+   }
+
+
+   // Event handler registered in the constructor: reacts to SASL, transport, connection, session, link and delivery events
+   class LocalListener extends DefaultEventHandler
+   {
+
+      @Override
+      public void onSASLInit(ProtonHandler handler, Connection connection)
+      {
+         handler.createServerSASL(connectionCallback.getSASLMechnisms());
+      }
+
+      @Override
+      public void onTransport(Transport transport)
+      {
+         flushBytes();
+      }
+
+      @Override
+      public void onRemoteOpen(Connection connection) throws Exception
+      {
+         synchronized (getLock())
+         {
+            connection.setContext(AbstractConnectionContext.this);
+            connection.open();
+         }
+         initialise(); // called after the lock has been released
+      }
+
+
+      @Override
+      public void onRemoteClose(Connection connection)
+      {
+         synchronized (getLock())
+         {
+            connection.close();
+            for (AbstractProtonSessionContext protonSession : sessions.values())
+            {
+               protonSession.close();
+            }
+            sessions.clear();
+         }
+         // We must force a write on the channel before we actually destroy the connection
+         onTransport(handler.getTransport());
+         destroy();
+      }
+
+      @Override
+      public void onLocalOpen(Session session) throws Exception
+      {
+         getSessionExtension(session);
+      }
+
+      @Override
+      public void onRemoteOpen(Session session) throws Exception
+      {
+         getSessionExtension(session).initialise();
+         synchronized (getLock())
+         {
+            session.open();
+         }
+      }
+
+
+      @Override
+      public void onLocalClose(Session session) throws Exception
+      {
+      }
+
+      @Override
+      public void onRemoteClose(Session session) throws Exception
+      {
+         synchronized (getLock())
+         {
+            session.close();
+         }
+
+         AbstractProtonSessionContext sessionContext = (AbstractProtonSessionContext) session.getContext();
+         if (sessionContext != null)
+         {
+            sessionContext.close();
+            sessions.remove(session);
+            session.setContext(null);
+         }
+      }
+
+      @Override
+      public void onRemoteOpen(Link link) throws Exception
+      {
+         remoteLinkOpened(link);
+      }
+
+      @Override
+      public void onFlow(Link link) throws Exception
+      {
+         ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit()); // NOTE(review): no null check on link.getContext(), unlike onDelivery below
+      }
+
+      @Override
+      public void onRemoteClose(Link link) throws Exception
+      {
+         link.close();
+         ((ProtonDeliveryHandler) link.getContext()).close(); // NOTE(review): no null check on link.getContext(), unlike onDelivery below
+      }
+
+
+      public void onRemoteDetach(Link link) throws Exception // NOTE(review): no @Override here, unlike the callbacks above; confirm it matches a DefaultEventHandler method
+      {
+         link.detach();
+      }
+
+      public void onDelivery(Delivery delivery) throws Exception // NOTE(review): no @Override here either
+      {
+         ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
+         if (handler != null)
+         {
+            handler.onMessage(delivery);
+         }
+         else
+         {
+            // TODO: replace System.err with proper logging
+
+            System.err.println("Handler is null, can't delivery " + delivery);
+         }
+      }
+
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
new file mode 100644
index 0000000..45b5250
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.util.CreditsSemaphore;
+import org.proton.plug.util.NettyWritable;
+
+/**
+ * A wrapper around a HornetQ ServerConsumer for handling outgoing messages and incoming acks via a Proton Sender.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public abstract class AbstractProtonContextSender extends ProtonInitializable implements ProtonDeliveryHandler
+{
+   protected final AbstractProtonSessionContext protonSession;
+   protected final Sender sender;
+   protected final AbstractConnectionContext connection;
+   protected boolean closed = false;
+   protected final AMQPSessionCallback sessionSPI;
+   // starts without credit; credit is set through onFlow
+   protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
+
+
+   public AbstractProtonContextSender(AbstractConnectionContext connection, Sender sender, AbstractProtonSessionContext protonSession, AMQPSessionCallback server)
+   {
+      this.sessionSPI = server;
+      this.protonSession = protonSession;
+      this.sender = sender;
+      this.connection = connection;
+   }
+
+   /*
+   * replace the credit held by the semaphore with the value reported for the link
+   * */
+   @Override
+   public void onFlow(int credits)
+   {
+      creditsSemaphore.setCredits(credits);
+   }
+
+   /*
+   * start the session
+   * */
+   public void start() throws HornetQAMQPException
+   {
+      sessionSPI.start();
+   }
+
+   /*
+   * close this sender: mark it closed, unregister it from the session, close the proton sender
+   * under the connection lock and flush the connection
+   * */
+   @Override
+   public void close() throws HornetQAMQPException
+   {
+      closed = true;
+      protonSession.removeSender(sender);
+
+      synchronized (connection.getLock())
+      {
+         sender.close();
+      }
+      connection.flush();
+   }
+
+   /*
+   * handle an incoming Ack from Proton, basically pass to HornetQ to handle
+   * */
+   @Override
+   public abstract void onMessage(Delivery delivery) throws HornetQAMQPException;
+
+   /*
+   * check the state of the consumer, i.e. are there any more messages. only really needed for browsers?
+   * */
+   public void checkState()
+   {
+   }
+
+   public Sender getSender()
+   {
+      return sender;
+   }
+
+   /*
+   * encode the message and send it over the proton sender, waiting for credit first if none is available.
+   * the given context is attached to the delivery. returns the number of bytes the message encoded to.
+   * */
+   protected int performSend(ProtonJMessage serverMessage, Object context)
+   {
+      acquireCredit();
+
+      //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers
+      final boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+
+      //we only need a tag if we are going to ack later
+      final byte[] tag;
+      if (preSettle)
+      {
+         tag = new byte[0];
+      }
+      else
+      {
+         tag = protonSession.getTag();
+      }
+
+      final ByteBuf encoded = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+      try
+      {
+         serverMessage.encode(new NettyWritable(encoded));
+
+         final int size = encoded.writerIndex();
+
+         synchronized (connection.getLock())
+         {
+            final Delivery delivery = sender.delivery(tag, 0, tag.length);
+            delivery.setContext(context);
+
+            // this will avoid a copy.. patch provided by Norman using buffer.array()
+            sender.send(encoded.array(), encoded.arrayOffset() + encoded.readerIndex(), encoded.readableBytes());
+
+            if (preSettle)
+            {
+               delivery.settle();
+            }
+            else
+            {
+               sender.advance();
+            }
+         }
+
+         connection.flush();
+
+         return size;
+      }
+      finally
+      {
+         // the pooled buffer is released whether or not the send succeeded
+         encoded.release();
+      }
+   }
+
+   /*
+   * take one credit, blocking until one is available when the non-blocking attempt fails
+   * */
+   private void acquireCredit()
+   {
+      if (creditsSemaphore.tryAcquire())
+      {
+         return;
+      }
+
+      try
+      {
+         creditsSemaphore.acquire();
+      }
+      catch (InterruptedException e)
+      {
+         // restore the interrupt flag and abort the send
+         Thread.currentThread().interrupt();
+         throw new IllegalStateException(e.getMessage(), e);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
new file mode 100644
index 0000000..fb4bb07
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context;
+
+import org.apache.qpid.proton.engine.Receiver;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+/**
+ * Handles incoming messages via a Proton Receiver and forwards them to HornetQ.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public abstract class AbstractProtonReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler
+{
+   protected final AbstractConnectionContext connection;
+
+   protected final AbstractProtonSessionContext protonSession;
+
+   protected final Receiver receiver;
+
+   /** Address of the receiver's remote target, or null when the receiver has no remote target. */
+   protected final String address;
+
+   protected final AMQPSessionCallback sessionSPI;
+
+   public AbstractProtonReceiverContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, AbstractProtonSessionContext protonSession, Receiver receiver)
+   {
+      this.sessionSPI = sessionSPI;
+      this.connection = connection;
+      this.protonSession = protonSession;
+      this.receiver = receiver;
+      this.address = receiver.getRemoteTarget() != null ? receiver.getRemoteTarget().getAddress() : null;
+   }
+
+   /**
+    * Unregisters this receiver from its session.
+    */
+   @Override
+   public void close() throws HornetQAMQPException
+   {
+      protonSession.removeReceiver(receiver);
+   }
+
+   /**
+    * Issues credit on the receiver under the connection lock, then flushes the connection.
+    *
+    * @param credits the amount of credit passed to the receiver
+    */
+   public void flow(int credits)
+   {
+      final Object lock = connection.getLock();
+      synchronized (lock)
+      {
+         receiver.flow(credits);
+      }
+
+      connection.flush();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
new file mode 100644
index 0000000..8f28039
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.AMQPSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.exceptions.HornetQAMQPInternalErrorException;
+
+/**
+ * ProtonSession is a direct representation of the session on the broker.
+ * It links a ProtonSession to a Broker or Client Session;
+ * the Broker Session is reached through the ProtonSessionSPI.
+ *
+ * @author Clebert Suconic
+ */
+public abstract class AbstractProtonSessionContext extends ProtonInitializable implements AMQPSessionContext
+{
+   protected final AbstractConnectionContext connection;
+
+   protected final AMQPSessionCallback sessionSPI;
+
+   protected final Session session;
+
+   /** Counter the delivery tags are generated from. */
+   private long currentTag = 0;
+
+   protected Map<Receiver, AbstractProtonReceiverContext> receivers = new HashMap<>();
+
+   protected Map<Sender, AbstractProtonContextSender> senders = new HashMap<>();
+
+   protected boolean closed = false;
+
+   public AbstractProtonSessionContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, Session session)
+   {
+      this.session = session;
+      this.sessionSPI = sessionSPI;
+      this.connection = connection;
+   }
+
+   /**
+    * Runs the base initialisation and then initialises the session SPI, if there is one,
+    * with this context and the connection's SASL result. Does nothing when
+    * {@link #isInitialized()} already reports true.
+    *
+    * @throws Exception a HornetQAMQPInternalErrorException wrapping any failure of the SPI
+    *                   initialisation, or whatever the base initialisation throws
+    */
+   @Override
+   public void initialise() throws Exception
+   {
+      if (isInitialized())
+      {
+         return;
+      }
+
+      super.initialise();
+
+      if (sessionSPI == null)
+      {
+         return;
+      }
+
+      try
+      {
+         sessionSPI.init(this, connection.getSASLResult());
+      }
+      catch (Exception e)
+      {
+         throw new HornetQAMQPInternalErrorException(e.getMessage(), e);
+      }
+   }
+
+
+   /**
+    * Removes the sender registered under the given consumer and closes it. When closing
+    * fails the sender's target is cleared and the failure is set as its error condition.
+    * <p/>
+    * TODO: maybe it needs to go?
+    *
+    * @param consumer  key the sender is looked up by in the senders map
+    * @param queueName not used by this implementation
+    */
+   public void disconnect(Object consumer, String queueName)
+   {
+      final AbstractProtonContextSender removed = senders.remove(consumer);
+      if (removed == null)
+      {
+         return;
+      }
+
+      try
+      {
+         removed.close();
+      }
+      catch (HornetQAMQPException e)
+      {
+         removed.getSender().setTarget(null);
+         removed.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
+      }
+   }
+
+
+   /**
+    * Produces the next delivery tag: the bytes of the hexadecimal text of an incrementing counter.
+    */
+   @Override
+   public byte[] getTag()
+   {
+      final long tagValue = currentTag++;
+      return Long.toHexString(tagValue).getBytes();
+   }
+
+   @Override
+   public void replaceTag(byte[] tag)
+   {
+      // TODO: do we need to reuse this?
+   }
+
+   /**
+    * Closes every receiver and sender of this session, then rolls back the current
+    * transaction and closes the session SPI. Failures are printed and do not stop the
+    * remaining steps. Calling it again once closed has no effect.
+    */
+   @Override
+   public void close()
+   {
+      if (closed)
+      {
+         return;
+      }
+
+      closeReceivers();
+      closeSenders();
+      closeSessionSPI();
+
+      closed = true;
+   }
+
+   /**
+    * Closes all receivers and empties the receivers map.
+    */
+   private void closeReceivers()
+   {
+      // Making a copy to avoid ConcurrentModificationException during the iteration
+      Set<AbstractProtonReceiverContext> snapshot = new HashSet<>();
+      snapshot.addAll(receivers.values());
+
+      for (AbstractProtonReceiverContext receiverContext : snapshot)
+      {
+         try
+         {
+            receiverContext.close();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            // TODO Logging
+         }
+      }
+      receivers.clear();
+   }
+
+   /**
+    * Closes all senders and empties the senders map.
+    */
+   private void closeSenders()
+   {
+      // Same copy-then-iterate approach as for the receivers
+      Set<AbstractProtonContextSender> snapshot = new HashSet<>();
+      snapshot.addAll(senders.values());
+
+      for (AbstractProtonContextSender senderContext : snapshot)
+      {
+         try
+         {
+            senderContext.close();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            // TODO Logging
+         }
+      }
+      senders.clear();
+   }
+
+   /**
+    * Rolls back the current transaction and closes the session SPI, when there is one.
+    */
+   private void closeSessionSPI()
+   {
+      if (sessionSPI == null)
+      {
+         return;
+      }
+
+      try
+      {
+         sessionSPI.rollbackCurrentTX();
+         sessionSPI.close();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         // TODO logging
+      }
+   }
+
+   @Override
+   public void removeSender(Sender sender) throws HornetQAMQPException
+   {
+      senders.remove(sender);
+   }
+
+   @Override
+   public void removeReceiver(Receiver receiver)
+   {
+      receivers.remove(receiver);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
new file mode 100644
index 0000000..3436c5c
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context;
+
+import org.apache.qpid.proton.engine.Delivery;
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+/**
+ * An interface to handle deliveries, either messages, acks or transaction calls.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface ProtonDeliveryHandler
+{
+   /**
+    * Handles a delivery that arrived on the link this handler is attached to.
+    *
+    * @param delivery the incoming delivery
+    * @throws HornetQAMQPException if the delivery cannot be processed
+    */
+   void onMessage(Delivery delivery) throws HornetQAMQPException;
+
+   /**
+    * Notifies the handler of a flow event on its link.
+    *
+    * @param currentCredits the credit reported for the link
+    */
+   void onFlow(int currentCredits);
+
+   /**
+    * Closes the handler.
+    *
+    * @throws HornetQAMQPException if closing fails
+    */
+   void close() throws HornetQAMQPException;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
new file mode 100644
index 0000000..0dad6d7
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context;
+
+import java.util.concurrent.TimeUnit;
+
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.exceptions.HornetQAMQPIllegalStateException;
+import org.proton.plug.exceptions.HornetQAMQPTimeoutException;
+import org.proton.plug.util.FutureRunnable;
+
+/**
+ * Base class for contexts that are initialised once, with an optional callback that is
+ * run as part of that initialisation.
+ *
+ * @author Clebert Suconic
+ */
+
+public class ProtonInitializable
+{
+
+   /** Callback run by the first {@link #initialise()} call; cleared once it has been used. */
+   private Runnable afterInit;
+
+   private boolean initialized = false;
+
+   /**
+    * Registers the callback to run during {@link #initialise()}, replacing any previous one.
+    *
+    * @param afterInit the callback, may be null
+    */
+   public void afterInit(Runnable afterInit)
+   {
+      this.afterInit = afterInit;
+   }
+
+
+   /**
+    * @return true once {@link #initialise()} has been called
+    */
+   public boolean isInitialized()
+   {
+      return initialized;
+   }
+
+
+   /**
+    * Marks this object as initialised and runs the registered callback, if any.
+    * Only the first call does anything; later calls return immediately.
+    *
+    * @throws Exception declared so that overriding implementations may throw
+    */
+   public void initialise() throws Exception
+   {
+      if (!initialized)
+      {
+         // The flag has to be raised here: it used to be assigned false, so isInitialized()
+         // never became true and every call ran the initialisation again.
+         initialized = true;
+         try
+         {
+            if (afterInit != null)
+            {
+               afterInit.run();
+            }
+         }
+         finally
+         {
+            // the callback is dropped even when it failed
+            afterInit = null;
+         }
+      }
+   }
+
+
+   /**
+    * Waits up to 30 seconds for the given latch.
+    *
+    * @param latch the latch to wait on
+    * @throws HornetQAMQPException a HornetQAMQPTimeoutException when the wait times out, or a
+    *                              HornetQAMQPIllegalStateException when the waiting thread is
+    *                              interrupted (the interrupt flag is restored first)
+    */
+   public void waitWithTimeout(FutureRunnable latch) throws HornetQAMQPException
+   {
+      try
+      {
+         // TODO Configure this
+         if (!latch.await(30, TimeUnit.SECONDS))
+         {
+            throw new HornetQAMQPTimeoutException("Timed out waiting for response");
+         }
+      }
+      catch (InterruptedException e)
+      {
+         Thread.currentThread().interrupt();
+         throw new HornetQAMQPIllegalStateException(e.getMessage());
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java
new file mode 100644
index 0000000..eff114a
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context;
+
+import org.apache.qpid.proton.engine.Sender;
+
+/**
+ * A sender that delivers messages over a Proton {@link Sender}.
+ *
+ * @author Clebert Suconic
+ */
+
+public interface ProtonPlugSender
+{
+   /**
+    * @return the Proton sender used by this plug sender
+    */
+   Sender getSender();
+
+   /**
+    * Delivers a message.
+    *
+    * @param message       the message to deliver
+    * @param deliveryCount the delivery count for the message
+    * @return an int result; NOTE(review): presumably the size of the sent message - confirm against the implementations
+    * @throws Exception if the delivery fails
+    */
+   int deliverMessage(Object message, int deliveryCount) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
new file mode 100644
index 0000000..9f53085
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.proton.plug.context;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.logger.HornetQAMQPProtocolMessageBundle;
+
+import static org.proton.plug.util.DeliveryUtil.decodeMessageImpl;
+import static org.proton.plug.util.DeliveryUtil.readDelivery;
+
+/**
+ * Handles an AMQP coordinator, i.e. the deliveries that declare and discharge transactions.
+ */
+public class ProtonTransactionHandler implements ProtonDeliveryHandler
+{
+
+   final AMQPSessionCallback sessionSPI;
+
+   public ProtonTransactionHandler(AMQPSessionCallback sessionSPI)
+   {
+      this.sessionSPI = sessionSPI;
+   }
+
+   /**
+    * Reads the coordinator message carried by the delivery and acts on it: a Declare is
+    * answered with the current transaction id, a Discharge rolls the current transaction
+    * back or commits it. Any failure is reported to the peer as a Rejected disposition
+    * with the "failed" condition. Deliveries that are not readable are ignored.
+    *
+    * @param delivery the delivery received on the coordinator link
+    */
+   @Override
+   public void onMessage(Delivery delivery) throws HornetQAMQPException
+   {
+      final ByteBuf payload = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+      try
+      {
+         final Receiver coordinator = (Receiver) delivery.getLink();
+
+         if (!delivery.isReadable())
+         {
+            return;
+         }
+
+         readDelivery(coordinator, payload);
+
+         coordinator.advance();
+
+         final MessageImpl request = decodeMessageImpl(payload);
+
+         final Object action = ((AmqpValue) request.getBody()).getValue();
+
+         if (action instanceof Declare)
+         {
+            declare(delivery);
+         }
+         else if (action instanceof Discharge)
+         {
+            discharge((Discharge) action);
+            delivery.settle();
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+
+         final ErrorCondition failure = new ErrorCondition();
+         failure.setCondition(Symbol.valueOf("failed"));
+         failure.setDescription(e.getMessage());
+
+         final Rejected rejected = new Rejected();
+         rejected.setError(failure);
+         delivery.disposition(rejected);
+      }
+      finally
+      {
+         // the pooled buffer is always given back
+         payload.release();
+      }
+   }
+
+   /**
+    * Answers a Declare with a Declared outcome carrying the current transaction id and settles the delivery.
+    */
+   private void declare(Delivery delivery) throws Exception
+   {
+      final Binary txID = sessionSPI.getCurrentTXID();
+
+      final Declared outcome = new Declared();
+      outcome.setTxnId(txID);
+
+      delivery.disposition(outcome);
+      delivery.settle();
+   }
+
+   /**
+    * Rolls the current transaction back when the discharge is flagged as failed, commits it otherwise.
+    * Failures are rethrown as the matching coordinator error from the message bundle.
+    */
+   private void discharge(Discharge discharge) throws Exception
+   {
+      if (discharge.getFail())
+      {
+         try
+         {
+            sessionSPI.rollbackCurrentTX();
+         }
+         catch (Exception e)
+         {
+            throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
+         }
+         return;
+      }
+
+      try
+      {
+         sessionSPI.commitCurrentTX();
+      }
+      catch (Exception e)
+      {
+         throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
+      }
+   }
+
+   /**
+    * Flow events need no handling here.
+    */
+   @Override
+   public void onFlow(int credits)
+   {
+
+   }
+
+   @Override
+   public void close() throws HornetQAMQPException
+   {
+      //noop
+   }
+}


Mime
View raw message