activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [30/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:00 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
new file mode 100644
index 0000000..95a7f61
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.client;
+
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+import org.proton.plug.AMQPClientConnectionContext;
+import org.proton.plug.AMQPClientSessionContext;
+import org.proton.plug.ClientSASL;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.context.ProtonInitializable;
+import org.proton.plug.util.FutureRunnable;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext
+{
+   public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback)
+   {
+      super(connectionCallback);
+   }
+
+   // Maybe a client interface?
+   public void clientOpen(ClientSASL sasl) throws Exception
+   {
+      FutureRunnable future = new FutureRunnable(1);
+      synchronized (handler.getLock())
+      {
+         this.afterInit(future);
+         if (sasl != null)
+         {
+            handler.createClientSasl(sasl);
+         }
+         handler.getConnection().open();
+      }
+
+      flush();
+
+      waitWithTimeout(future);
+   }
+
+   public AMQPClientSessionContext createClientSession() throws HornetQAMQPException
+   {
+
+      FutureRunnable futureRunnable = new FutureRunnable(1);
+      ProtonClientSessionContext sessionImpl;
+      synchronized (handler.getLock())
+      {
+         Session session = handler.getConnection().session();
+         sessionImpl = (ProtonClientSessionContext) getSessionExtension(session);
+         sessionImpl.afterInit(futureRunnable);
+         session.open();
+      }
+
+      flush();
+      waitWithTimeout(futureRunnable);
+
+      return sessionImpl;
+   }
+
+   @Override
+   protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws HornetQAMQPException
+   {
+      AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);
+      AbstractProtonSessionContext protonSession = new ProtonClientSessionContext(sessionSPI, this, realSession);
+
+      return protonSession;
+
+   }
+
+   @Override
+   protected void remoteLinkOpened(Link link) throws Exception
+   {
+      Object context = link.getContext();
+      if (context != null && context instanceof ProtonInitializable)
+      {
+         ((ProtonInitializable) context).initialise();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
new file mode 100644
index 0000000..11f8709
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.client;
+
+import org.proton.plug.AMQPConnectionContext;
+import org.proton.plug.AMQPConnectionContextFactory;
+import org.proton.plug.AMQPConnectionCallback;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory
+{
+   private static final AMQPConnectionContextFactory theInstance = new ProtonClientConnectionContextFactory();
+
+   public static AMQPConnectionContextFactory getFactory()
+   {
+      return theInstance;
+   }
+
+   public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback)
+   {
+      return new ProtonClientConnectionContext(connectionCallback);
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
new file mode 100644
index 0000000..f0dc46a
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.client;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.proton.plug.AMQPClientSenderContext;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonContextSender;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.util.FutureRunnable;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonClientContext extends AbstractProtonContextSender implements AMQPClientSenderContext
+{
+
+   FutureRunnable catchUpRunnable = new FutureRunnable();
+
+   public ProtonClientContext(AbstractConnectionContext connection, Sender sender, AbstractProtonSessionContext protonSession, AMQPSessionCallback server)
+   {
+      super(connection, sender, protonSession, server);
+   }
+
+
+   @Override
+   public void onMessage(Delivery delivery) throws HornetQAMQPException
+   {
+      if (delivery.getRemoteState() instanceof Accepted)
+      {
+         if (delivery.getContext() instanceof FutureRunnable)
+         {
+            ((FutureRunnable) delivery.getContext()).countDown();
+         }
+      }
+   }
+
+   public void send(ProtonJMessage message)
+   {
+      if (sender.getSenderSettleMode() != SenderSettleMode.SETTLED)
+      {
+         catchUpRunnable.countUp();
+      }
+      performSend(message, catchUpRunnable);
+   }
+
+
+   public boolean sync(long timeout, TimeUnit unit)
+   {
+      try
+      {
+         return catchUpRunnable.await(timeout, unit);
+      }
+      catch (InterruptedException e)
+      {
+         Thread.currentThread().interrupt();
+         return false;
+      }
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
new file mode 100644
index 0000000..ebf7f7d
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.client;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.proton.plug.AMQPClientReceiverContext;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonReceiverContext;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+import static org.proton.plug.util.DeliveryUtil.readDelivery;
+import static org.proton.plug.util.DeliveryUtil.decodeMessageImpl;
+
+/**
+ * @author Clebert Suconic
+ */
+public class ProtonClientReceiverContext extends AbstractProtonReceiverContext implements AMQPClientReceiverContext
+{
+   public ProtonClientReceiverContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, AbstractProtonSessionContext protonSession, Receiver receiver)
+   {
+      super(sessionSPI, connection, protonSession, receiver);
+   }
+
+   public void onFlow(int credits)
+   {
+   }
+
+   LinkedBlockingDeque<MessageImpl> queues = new LinkedBlockingDeque<>();
+
+   /*
+   * called when Proton receives a message to be delivered via a Delivery.
+   *
+   * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
+   *
+   * */
+   public void onMessage(Delivery delivery) throws HornetQAMQPException
+   {
+      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+      try
+      {
+         synchronized (connection.getLock())
+         {
+            readDelivery(receiver, buffer);
+            MessageImpl clientMessage = decodeMessageImpl(buffer);
+
+            // This second method could be better
+//            clientMessage.decode(buffer.nioBuffer());
+
+            receiver.advance();
+            delivery.disposition(Accepted.getInstance());
+            queues.add(clientMessage);
+
+         }
+      }
+      finally
+      {
+         buffer.release();
+      }
+   }
+
+
+   @Override
+   public ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception
+   {
+      return queues.poll(time, unit);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
new file mode 100644
index 0000000..e888ea9
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.client;
+
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.proton.plug.AMQPClientReceiverContext;
+import org.proton.plug.AMQPClientSenderContext;
+import org.proton.plug.AMQPClientSessionContext;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.util.FutureRunnable;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonClientSessionContext extends AbstractProtonSessionContext implements AMQPClientSessionContext
+{
+   public ProtonClientSessionContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, Session session)
+   {
+      super(sessionSPI, connection, session);
+   }
+
+   public AMQPClientSenderContext createSender(String address, boolean preSettled) throws HornetQAMQPException
+   {
+      FutureRunnable futureRunnable = new FutureRunnable(1);
+
+      ProtonClientContext amqpSender;
+      synchronized (connection.getLock())
+      {
+         Sender sender = session.sender(address);
+         sender.setSenderSettleMode(SenderSettleMode.SETTLED);
+         Target target = new Target();
+         target.setAddress(address);
+         sender.setTarget(target);
+         amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI);
+         amqpSender.afterInit(futureRunnable);
+         sender.setContext(amqpSender);
+         sender.open();
+      }
+
+      connection.flush();
+
+      waitWithTimeout(futureRunnable);
+      return amqpSender;
+   }
+
+   public AMQPClientReceiverContext createReceiver(String address) throws HornetQAMQPException
+   {
+      FutureRunnable futureRunnable = new FutureRunnable(1);
+
+      ProtonClientReceiverContext amqpReceiver;
+
+      synchronized (connection.getLock())
+      {
+         Receiver receiver = session.receiver(address);
+         Source source = new Source();
+         source.setAddress(address);
+         receiver.setSource(source);
+         amqpReceiver = new ProtonClientReceiverContext(sessionSPI, connection, this, receiver);
+         receiver.setContext(amqpReceiver);
+         amqpReceiver.afterInit(futureRunnable);
+         receiver.open();
+      }
+
+      connection.flush();
+
+      waitWithTimeout(futureRunnable);
+
+      return amqpReceiver;
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
new file mode 100644
index 0000000..1d83e54
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.server;
+
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPServerConnectionContext;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext
+{
+   public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP)
+   {
+      super(connectionSP);
+   }
+
+   protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws HornetQAMQPException
+   {
+      AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);
+      AbstractProtonSessionContext protonSession = new ProtonServerSessionContext(sessionSPI, this, realSession);
+
+      return protonSession;
+   }
+
+   protected void remoteLinkOpened(Link link) throws Exception
+   {
+
+      ProtonServerSessionContext protonSession = (ProtonServerSessionContext) getSessionExtension(link.getSession());
+
+      link.setSource(link.getRemoteSource());
+      link.setTarget(link.getRemoteTarget());
+      if (link instanceof Receiver)
+      {
+         Receiver receiver = (Receiver) link;
+         if (link.getRemoteTarget() instanceof Coordinator)
+         {
+            Coordinator coordinator = (Coordinator) link.getRemoteTarget();
+            protonSession.addTransactionHandler(coordinator, receiver);
+         }
+         else
+         {
+            protonSession.addReceiver(receiver);
+            receiver.flow(100);
+         }
+      }
+      else
+      {
+         Sender sender = (Sender) link;
+         protonSession.addSender(sender);
+         sender.offer(1);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
new file mode 100644
index 0000000..22e3675
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.server;
+
+import org.proton.plug.AMQPConnectionContextFactory;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPServerConnectionContext;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonServerConnectionContextFactory extends AMQPConnectionContextFactory
+{
+   private static final ProtonServerConnectionContextFactory theInstance = new ProtonServerConnectionContextFactory();
+
+   public static ProtonServerConnectionContextFactory getFactory()
+   {
+      return theInstance;
+   }
+
+   public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback)
+   {
+      ProtonServerConnectionContext connection = new ProtonServerConnectionContext(connectionCallback);
+      return connection;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
new file mode 100644
index 0000000..21ca2bc
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.server;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Receiver;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonReceiverContext;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.exceptions.HornetQAMQPInternalErrorException;
+import org.proton.plug.logger.HornetQAMQPProtocolMessageBundle;
+
+import static org.proton.plug.util.DeliveryUtil.readDelivery;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonServerReceiverContext extends AbstractProtonReceiverContext
+{
+
+   private final int numberOfCredits = 100;
+
+   public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, AbstractProtonSessionContext protonSession, Receiver receiver)
+   {
+      super(sessionSPI, connection, protonSession, receiver);
+   }
+
+   public void onFlow(int credits)
+   {
+   }
+
+
+   @Override
+   public void initialise() throws Exception
+   {
+      super.initialise();
+      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
+
+      if (target != null)
+      {
+         if (target.getDynamic())
+         {
+            //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
+            // will be deleted on closing of the session
+            String queue = sessionSPI.tempQueueName();
+
+
+            try
+            {
+               sessionSPI.createTemporaryQueue(queue);
+            }
+            catch (Exception e)
+            {
+               throw new HornetQAMQPInternalErrorException(e.getMessage(), e);
+            }
+            target.setAddress(queue.toString());
+         }
+         else
+         {
+            //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
+            //be a queue bound to it so we nee to check this.
+            String address = target.getAddress();
+            if (address == null)
+            {
+               throw HornetQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
+            }
+            try
+            {
+               if (!sessionSPI.queueQuery(address))
+               {
+                  throw HornetQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
+               }
+            }
+            catch (Exception e)
+            {
+               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage());
+            }
+         }
+      }
+
+      flow(numberOfCredits);
+   }
+
+   /*
+   * called when Proton receives a message to be delivered via a Delivery.
+   *
+   * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
+   *
+   * */
+   public void onMessage(Delivery delivery) throws HornetQAMQPException
+   {
+      Receiver receiver;
+      try
+      {
+         receiver = ((Receiver) delivery.getLink());
+
+         if (!delivery.isReadable())
+         {
+            System.err.println("!!!!! Readable!!!!!!!");
+            return;
+         }
+
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
+         try
+         {
+            synchronized (connection.getLock())
+            {
+               readDelivery(receiver, buffer);
+
+               receiver.advance();
+
+               sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(), buffer);
+               delivery.disposition(Accepted.getInstance());
+               delivery.settle();
+
+               if (receiver.getRemoteCredit() < numberOfCredits / 2)
+               {
+                  flow(numberOfCredits);
+               }
+            }
+         }
+         finally
+         {
+            buffer.release();
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         Rejected rejected = new Rejected();
+         ErrorCondition condition = new ErrorCondition();
+         condition.setCondition(Symbol.valueOf("failed"));
+         condition.setDescription(e.getMessage());
+         rejected.setError(condition);
+         delivery.disposition(rejected);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
new file mode 100644
index 0000000..9b66de0
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.server;
+
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonContextSender;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.exceptions.HornetQAMQPException;
+import org.proton.plug.exceptions.HornetQAMQPInternalErrorException;
+import org.proton.plug.logger.HornetQAMQPProtocolMessageBundle;
+import org.proton.plug.context.ProtonPlugSender;
+import org.apache.qpid.proton.amqp.messaging.Source;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonServerSenderContext extends AbstractProtonContextSender implements ProtonPlugSender
+{
+
+   private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
+   private static final Symbol COPY = Symbol.valueOf("copy");
+
+   private Object brokerConsumer;
+
+   public ProtonServerSenderContext(AbstractConnectionContext connection, Sender sender, AbstractProtonSessionContext protonSession, AMQPSessionCallback server)
+   {
+      super(connection, sender, protonSession, server);
+   }
+
+   public Object getBrokerConsumer()
+   {
+      return brokerConsumer;
+   }
+
+   public void onFlow(int currentCredits)
+   {
+      super.onFlow(currentCredits);
+      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits);
+   }
+
+   /*
+* start the session
+* */
+   public void start() throws HornetQAMQPException
+   {
+      super.start();
+      // protonSession.getServerSession().start();
+
+      //todo add flow control
+      try
+      {
+         // to do whatever you need to make the broker start sending messages to the consumer
+         sessionSPI.startSender(brokerConsumer);
+         //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
+      }
+      catch (Exception e)
+      {
+         throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
+      }
+   }
+
+   /**
+    * create the actual underlying HornetQ Server Consumer
+    */
+   @Override
+   public void initialise() throws Exception
+   {
+      super.initialise();
+
+      Source source = (Source) sender.getRemoteSource();
+
+      String queue;
+
+      String selector = null;
+      Map filter = source == null ? null : source.getFilter();
+      if (filter != null)
+      {
+         DescribedType value = (DescribedType) filter.get(SELECTOR);
+         if (value != null)
+         {
+            selector = value.getDescribed().toString();
+         }
+      }
+
+      if (source != null)
+      {
+         if (source.getDynamic())
+         {
+            //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
+            // will be deleted on closing of the session
+            queue = java.util.UUID.randomUUID().toString();
+            try
+            {
+               sessionSPI.createTemporaryQueue(queue);
+               //protonSession.getServerSession().createQueue(queue, queue, null, true, false);
+            }
+            catch (Exception e)
+            {
+               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
+            }
+            source.setAddress(queue);
+         }
+         else
+         {
+            //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
+            //be a queue bound to it so we nee to check this.
+            queue = source.getAddress();
+            if (queue == null)
+            {
+               throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
+            }
+
+            try
+            {
+               if (!sessionSPI.queueQuery(queue))
+               {
+                  throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
+               }
+            }
+            catch (Exception e)
+            {
+               throw new HornetQAMQPInternalErrorException(e.getMessage(), e);
+            }
+         }
+
+         boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
+         try
+         {
+            brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly);
+         }
+         catch (Exception e)
+         {
+            throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingHornetQConsumer(e.getMessage());
+         }
+      }
+   }
+
+   /*
+   * close the session
+   * */
+   public void close() throws HornetQAMQPException
+   {
+      super.close();
+      try
+      {
+         sessionSPI.closeSender(brokerConsumer);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         throw new HornetQAMQPInternalErrorException(e.getMessage());
+      }
+   }
+
+
+   public void onMessage(Delivery delivery) throws HornetQAMQPException
+   {
+      Object message = delivery.getContext();
+
+      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+
+
+      DeliveryState remoteState = delivery.getRemoteState();
+
+      if (remoteState != null)
+      {
+         if (remoteState instanceof Accepted)
+         {
+            //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
+            // from dealer, a perf hit but a must
+            try
+            {
+               sessionSPI.ack(brokerConsumer, message);
+            }
+            catch (Exception e)
+            {
+               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+            }
+         }
+         else if (remoteState instanceof Released)
+         {
+            try
+            {
+               sessionSPI.cancel(brokerConsumer, message, false);
+            }
+            catch (Exception e)
+            {
+               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+            }
+         }
+         else if (remoteState instanceof Rejected || remoteState instanceof Modified)
+         {
+            try
+            {
+               sessionSPI.cancel(brokerConsumer, message, true);
+            }
+            catch (Exception e)
+            {
+               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+            }
+         }
+         //todo add tag caching
+         if (!preSettle)
+         {
+            protonSession.replaceTag(delivery.getTag());
+         }
+
+         synchronized (connection.getLock())
+         {
+            delivery.settle();
+            sender.offer(1);
+         }
+
+      }
+      else
+      {
+         //todo not sure if we need to do anything here
+      }
+   }
+
+   @Override
+   public synchronized void checkState()
+   {
+      super.checkState();
+      sessionSPI.resumeDelivery(brokerConsumer);
+   }
+
+
+   /**
+    * handle an out going message from HornetQ, send via the Proton Sender
+    */
+   public int deliverMessage(Object message, int deliveryCount) throws Exception
+   {
+      if (closed)
+      {
+         System.err.println("Message can't be delivered as it's closed");
+         return 0;
+      }
+
+      //encode the message
+      ProtonJMessage serverMessage;
+      try
+      {
+         // This can be done a lot better here
+         serverMessage = sessionSPI.encodeMessage(message, deliveryCount);
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+         throw new HornetQAMQPInternalErrorException(e.getMessage(), e);
+      }
+
+      return performSend(serverMessage, message);
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
new file mode 100644
index 0000000..1860c90
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.context.server;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.context.AbstractConnectionContext;
+import org.proton.plug.context.AbstractProtonContextSender;
+import org.proton.plug.context.AbstractProtonReceiverContext;
+import org.proton.plug.context.AbstractProtonSessionContext;
+import org.proton.plug.context.ProtonTransactionHandler;
+import org.proton.plug.exceptions.HornetQAMQPException;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonServerSessionContext extends AbstractProtonSessionContext
+{
+
+   public ProtonServerSessionContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, Session session)
+   {
+      super(sessionSPI, connection, session);
+   }
+
+   protected Map<Object, AbstractProtonContextSender> serverSenders = new HashMap<Object, AbstractProtonContextSender>();
+
+
+   /**
+    * The consumer object from the broker or the key used to store the sender
+    *
+    * @param message
+    * @param consumer
+    * @param deliveryCount
+    * @return the number of bytes sent
+    */
+   public int serverDelivery(Object message, Object consumer, int deliveryCount) throws Exception
+   {
+      ProtonServerSenderContext protonSender = (ProtonServerSenderContext) serverSenders.get(consumer);
+      if (protonSender != null)
+      {
+         return protonSender.deliverMessage(message, deliveryCount);
+      }
+      return 0;
+   }
+
+   public void addTransactionHandler(Coordinator coordinator, Receiver receiver)
+   {
+      ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
+      receiver.setContext(transactionHandler);
+      receiver.open();
+      receiver.flow(100);
+   }
+
+   public void addSender(Sender sender) throws Exception
+   {
+      ProtonServerSenderContext protonSender = new ProtonServerSenderContext(connection, sender, this, sessionSPI);
+
+      try
+      {
+         protonSender.initialise();
+         senders.put(sender, protonSender);
+         serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
+         sender.setContext(protonSender);
+         sender.open();
+         protonSender.start();
+      }
+      catch (HornetQAMQPException e)
+      {
+         senders.remove(sender);
+         sender.setSource(null);
+         sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
+         sender.close();
+      }
+   }
+
+   public void removeSender(Sender sender) throws HornetQAMQPException
+   {
+      ProtonServerSenderContext senderRemoved = (ProtonServerSenderContext) senders.remove(sender);
+      if (senderRemoved != null)
+      {
+         serverSenders.remove(senderRemoved.getBrokerConsumer());
+      }
+   }
+
+
+   public void addReceiver(Receiver receiver) throws Exception
+   {
+      try
+      {
+         AbstractProtonReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver);
+         protonReceiver.initialise();
+         receivers.put(receiver, protonReceiver);
+         receiver.setContext(protonReceiver);
+         receiver.open();
+      }
+      catch (HornetQAMQPException e)
+      {
+         receivers.remove(receiver);
+         receiver.setTarget(null);
+         receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
+         receiver.close();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java
new file mode 100644
index 0000000..0a3c68a
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.exceptions;
+
+import org.apache.qpid.proton.amqp.Symbol;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         6/6/13
+ */
+public class HornetQAMQPException extends Exception
+{
+
+   private static final String ERROR_PREFIX = "amqp:";
+
+   public Symbol getAmqpError()
+   {
+      return amqpError;
+   }
+
+   private final Symbol amqpError;
+
+   public HornetQAMQPException(Symbol amqpError, String message, Throwable e)
+   {
+      super(message, e);
+      this.amqpError = amqpError;
+   }
+
+   public HornetQAMQPException(Symbol amqpError, String message)
+   {
+      super(message);
+      this.amqpError = amqpError;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java
new file mode 100644
index 0000000..f3e3b38
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.exceptions;
+
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         6/6/13
+ */
+public class HornetQAMQPIllegalStateException extends HornetQAMQPException
+{
+   public HornetQAMQPIllegalStateException(String message)
+   {
+      super(AmqpError.ILLEGAL_STATE, message);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java
new file mode 100644
index 0000000..7db34a7
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.exceptions;
+
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         6/6/13
+ */
+public class HornetQAMQPInternalErrorException extends HornetQAMQPException
+{
+   public HornetQAMQPInternalErrorException(String message, Throwable e)
+   {
+      super(AmqpError.INTERNAL_ERROR, message, e);
+   }
+
+   public HornetQAMQPInternalErrorException(String message)
+   {
+      super(AmqpError.INTERNAL_ERROR, message);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java
new file mode 100644
index 0000000..2f73de8
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.exceptions;
+
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         6/6/13
+ */
+public class HornetQAMQPInvalidFieldException extends HornetQAMQPException
+{
+   public HornetQAMQPInvalidFieldException(String message)
+   {
+      super(AmqpError.INVALID_FIELD, message);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java
new file mode 100644
index 0000000..35a2143
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.exceptions;
+
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         6/19/13
+ */
+public class HornetQAMQPNotImplementedException extends HornetQAMQPException
+{
+   public HornetQAMQPNotImplementedException(String message)
+   {
+      super(AmqpError.NOT_IMPLEMENTED, message);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java
new file mode 100644
index 0000000..50d72d8
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.exceptions;
+
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQAMQPTimeoutException extends HornetQAMQPException
+{
+   public HornetQAMQPTimeoutException(String message)
+   {
+      super(AmqpError.ILLEGAL_STATE, message);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
new file mode 100644
index 0000000..482821a
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.handler;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+
+/**
+ * EventHandler
+ * <p/>
+ *
+ * @author rhs
+ */
+
+public interface EventHandler
+{
+
+   void onSASLInit(ProtonHandler handler, Connection connection);
+
+   void onInit(Connection connection) throws Exception;
+
+   void onLocalOpen(Connection connection) throws Exception;
+
+   void onRemoteOpen(Connection connection) throws Exception;
+
+   void onLocalClose(Connection connection) throws Exception;
+
+   void onRemoteClose(Connection connection) throws Exception;
+
+   void onFinal(Connection connection) throws Exception;
+
+   void onInit(Session session) throws Exception;
+
+   void onLocalOpen(Session session) throws Exception;
+
+   void onRemoteOpen(Session session) throws Exception;
+
+   void onLocalClose(Session session) throws Exception;
+
+   void onRemoteClose(Session session) throws Exception;
+
+   void onFinal(Session session) throws Exception;
+
+   void onInit(Link link) throws Exception;
+
+   void onLocalOpen(Link link) throws Exception;
+
+   void onRemoteOpen(Link link) throws Exception;
+
+   void onLocalClose(Link link) throws Exception;
+
+   void onRemoteClose(Link link) throws Exception;
+
+   void onFlow(Link link) throws Exception;
+
+   void onFinal(Link link) throws Exception;
+
+   void onRemoteDetach(Link link) throws Exception;
+
+   void onDetach(Link link) throws Exception;
+
+   void onDelivery(Delivery delivery) throws Exception;
+
+   void onTransport(Transport transport) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java
new file mode 100644
index 0000000..4d58aa1
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.handler;
+
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Transport;
+
+/**
+ * TODO : this needs a better home
+ *
+ * @author rhs
+ */
+
+public final class Events
+{
+
+   public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception
+   {
+      handler.onTransport(transport);
+   }
+
+   public static void dispatch(Event event, EventHandler handler) throws Exception
+   {
+      switch (event.getType())
+      {
+         case CONNECTION_INIT:
+            handler.onInit(event.getConnection());
+            break;
+         case CONNECTION_LOCAL_OPEN:
+            handler.onLocalOpen(event.getConnection());
+            break;
+         case CONNECTION_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getConnection());
+            break;
+         case CONNECTION_LOCAL_CLOSE:
+            handler.onLocalClose(event.getConnection());
+            break;
+         case CONNECTION_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getConnection());
+            break;
+         case CONNECTION_FINAL:
+            handler.onFinal(event.getConnection());
+            break;
+         case SESSION_INIT:
+            handler.onInit(event.getSession());
+            break;
+         case SESSION_LOCAL_OPEN:
+            handler.onLocalOpen(event.getSession());
+            break;
+         case SESSION_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getSession());
+            break;
+         case SESSION_LOCAL_CLOSE:
+            handler.onLocalClose(event.getSession());
+            break;
+         case SESSION_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getSession());
+            break;
+         case SESSION_FINAL:
+            handler.onFinal(event.getSession());
+            break;
+         case LINK_INIT:
+            handler.onInit(event.getLink());
+            break;
+         case LINK_LOCAL_OPEN:
+            handler.onLocalOpen(event.getLink());
+            break;
+         case LINK_REMOTE_OPEN:
+            handler.onRemoteOpen(event.getLink());
+            break;
+         case LINK_LOCAL_CLOSE:
+            handler.onLocalClose(event.getLink());
+            break;
+         case LINK_REMOTE_CLOSE:
+            handler.onRemoteClose(event.getLink());
+            break;
+         case LINK_FLOW:
+            handler.onFlow(event.getLink());
+            break;
+         case LINK_FINAL:
+            handler.onFinal(event.getLink());
+            break;
+         case LINK_LOCAL_DETACH:
+            handler.onDetach(event.getLink());
+            break;
+         case LINK_REMOTE_DETACH:
+            handler.onRemoteDetach(event.getLink());
+            break;
+         case TRANSPORT:
+            handler.onTransport(event.getTransport());
+            break;
+         case DELIVERY:
+            handler.onDelivery(event.getDelivery());
+            break;
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
new file mode 100644
index 0000000..99f4658
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.handler;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Transport;
+import org.proton.plug.ClientSASL;
+import org.proton.plug.ServerSASL;
+import org.proton.plug.SASLResult;
+import org.proton.plug.handler.impl.ProtonHandlerImpl;
+
+/**
+ * This is a definition of the public interface for {@link org.proton.plug.handler.impl.ProtonHandlerImpl}
+ *
+ * @author Clebert Suconic
+ */
+
+public interface ProtonHandler
+{
+
+   public static final class Factory
+   {
+      public static ProtonHandler create()
+      {
+         return new ProtonHandlerImpl();
+      }
+   }
+
+
+   /**
+    * It returns true if the transport connection has any capacity available
+    *
+    * @return
+    */
+   int capacity();
+
+   Transport getTransport();
+
+   Connection getConnection();
+
+   /**
+    * Add an event handler to the chain
+    *
+    * @param handler
+    * @return
+    */
+   ProtonHandler addEventHandler(EventHandler handler);
+
+   void createClientSasl(ClientSASL clientSASL);
+
+   /**
+    * To be used on server connections. To define SASL integration.
+    *
+    * @param handlers
+    */
+   void createServerSASL(ServerSASL[] handlers);
+
+   /**
+    * To return the SASL Mechanism that was successful with the connection.
+    * This should contain any state such as user and password
+    *
+    * @return
+    */
+   SASLResult getSASLResult();
+
+   /**
+    * The input on the Handler.
+    * Notice that buffer will be positioned up to where we needed
+    *
+    * @param buffer
+    */
+   void inputBuffer(ByteBuf buffer);
+
+   /**
+    * To be used at your discretion to verify if the client was active since you last checked
+    * it can be used to implement server TTL cleanup and verifications
+    *
+    * @return
+    */
+   boolean checkDataReceived();
+
+   /**
+    * Return the creation time of the handler
+    *
+    * @return
+    */
+   long getCreationTime();
+
+   /**
+    * To be called after you used the outputBuffer
+    *
+    * @param bytes number of bytes you used already on the output
+    */
+   void outputDone(int bytes);
+
+   /**
+    * it will return pending bytes you have on the Transport
+    * after you are done with it you must call {@link #outputDone(int)}
+    *
+    * @return
+    */
+   ByteBuf outputBuffer();
+
+   /**
+    * It will process the transport and cause events to be called
+    */
+   void flush();
+
+   /**
+    * It will close the connection and flush events
+    */
+   void close();
+
+
+   /**
+    * Get the object used to lock transport, connection and events operations
+    *
+    * @return
+    */
+   Object getLock();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java
new file mode 100644
index 0000000..2e1be00
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.handler.impl;
+
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.Transport;
+import org.proton.plug.handler.EventHandler;
+
+/**
+ * This is useful for cases where you only want to implement a few methods
+ *
+ * @author Clebert Suconic
+ */
+public abstract class DefaultEventHandler implements EventHandler
+{
+   @Override
+   public void onInit(Connection connection) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onLocalOpen(Connection connection) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onRemoteOpen(Connection connection) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onLocalClose(Connection connection) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onRemoteClose(Connection connection) throws Exception
+   {
+   }
+
+   @Override
+   public void onFinal(Connection connection) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onInit(Session session) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onLocalOpen(Session session) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onRemoteOpen(Session session) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onLocalClose(Session session) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onRemoteClose(Session session) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onFinal(Session session) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onInit(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onLocalOpen(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onRemoteOpen(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onLocalClose(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onRemoteClose(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onFlow(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onFinal(Link link) throws Exception
+   {
+
+   }
+
+
+   @Override
+   public void onRemoteDetach(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onDetach(Link link) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onDelivery(Delivery delivery) throws Exception
+   {
+
+   }
+
+   @Override
+   public void onTransport(Transport transport) throws Exception
+   {
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
new file mode 100644
index 0000000..3eb7093
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
@@ -0,0 +1,423 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.handler.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.proton.plug.ClientSASL;
+import org.proton.plug.ServerSASL;
+import org.proton.plug.handler.EventHandler;
+import org.proton.plug.handler.Events;
+import org.proton.plug.handler.ProtonHandler;
+import org.proton.plug.context.ProtonInitializable;
+import org.proton.plug.SASLResult;
+import org.proton.plug.util.ByteUtil;
+import org.proton.plug.util.DebugInfo;
+
+/**
+ * Clebert Suconic
+ */
+public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHandler
+{
+   private final Transport transport = Proton.transport();
+
+   private final Connection connection = Proton.connection();
+
+   private final Collector collector = Proton.collector();
+
+   private ArrayList<EventHandler> handlers = new ArrayList<>();
+
+   private Sasl serverSasl;
+
+   private Sasl clientSasl;
+
+   private final Object lock = new Object();
+
+   private final long creationTime;
+
+   private Map<String, ServerSASL> saslHandlers;
+
+   private SASLResult saslResult;
+
+   /**
+    * If dispatching a dispatch call is ignored to avoid infinite stack loop
+    */
+   private boolean dispatching = false;
+
+   protected volatile boolean dataReceived;
+
+   protected boolean receivedFirstPacket = false;
+
+   private int offset = 0;
+
+   public ProtonHandlerImpl()
+   {
+      this.creationTime = System.currentTimeMillis();
+      transport.bind(connection);
+      connection.collect(collector);
+   }
+
+   @Override
+   public int capacity()
+   {
+      synchronized (lock)
+      {
+         return transport.capacity();
+      }
+   }
+
+   public Object getLock()
+   {
+      return lock;
+   }
+
+   @Override
+   public Transport getTransport()
+   {
+      return transport;
+   }
+
+   @Override
+   public Connection getConnection()
+   {
+      return connection;
+   }
+
+   @Override
+   public ProtonHandler addEventHandler(EventHandler handler)
+   {
+      handlers.add(handler);
+      return this;
+   }
+
+   @Override
+   public void createServerSASL(ServerSASL[] handlers)
+   {
+      this.serverSasl = transport.sasl();
+      saslHandlers = new HashMap<>();
+      String[] names = new String[handlers.length];
+      int count = 0;
+      for (ServerSASL handler : handlers)
+      {
+         saslHandlers.put(handler.getName(), handler);
+         names[count++] = handler.getName();
+      }
+      this.serverSasl.server();
+      serverSasl.setMechanisms(names);
+
+   }
+
+   @Override
+   public SASLResult getSASLResult()
+   {
+      return saslResult;
+   }
+
+   @Override
+   public void inputBuffer(ByteBuf buffer)
+   {
+      dataReceived = true;
+      synchronized (lock)
+      {
+         while (buffer.readableBytes() > 0)
+         {
+            int capacity = transport.capacity();
+
+            if (!receivedFirstPacket)
+            {
+               try
+               {
+                  if (buffer.getByte(4) == 0x03)
+                  {
+                     dispatchSASL();
+                  }
+               }
+               catch (Throwable ignored)
+               {
+                  ignored.printStackTrace();
+               }
+
+               receivedFirstPacket = true;
+            }
+
+
+            if (capacity > 0)
+            {
+               ByteBuffer tail = transport.tail();
+               int min = Math.min(capacity, buffer.readableBytes());
+               tail.limit(min);
+               buffer.readBytes(tail);
+
+
+               flush();
+            }
+            else
+            {
+               if (capacity == 0)
+               {
+                  System.out.println("abandoning: " + buffer.readableBytes());
+               }
+               else
+               {
+                  System.out.println("transport closed, discarding: " + buffer.readableBytes() + " capacity = " + transport.capacity());
+               }
+               break;
+            }
+         }
+      }
+   }
+
+
+   @Override
+   public boolean checkDataReceived()
+   {
+      boolean res = dataReceived;
+
+      dataReceived = false;
+
+      return res;
+   }
+
+   @Override
+   public long getCreationTime()
+   {
+      return creationTime;
+   }
+
+   @Override
+   public void outputDone(int bytes)
+   {
+      synchronized (lock)
+      {
+         transport.pop(bytes);
+         offset -= bytes;
+
+         if (offset < 0)
+         {
+            throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes +
+                                               ", outcome result=" + offset);
+         }
+      }
+
+      flush();
+   }
+
+   @Override
+   public ByteBuf outputBuffer()
+   {
+
+      synchronized (lock)
+      {
+         int pending = transport.pending();
+
+         if (pending < 0)
+         {
+            return null;//throw new IllegalStateException("xxx need to close the connection");
+         }
+
+         int size = pending - offset;
+
+         if (size < 0)
+         {
+            throw new IllegalStateException("negative size: " + pending);
+         }
+
+         if (size == 0)
+         {
+            return null;
+         }
+
+         // For returning PooledBytes
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size);
+         ByteBuffer head = transport.head();
+         head.position(offset);
+         buffer.writeBytes(head);
+         offset += size; // incrementing offset for future calls
+         return buffer;
+      }
+   }
+
+   public void createClientSasl(ClientSASL clientSASL)
+   {
+      if (clientSASL != null)
+      {
+         clientSasl = transport.sasl();
+         clientSasl.setMechanisms(clientSASL.getName());
+         byte[] initialSasl = clientSASL.getBytes();
+         clientSasl.send(initialSasl, 0, initialSasl.length);
+      }
+   }
+
+
+   @Override
+   public void flush()
+   {
+      synchronized (lock)
+      {
+         transport.process();
+
+         checkServerSASL();
+
+         if (dispatching)
+         {
+            return;
+         }
+
+         dispatching = true;
+
+      }
+
+      try
+      {
+         dispatch();
+      }
+      finally
+      {
+         dispatching = false;
+      }
+   }
+
+   @Override
+   public void close()
+   {
+      synchronized (lock)
+      {
+         connection.close();
+      }
+      flush();
+   }
+
+   protected void checkServerSASL()
+   {
+      if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0)
+      {
+         // TODO: should we look at the first only?
+         ServerSASL mechanism = saslHandlers.get(serverSasl.getRemoteMechanisms()[0]);
+         if (mechanism != null)
+         {
+
+            byte[] dataSASL = new byte[serverSasl.pending()];
+            serverSasl.recv(dataSASL, 0, dataSASL.length);
+
+            if (DebugInfo.debug)
+            {
+               System.out.println("Working on sasl::" + ByteUtil.bytesToHex(dataSASL, 2));
+            }
+
+            saslResult = mechanism.processSASL(dataSASL);
+
+            if (saslResult != null && saslResult.isSuccess())
+            {
+               serverSasl.done(Sasl.SaslOutcome.PN_SASL_OK);
+               serverSasl = null;
+               saslHandlers.clear();
+               saslHandlers = null;
+            }
+            else
+            {
+               serverSasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
+            }
+            serverSasl = null;
+         }
+         else
+         {
+            // no auth available, system error
+            serverSasl.done(Sasl.SaslOutcome.PN_SASL_SYS);
+         }
+      }
+   }
+
+   private Event popEvent()
+   {
+      synchronized (lock)
+      {
+         Event ev = collector.peek();
+         if (ev != null)
+         {
+            // pop will invalidate the event
+            // for that reason we make a new one
+            // Events are reused inside the collector, so we need to make a new one here
+            ev = ev.copy();
+            collector.pop();
+         }
+         return ev;
+      }
+   }
+
+   private void dispatchSASL()
+   {
+      for (EventHandler h: handlers)
+      {
+         h.onSASLInit(this, getConnection());
+      }
+   }
+
+
+   private void dispatch()
+   {
+      Event ev;
+      // We don't hold a lock on the entire event processing
+      // because we could have a distributed deadlock
+      // while processing events (for instance onTransport)
+      // while a client is also trying to write here
+      while ((ev = popEvent()) != null)
+      {
+         for (EventHandler h : handlers)
+         {
+            if (DebugInfo.debug)
+            {
+               System.out.println("Handling " + ev + " towards " + h);
+            }
+            try
+            {
+               Events.dispatch(ev, h);
+            }
+            catch (Exception e)
+            {
+               // TODO: logs
+               e.printStackTrace();
+               connection.setCondition(new ErrorCondition());
+            }
+         }
+      }
+
+      for (EventHandler h : handlers)
+      {
+         try
+         {
+            h.onTransport(transport);
+         }
+         catch (Exception e)
+         {
+            // TODO: logs
+            e.printStackTrace();
+            connection.setCondition(new ErrorCondition());
+         }
+      }
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java
new file mode 100644
index 0000000..c4c22c8
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.proton.plug.logger;
+
+import org.proton.plug.exceptions.HornetQAMQPIllegalStateException;
+import org.proton.plug.exceptions.HornetQAMQPInternalErrorException;
+import org.proton.plug.exceptions.HornetQAMQPInvalidFieldException;
+import org.jboss.logging.annotations.Message;
+import org.jboss.logging.annotations.MessageBundle;
+import org.jboss.logging.Messages;
+
+/**
+ * Logger Code 11
+ * <p/>
+ * Each message id must be 6 digits long starting with 10, the 3rd digit should be 9. So the range
+ * is from 219000 to 119999.
+ * <p/>
+ * Once released, methods should not be deleted as they may be referenced by knowledge base
+ * articles. Unused methods should be marked as deprecated.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+@MessageBundle(projectCode = "HQ")
+public interface HornetQAMQPProtocolMessageBundle
+{
+   HornetQAMQPProtocolMessageBundle BUNDLE = Messages.getBundle(HornetQAMQPProtocolMessageBundle.class);
+
+
+   @Message(id = 219000, value = "target address not set", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPInvalidFieldException targetAddressNotSet();
+
+   @Message(id = 219001, value = "error creating temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPInternalErrorException errorCreatingTemporaryQueue(String message);
+
+   @Message(id = 219002, value = "target address does not exist", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPIllegalStateException addressDoesntExist();
+
+   @Message(id = 219003, value = "error finding temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPInternalErrorException errorFindingTemporaryQueue(String message);
+
+   @Message(id = 219004, value = "error creating HornetQ Session, {0}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPInternalErrorException errorCreatingHornetQSession(String message);
+
+   @Message(id = 219005, value = "error creating HornetQ Consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPInternalErrorException errorCreatingHornetQConsumer(String message);
+
+   @Message(id = 219006, value = "error starting HornetQ Consumer, {0}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPIllegalStateException errorStartingConsumer(String message);
+
+   @Message(id = 219007, value = "error acknowledging message {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPIllegalStateException errorAcknowledgingMessage(String messageID, String message);
+
+   @Message(id = 219008, value = "error cancelling message {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPIllegalStateException errorCancellingMessage(String messageID, String message);
+
+   @Message(id = 219009, value = "error closing consumer {0}, {1}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPIllegalStateException errorClosingConsumer(long consumerID, String message);
+
+   @Message(id = 219010, value = "source address does not exist", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPInvalidFieldException sourceAddressDoesntExist();
+
+   @Message(id = 219011, value = "source address not set", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPInvalidFieldException sourceAddressNotSet();
+
+   @Message(id = 219012, value = "error rolling back coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPIllegalStateException errorRollingbackCoordinator(String message);
+
+   @Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT)
+   HornetQAMQPIllegalStateException errorCommittingCoordinator(String message);
+
+   @Message(id = 219015, value = "error decoding AMQP frame", format = Message.Format.MESSAGE_FORMAT)
+   String decodeError();
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java
new file mode 100644
index 0000000..be05f37
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.sasl;
+
+import org.proton.plug.SASLResult;
+import org.proton.plug.ServerSASL;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class AnonymousServerSASL implements ServerSASL
+{
+   public AnonymousServerSASL()
+   {
+   }
+
+   @Override
+   public String getName()
+   {
+      return "ANONYMOUS";
+   }
+
+   @Override
+   public SASLResult processSASL(byte[] bytes)
+   {
+      return new PlainSASLResult(true, null, null);
+   }
+}
+


Mime
View raw message