activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [37/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:07 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java
deleted file mode 100644
index 4bf52a7..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton;
-
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.jms.EncodedMessage;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.client.impl.ClientConsumerImpl;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.QueueQueryResult;
-import org.hornetq.core.server.ServerMessage;
-
-/**
- * A this is a wrapper around a HornetQ ServerConsumer for handling outgoing messages and incoming acks via a Proton Sender
- *
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class ProtonConsumer implements ProtonDeliveryHandler
-{
-   private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
-   private static final Symbol COPY = Symbol.valueOf("copy");
-   private final ProtonSession protonSession;
-   private final HornetQServer server;
-   private final Sender sender;
-   private final ProtonRemotingConnection connection;
-   private final ProtonProtocolManager protonProtocolManager;
-   private long consumerID;
-   private boolean closed = false;
-   private long forcedDeliveryCount = 0;
-   private boolean forcingDelivery = false;
-   private boolean receivedForcedDelivery = true;
-
-   public ProtonConsumer(ProtonRemotingConnection connection, Sender sender, ProtonSession protonSession, HornetQServer server,
-                         ProtonProtocolManager protonProtocolManager)
-   {
-      this.connection = connection;
-      this.sender = sender;
-      this.protonSession = protonSession;
-      this.server = server;
-      this.protonProtocolManager = protonProtocolManager;
-   }
-
-   /*
-   * start the session
-   * */
-   public void start() throws HornetQAMQPException
-   {
-      protonSession.getServerSession().start();
-
-      //todo add flow control
-      try
-      {
-         protonSession.getServerSession().receiveConsumerCredits(consumerID, -1);
-      }
-      catch (Exception e)
-      {
-         throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
-      }
-   }
-
-   /*
-   * create the actual underlying HornetQ Server Consumer
-   * */
-   public void init() throws HornetQAMQPException
-   {
-      org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
-
-      SimpleString queue;
-
-      consumerID = server.getStorageManager().generateUniqueID();
-
-      SimpleString selector = null;
-      Map filter = source.getFilter();
-      if (filter != null)
-      {
-         DescribedType value = (DescribedType) filter.get(SELECTOR);
-         if (value != null)
-         {
-            selector = new SimpleString(value.getDescribed().toString());
-         }
-      }
-
-      if (source.getDynamic())
-      {
-         //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
-         // will be deleted on closing of the session
-         queue = new SimpleString(java.util.UUID.randomUUID().toString());
-         try
-         {
-            protonSession.getServerSession().createQueue(queue, queue, null, true, false);
-         }
-         catch (Exception e)
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
-         }
-         source.setAddress(queue.toString());
-      }
-      else
-      {
-         //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
-         //be a queue bound to it so we nee to check this.
-         String address = source.getAddress();
-         if (address == null)
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
-         }
-
-         queue = new SimpleString(source.getAddress());
-         QueueQueryResult queryResult;
-         try
-         {
-            queryResult = protonSession.getServerSession().executeQueueQuery(new SimpleString(address));
-         }
-         catch (Exception e)
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage());
-         }
-         if (!queryResult.isExists())
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
-         }
-      }
-      boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
-      try
-      {
-         protonSession.getServerSession().createConsumer(consumerID, queue, selector, browseOnly);
-      }
-      catch (Exception e)
-      {
-         throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingHornetQConsumer(e.getMessage());
-      }
-   }
-
-   /*
-   * close the session
-   * */
-   public synchronized void close() throws HornetQAMQPException
-   {
-      closed = true;
-      protonSession.removeConsumer(consumerID);
-   }
-
-   public long getConsumerID()
-   {
-      return consumerID;
-   }
-
-   /*
-   * handle an out going message from HornetQ, send via the Proton Sender
-   * */
-   public synchronized int handleDelivery(ServerMessage message, int deliveryCount)
-   {
-      if (closed)
-      {
-         return 0;
-      }
-      if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
-      {
-         if (forcingDelivery)
-         {
-            sender.drained();
-         }
-         else
-         {
-            receivedForcedDelivery = true;
-            forcingDelivery = false;
-         }
-         return 0;
-      }
-      //if we get here then a forced delivery has pushed some messages thru and we continue
-      if (forcingDelivery)
-      {
-         forcingDelivery = false;
-      }
-      //presettle means we can ack the message on the proton side before we send it, i.e. for browsers
-      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
-      //we only need a tag if we are going to ack later
-      byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
-      //encode the message
-      EncodedMessage encodedMessage = ProtonUtils.OUTBOUND.transform(message, deliveryCount);
-      //now handle the delivery
-      protonProtocolManager.handleDelivery(sender, tag, encodedMessage, message, connection, preSettle);
-
-      return encodedMessage.getLength();
-   }
-
-   @Override
-   /*
-   * handle an incoming Ack from Proton, basically pass to HornetQ to handle
-   * */
-   public void onMessage(Delivery delivery) throws HornetQAMQPException
-   {
-      ServerMessage message = (ServerMessage) delivery.getContext();
-
-      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
-
-
-      DeliveryState remoteState = delivery.getRemoteState();
-
-      if (remoteState != null)
-      {
-         if (remoteState instanceof Accepted)
-         {
-            //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
-            // from proton, a perf hit but a must
-            try
-            {
-               protonSession.getServerSession().individualAcknowledge(consumerID, message.getMessageID());
-            }
-            catch (Exception e)
-            {
-               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.getMessageID(), e.getMessage());
-            }
-         }
-         else if (remoteState instanceof Released)
-         {
-            try
-            {
-               protonSession.getServerSession().individualCancel(consumerID, message.getMessageID(), false);
-            }
-            catch (Exception e)
-            {
-               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.getMessageID(), e.getMessage());
-            }
-         }
-         else if (remoteState instanceof Rejected || remoteState instanceof Modified)
-         {
-            try
-            {
-               protonSession.getServerSession().individualCancel(consumerID, message.getMessageID(), true);
-            }
-            catch (Exception e)
-            {
-               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.getMessageID(), e.getMessage());
-            }
-         }
-
-         synchronized (connection.getDeliveryLock())
-         {
-            delivery.settle();
-         }
-         //todo add tag caching
-         if (!preSettle)
-         {
-            protonSession.replaceTag(delivery.getTag());
-         }
-         sender.offer(1);
-      }
-      else
-      {
-         //todo not sure if we need to do anything here
-      }
-   }
-
-   /*
-   * check the state of the consumer, i.e. are there any more messages. only really needed for browsers?
-   * */
-   public synchronized void checkState()
-   {
-      if (!forcingDelivery && receivedForcedDelivery)
-      {
-         try
-         {
-            forcingDelivery = true;
-            receivedForcedDelivery = false;
-            protonSession.getServerSession().forceConsumerDelivery(consumerID, forcedDeliveryCount++);
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
-      }
-   }
-
-   public Sender getSender()
-   {
-      return sender;
-   }
-
-   private String formatTag(byte[] tag)
-   {
-      StringBuffer sb = new StringBuffer();
-      for (byte b : tag)
-      {
-         sb.append(b).append(":");
-      }
-      return sb.toString();
-   }
-
-   int x = 5;
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java
deleted file mode 100644
index 9b7a605..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton;
-
-import org.apache.qpid.proton.engine.Delivery;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         <p/>
- *         An interface to handle deliveries, either messages, acks or transaction calls
- */
-public interface ProtonDeliveryHandler
-{
-   void onMessage(Delivery delivery) throws HornetQAMQPException;
-
-   void checkState();
-
-   void close() throws HornetQAMQPException;
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java
deleted file mode 100644
index 71f3a83..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
-import org.hornetq.core.server.QueueQueryResult;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         <p/>
- *         handles incoming messages via a Proton Receiver and forwards them to HornetQ
- */
-public class ProtonProducer implements ProtonDeliveryHandler
-{
-   private final ProtonRemotingConnection connection;
-
-   private final ProtonSession protonSession;
-
-   private final ProtonProtocolManager protonProtocolManager;
-
-   private final Receiver receiver;
-
-   private final String address;
-
-   private HornetQBuffer buffer;
-
-   public ProtonProducer(ProtonRemotingConnection connection, ProtonSession protonSession, ProtonProtocolManager protonProtocolManager, Receiver receiver)
-   {
-      this.connection = connection;
-      this.protonSession = protonSession;
-      this.protonProtocolManager = protonProtocolManager;
-      this.receiver = receiver;
-      this.address = ((Target) receiver.getRemoteTarget()).getAddress();
-      buffer = connection.createBuffer(1024);
-   }
-
-   /*
-   * called when Proton receives a message to be delivered via a Delivery.
-   *
-   * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
-   *
-   * */
-   public void onMessage(Delivery delivery) throws HornetQAMQPException
-   {
-      Receiver receiver;
-      try
-      {
-         receiver = ((Receiver) delivery.getLink());
-
-         if (!delivery.isReadable())
-         {
-            return;
-         }
-
-         protonProtocolManager.handleMessage(receiver, buffer, delivery, connection, protonSession, address);
-
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         Rejected rejected = new Rejected();
-         ErrorCondition condition = new ErrorCondition();
-         condition.setCondition(Symbol.valueOf("failed"));
-         condition.setDescription(e.getMessage());
-         rejected.setError(condition);
-         delivery.disposition(rejected);
-      }
-   }
-
-   @Override
-   public void checkState()
-   {
-      //no op
-   }
-
-   @Override
-   public void close() throws HornetQAMQPException
-   {
-      protonSession.removeProducer(receiver);
-   }
-
-   public void init() throws HornetQAMQPException
-   {
-      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
-      if (target.getDynamic())
-      {
-         //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
-         // will be deleted on closing of the session
-         SimpleString queue = new SimpleString(java.util.UUID.randomUUID().toString());
-         try
-         {
-            protonSession.getServerSession().createQueue(queue, queue, null, true, false);
-         }
-         catch (Exception e)
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
-         }
-         target.setAddress(queue.toString());
-      }
-      else
-      {
-         //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
-         //be a queue bound to it so we nee to check this.
-         String address = target.getAddress();
-         if (address == null)
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
-         }
-         try
-         {
-            QueueQueryResult queryResult = protonSession.getServerSession().executeQueueQuery(new SimpleString(address));
-            if (!queryResult.isExists())
-            {
-               throw HornetQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
-            }
-         }
-         catch (Exception e)
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage());
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java
index ae06aa1..8086925 100644
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java
@@ -13,46 +13,25 @@
 
 package org.hornetq.core.protocol.proton;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.EnumSet;
+import java.util.concurrent.Executor;
 
 import io.netty.channel.ChannelPipeline;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.amqp.transaction.Declare;
-import org.apache.qpid.proton.amqp.transaction.Declared;
-import org.apache.qpid.proton.amqp.transaction.Discharge;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.impl.LinkImpl;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.apache.qpid.proton.jms.EncodedMessage;
-import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPIllegalStateException;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.protocol.proton.converter.ProtonMessageConverter;
+import org.hornetq.core.protocol.proton.plug.HornetQProtonConnectionCallback;
 import org.hornetq.core.remoting.impl.netty.NettyServerConnection;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationListener;
-import org.hornetq.core.transaction.Transaction;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.MessageConverter;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.Connection;
-import org.hornetq.utils.UUIDGenerator;
+import org.proton.plug.AMQPServerConnectionContext;
+import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
 
 /**
  * A proton protocol manager, basically reads the Proton Input and maps proton resources to HornetQ resources
@@ -61,272 +40,83 @@ import org.hornetq.utils.UUIDGenerator;
  */
 public class ProtonProtocolManager implements ProtocolManager, NotificationListener
 {
-   public static final EnumSet<EndpointState> UNINITIALIZED = EnumSet.of(EndpointState.UNINITIALIZED);
-
-   public static final EnumSet<EndpointState> INITIALIZED = EnumSet.complementOf(UNINITIALIZED);
-
-   public static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
-
-   public static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED);
-
-   public static final EnumSet<EndpointState> ANY_ENDPOINT_STATE = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
-
    private final HornetQServer server;
 
+   private MessageConverter protonConverter;
+
    public ProtonProtocolManager(HornetQServer server)
    {
       this.server = server;
+      this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
    }
 
-   @Override
-   public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection)
+   public HornetQServer getServer()
    {
-      ProtonRemotingConnection conn = new ProtonRemotingConnection(acceptorUsed, connection, this);
-      //todo do we have a ttl?
-      return new ConnectionEntry(conn, null, System.currentTimeMillis(), 1 * 60 * 1000);
+      return server;
    }
 
-   @Override
-   public void removeHandler(String name)
-   {
-   }
 
    @Override
-   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+   public MessageConverter getConverter()
    {
-      ProtonRemotingConnection protonRemotingConnection = (ProtonRemotingConnection) connection;
-      protonRemotingConnection.setDataReceived();
-      byte[] frame = new byte[buffer.readableBytes()];
-      buffer.readBytes(frame);
-
-      protonRemotingConnection.handleFrame(frame);
+      return protonConverter;
    }
 
    @Override
-   public void addChannelHandlers(ChannelPipeline pipeliner)
+   public void onNotification(Notification notification)
    {
-      //we don't need any we do our own decoding
-   }
 
-   @Override
-   public boolean isProtocol(byte[] array)
-   {
-      String startFrame = new String(array, StandardCharsets.US_ASCII);
-      return startFrame.startsWith("AMQP");
    }
 
    @Override
-   public void handshake(NettyServerConnection connection, HornetQBuffer buffer)
+   public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
    {
-      //todo move handshake to here
-   }
+      HornetQProtonConnectionCallback connectionCallback = new HornetQProtonConnectionCallback(this, remotingConnection);
 
-   @Override
-   public void onNotification(Notification notification)
-   {
-      //noop
-   }
+      AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback);
 
-   public ServerMessageImpl createServerMessage()
-   {
-      return new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
-   }
+      Executor executor = server.getExecutorFactory().getExecutor();
 
-   public void handleMessage(final Receiver receiver, HornetQBuffer buffer, final Delivery delivery,
-                             final ProtonRemotingConnection connection, ProtonSession protonSession,
-                             String address) throws Exception
-   {
-      synchronized (connection.getDeliveryLock())
-      {
-         int count;
-         byte[] data = new byte[1024];
-         //todo an optimisation here would be to only use the buffer if we need more that one recv
-         while ((count = receiver.recv(data, 0, data.length)) > 0)
-         {
-            buffer.writeBytes(data, 0, count);
-         }
+      HornetQProtonRemotingConnection delegate = new HornetQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor);
+
+      connectionCallback.setProtonConnectionDelegate(delegate);
 
-         // we keep reading until we get end of messages, i.e. -1
-         if (count == 0)
-         {
-            return;
-         }
-         receiver.advance();
-         byte[] bytes = new byte[buffer.readableBytes()];
-         buffer.readBytes(bytes);
-         buffer.clear();
-         EncodedMessage encodedMessage = new EncodedMessage(delivery.getMessageFormat(), bytes, 0, bytes.length);
-         ServerMessage message = ProtonUtils.INBOUND.transform(connection, encodedMessage);
-         //use the address on the receiver if not null, if null let's hope it was set correctly on the message
-         if (address != null)
-         {
-            message.setAddress(new SimpleString(address));
-         }
-         //todo decide on whether to deliver direct
-         protonSession.getServerSession().send(message, true);
-         server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
-         {
-            @Override
-            public void done()
-            {
-               synchronized (connection.getDeliveryLock())
-               {
-                  receiver.flow(1);
-                  delivery.settle();
-               }
-            }
+      ConnectionEntry entry = new ConnectionEntry(delegate, executor,
+                                                  System.currentTimeMillis(), HornetQClient.DEFAULT_CONNECTION_TTL);
 
-            @Override
-            public void onError(int errorCode, String errorMessage)
-            {
-               receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
-            }
-         });
-      }
+      return entry;
    }
 
-   public void handleDelivery(final Sender sender, byte[] tag, EncodedMessage encodedMessage, ServerMessage message, ProtonRemotingConnection connection, final boolean preSettle)
+   @Override
+   public void removeHandler(String name)
    {
-      synchronized (connection.getDeliveryLock())
-      {
-         final Delivery delivery;
-         delivery = sender.delivery(tag, 0, tag.length);
-         delivery.setContext(message);
-         sender.send(encodedMessage.getArray(), 0, encodedMessage.getLength());
-         server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
-         {
-            @Override
-            public void done()
-            {
-               if (preSettle)
-               {
-                  delivery.settle();
-                  ((LinkImpl) sender).addCredit(1);
-               }
-               else
-               {
-                  sender.advance();
-               }
-            }
 
-            @Override
-            public void onError(int errorCode, String errorMessage)
-            {
-               sender.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
-            }
-         });
-      }
-      connection.write();
    }
 
-   void handleNewLink(Link link, ProtonSession protonSession) throws HornetQAMQPException
+   @Override
+   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
    {
-      link.setSource(link.getRemoteSource());
-      link.setTarget(link.getRemoteTarget());
-      if (link instanceof Receiver)
-      {
-         Receiver receiver = (Receiver) link;
-         if (link.getRemoteTarget() instanceof Coordinator)
-         {
-            protonSession.initialise(true);
-            Coordinator coordinator = (Coordinator) link.getRemoteTarget();
-            protonSession.addTransactionHandler(coordinator, receiver);
-         }
-         else
-         {
-            protonSession.initialise(false);
-            protonSession.addProducer(receiver);
-            //todo do this using the server session flow control
-            receiver.flow(100);
-         }
-      }
-      else
-      {
-         protonSession.initialise(false);
-         Sender sender = (Sender) link;
-         protonSession.addConsumer(sender);
-         sender.offer(1);
-      }
-   }
+      HornetQProtonRemotingConnection protonConnection = (HornetQProtonRemotingConnection)connection;
 
-   public ProtonSession createSession(ProtonRemotingConnection protonConnection, TransportImpl protonTransport) throws HornetQAMQPException
-   {
-      String name = UUIDGenerator.getInstance().generateStringUUID();
-      return new ProtonSession(name, protonConnection, this, server.getStorageManager()
-         .newContext(server.getExecutorFactory().getExecutor()), server, protonTransport);
+      protonConnection.bufferReceived(protonConnection.getID(), buffer);
    }
 
-   void handleActiveLink(Link link) throws HornetQAMQPException
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline)
    {
-      link.setSource(link.getRemoteSource());
-      link.setTarget(link.getRemoteTarget());
-      ProtonDeliveryHandler handler = (ProtonDeliveryHandler) link.getContext();
-      handler.checkState();
+
    }
 
-   public void handleTransaction(Receiver receiver, HornetQBuffer buffer, Delivery delivery, ProtonSession protonSession) throws HornetQAMQPIllegalStateException
+   @Override
+   public boolean isProtocol(byte[] array)
    {
-      int count;
-      byte[] data = new byte[1024];
-      //todo an optimisation here would be to only use the buffer if we need more that one recv
-      while ((count = receiver.recv(data, 0, data.length)) > 0)
-      {
-         buffer.writeBytes(data, 0, count);
-      }
-
-      // we keep reading until we get end of messages, i.e. -1
-      if (count == 0)
-      {
-         return;
-      }
-      receiver.advance();
-      byte[] bytes = new byte[buffer.readableBytes()];
-      buffer.readBytes(bytes);
-      buffer.clear();
-      MessageImpl msg = new MessageImpl();
-      msg.decode(bytes, 0, bytes.length);
-      Object action = ((AmqpValue) msg.getBody()).getValue();
-      if (action instanceof Declare)
-      {
-         Transaction tx = protonSession.getServerSession().getCurrentTransaction();
-         Declared declared = new Declared();
-         declared.setTxnId(new Binary(longToBytes(tx.getID())));
-         delivery.disposition(declared);
-         delivery.settle();
-      }
-      else if (action instanceof Discharge)
-      {
-         Discharge discharge = (Discharge) action;
-         if (discharge.getFail())
-         {
-            try
-            {
-               protonSession.getServerSession().rollback(false);
-            }
-            catch (Exception e)
-            {
-               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
-            }
-         }
-         else
-         {
-            try
-            {
-               protonSession.getServerSession().commit();
-            }
-            catch (Exception e)
-            {
-               throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
-            }
-         }
-         delivery.settle();
-      }
+      return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P';
    }
 
-   public byte[] longToBytes(long x)
+   @Override
+   public void handshake(NettyServerConnection connection, HornetQBuffer buffer)
    {
-      ByteBuffer buffer = ByteBuffer.allocate(8);
-      buffer.putLong(x);
-      return buffer.array();
    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java
deleted file mode 100644
index fab1886..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.DeliveryImpl;
-import org.apache.qpid.proton.engine.impl.LinkImpl;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
-import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.server.HornetQServerLogger;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.spi.core.remoting.Acceptor;
-import org.hornetq.spi.core.remoting.Connection;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public class ProtonRemotingConnection implements RemotingConnection
-{
-   private TransportImpl protonTransport;
-
-   private ConnectionImpl protonConnection;
-
-   private final Map<Object, ProtonSession> sessions = new HashMap<Object, ProtonSession>();
-
-   /*
-   * Proton is not thread safe therefore we need to make sure we aren't updating the deliveries on the connection from
-   * the input of proton transport and asynchronously back from HornetQ at the same time.
-   * (this probably needs to be fixed on Proton)
-   * */
-   private final Object deliveryLock = new Object();
-
-   private boolean destroyed = false;
-
-   private String clientId;
-
-   private final Acceptor acceptorUsed;
-
-   private final long creationTime;
-
-   private final Connection connection;
-
-   private final ProtonProtocolManager protonProtocolManager;
-
-   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
-
-   private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>();
-
-   private boolean initialised = false;
-
-   private static final byte[] VERSION_HEADER = new byte[]{
-      'A', 'M', 'Q', 'P', 0, 1, 0, 0
-   };
-   private Sasl sasl;
-
-   private String username;
-
-   private String passcode;
-
-   private boolean dataReceived;
-
-   public ProtonRemotingConnection(Acceptor acceptorUsed, Connection connection, ProtonProtocolManager protonProtocolManager)
-   {
-      this.protonProtocolManager = protonProtocolManager;
-
-      this.connection = connection;
-
-      this.creationTime = System.currentTimeMillis();
-
-      this.acceptorUsed = acceptorUsed;
-
-      this.protonTransport = new TransportImpl();
-
-      this.protonConnection = new ConnectionImpl();
-
-      protonTransport.bind(protonConnection);
-   }
-
-   @Override
-   public Object getID()
-   {
-      return connection.getID();
-   }
-
-   @Override
-   public long getCreationTime()
-   {
-      return creationTime;
-   }
-
-   @Override
-   public String getRemoteAddress()
-   {
-      return connection.getRemoteAddress();
-   }
-
-   @Override
-   public void addFailureListener(final FailureListener listener)
-   {
-      if (listener == null)
-      {
-         throw new IllegalStateException("FailureListener cannot be null");
-      }
-
-      failureListeners.add(listener);
-   }
-
-   @Override
-   public boolean removeFailureListener(final FailureListener listener)
-   {
-      if (listener == null)
-      {
-         throw new IllegalStateException("FailureListener cannot be null");
-      }
-
-      return failureListeners.remove(listener);
-   }
-
-   @Override
-   public void addCloseListener(final CloseListener listener)
-   {
-      if (listener == null)
-      {
-         throw new IllegalStateException("CloseListener cannot be null");
-      }
-
-      closeListeners.add(listener);
-   }
-
-   @Override
-   public boolean removeCloseListener(final CloseListener listener)
-   {
-      if (listener == null)
-      {
-         throw new IllegalStateException("CloseListener cannot be null");
-      }
-
-      return closeListeners.remove(listener);
-   }
-
-   @Override
-   public List<CloseListener> removeCloseListeners()
-   {
-      List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
-
-      closeListeners.clear();
-
-      return ret;
-   }
-
-   @Override
-   public List<FailureListener> removeFailureListeners()
-   {
-      List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
-
-      failureListeners.clear();
-
-      return ret;
-   }
-
-   @Override
-   public void setCloseListeners(List<CloseListener> listeners)
-   {
-      closeListeners.clear();
-
-      closeListeners.addAll(listeners);
-   }
-
-   @Override
-   public void setFailureListeners(final List<FailureListener> listeners)
-   {
-      failureListeners.clear();
-
-      failureListeners.addAll(listeners);
-   }
-
-   public List<FailureListener> getFailureListeners()
-   {
-      // we do not return the listeners otherwise the remoting service
-      // would NOT destroy the connection.
-      return Collections.emptyList();
-   }
-
-   @Override
-   public HornetQBuffer createBuffer(int size)
-   {
-      return connection.createBuffer(size);
-   }
-
-   @Override
-   public void fail(HornetQException me)
-   {
-      HornetQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
-      // Then call the listeners
-      callFailureListeners(me);
-
-      callClosingListeners();
-
-      destroyed = true;
-
-      connection.close();
-   }
-
-   @Override
-   public void fail(HornetQException me, String scaleDownTargetNodeID)
-   {
-      fail(me);
-   }
-
-   @Override
-   public void destroy()
-   {
-      destroyed = true;
-
-      connection.close();
-
-      synchronized (deliveryLock)
-      {
-         callClosingListeners();
-      }
-   }
-
-   @Override
-   public Connection getTransportConnection()
-   {
-      return connection;
-   }
-
-   @Override
-   public boolean isClient()
-   {
-      return false;
-   }
-
-   @Override
-   public boolean isDestroyed()
-   {
-      return destroyed;
-   }
-
-   @Override
-   public void disconnect(final boolean criticalError)
-   {
-      disconnect(null, criticalError);
-   }
-
-   @Override
-   public void disconnect(final String scaleDownNodeID, final boolean criticalError)
-   {
-      destroy();
-   }
-
-   @Override
-   public boolean checkDataReceived()
-   {
-      boolean res = dataReceived;
-
-      dataReceived = false;
-
-      return res;
-   }
-
-   @Override
-   public void flush()
-   {
-      //no op
-   }
-
-   @Override
-   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
-   {
-      if (initialised)
-      {
-         protonProtocolManager.handleBuffer(this, buffer);
-      }
-      else
-      {
-         byte[] prot = new byte[4];
-         buffer.readBytes(prot);
-         String headerProt = new String(prot);
-         checkProtocol(headerProt);
-         int protocolId = buffer.readByte();
-         int major = buffer.readByte();
-         int minor = buffer.readByte();
-         int revision = buffer.readByte();
-         if (!(checkVersion(major, minor, revision) && checkProtocol(headerProt)))
-         {
-            protonTransport.close();
-            protonConnection.close();
-            write();
-            destroy();
-            return;
-         }
-         if (protocolId == 3)
-         {
-            sasl = protonTransport.sasl();
-            sasl.setMechanisms(new String[]{"ANONYMOUS", "PLAIN"});
-            sasl.server();
-         }
-
-         ///its only 8 bytes, there's always going to always be enough in the buffer, isn't there?
-         protonTransport.input(VERSION_HEADER, 0, VERSION_HEADER.length);
-
-         write();
-
-         initialised = true;
-
-         if (buffer.readableBytes() > 0)
-         {
-            protonProtocolManager.handleBuffer(this, buffer.copy(buffer.readerIndex(), buffer.readableBytes()));
-         }
-
-         if (sasl != null)
-         {
-            if (sasl.getRemoteMechanisms().length > 0)
-            {
-               if ("PLAIN".equals(sasl.getRemoteMechanisms()[0]))
-               {
-                  byte[] data = new byte[sasl.pending()];
-                  sasl.recv(data, 0, data.length);
-                  sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                  sasl = null;
-               }
-               else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0]))
-               {
-                  sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                  sasl = null;
-               }
-            }
-
-            write();
-         }
-      }
-   }
-
-   private boolean checkProtocol(String headerProt)
-   {
-      boolean ok = "AMQP".equals(headerProt);
-      if (!ok)
-      {
-         protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, "Unknown Protocol " + headerProt));
-      }
-      return ok;
-   }
-
-   private boolean checkVersion(int major, int minor, int revision)
-   {
-      if (major < 1)
-      {
-         protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE,
-                                                          "Version not supported " + major + "." + minor + "." + revision));
-         return false;
-      }
-      return true;
-   }
-
-   void write()
-   {
-      synchronized (deliveryLock)
-      {
-         int size = 1024 * 64;
-         byte[] data = new byte[size];
-         boolean done = false;
-         while (!done)
-         {
-            int count = protonTransport.output(data, 0, size);
-            if (count > 0)
-            {
-               final HornetQBuffer buffer;
-               buffer = connection.createBuffer(count);
-               buffer.writeBytes(data, 0, count);
-               connection.write(buffer);
-            }
-            else
-            {
-               done = true;
-            }
-         }
-      }
-   }
-
-   public String getLogin()
-   {
-      return username;
-   }
-
-   public String getPasscode()
-   {
-      return passcode;
-   }
-
-   public ServerMessageImpl createServerMessage()
-   {
-      return protonProtocolManager.createServerMessage();
-   }
-
-   protected synchronized void setDataReceived()
-   {
-      dataReceived = true;
-   }
-
-   public void handleFrame(byte[] frame)
-   {
-      int read = 0;
-      while (read < frame.length)
-      {
-         synchronized (deliveryLock)
-         {
-            try
-            {
-               int count = protonTransport.input(frame, read, frame.length - read);
-               read += count;
-            }
-            catch (Exception e)
-            {
-               protonTransport.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, HornetQAMQPProtocolMessageBundle.BUNDLE.decodeError()));
-               write();
-               protonConnection.close();
-               return;
-            }
-         }
-
-         if (sasl != null)
-         {
-            if (sasl.getRemoteMechanisms().length > 0)
-            {
-               if ("PLAIN".equals(sasl.getRemoteMechanisms()[0]))
-               {
-                  byte[] data = new byte[sasl.pending()];
-                  sasl.recv(data, 0, data.length);
-                  setUserPass(data);
-                  sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                  sasl = null;
-               }
-               else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0]))
-               {
-                  sasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-                  sasl = null;
-               }
-            }
-         }
-
-         //handle opening of connection
-         if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED)
-         {
-            clientId = protonConnection.getRemoteContainer();
-            protonConnection.open();
-            write();
-         }
-
-         //handle any new sessions
-         Session session = protonConnection.sessionHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED);
-         while (session != null)
-         {
-            try
-            {
-               ProtonSession protonSession = getSession(session);
-               session.setContext(protonSession);
-               session.open();
-
-            }
-            catch (HornetQAMQPException e)
-            {
-               protonConnection.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-               session.close();
-            }
-            write();
-            session = protonConnection.sessionHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED);
-         }
-
-         //handle new link (producer or consumer
-         LinkImpl link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED);
-         while (link != null)
-         {
-            try
-            {
-               protonProtocolManager.handleNewLink(link, getSession(link.getSession()));
-            }
-            catch (HornetQAMQPException e)
-            {
-               link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-               link.close();
-            }
-            link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED);
-         }
-
-         //handle any deliveries
-         DeliveryImpl delivery;
-
-         Iterator<DeliveryImpl> iterator = protonConnection.getWorkSequence();
-
-         while (iterator.hasNext())
-         {
-            delivery = iterator.next();
-            ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
-            try
-            {
-               handler.onMessage(delivery);
-            }
-            catch (HornetQAMQPException e)
-            {
-               delivery.getLink().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-            }
-         }
-
-         link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.ANY_ENDPOINT_STATE);
-         while (link != null)
-         {
-            try
-            {
-               protonProtocolManager.handleActiveLink(link);
-            }
-            catch (HornetQAMQPException e)
-            {
-               link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-            }
-            link = (LinkImpl) link.next(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.ANY_ENDPOINT_STATE);
-         }
-
-         link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED);
-         while (link != null)
-         {
-            try
-            {
-               ((ProtonDeliveryHandler) link.getContext()).close();
-            }
-            catch (HornetQAMQPException e)
-            {
-               link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-            }
-            link.close();
-
-            link = (LinkImpl) link.next(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED);
-         }
-
-         session = protonConnection.sessionHead(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED);
-         while (session != null)
-         {
-            ProtonSession protonSession = (ProtonSession) session.getContext();
-            protonSession.close();
-            sessions.remove(session);
-            session.close();
-            session = session.next(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED);
-         }
-
-         if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED)
-         {
-            for (ProtonSession protonSession : sessions.values())
-            {
-               protonSession.close();
-            }
-            sessions.clear();
-            protonConnection.close();
-            write();
-            destroy();
-         }
-
-         write();
-      }
-   }
-
-   private void setUserPass(byte[] data)
-   {
-      String bytes = new String(data);
-      String[] credentials = bytes.split(Character.toString((char) 0));
-      int offSet = 0;
-      if (credentials.length > 0)
-      {
-         if (credentials[0].length() == 0)
-         {
-            offSet = 1;
-         }
-
-         if (credentials.length >= offSet)
-         {
-            username = credentials[offSet];
-         }
-         if (credentials.length >= (offSet + 1))
-         {
-            passcode = credentials[offSet + 1];
-         }
-      }
-   }
-
-   private ProtonSession getSession(Session realSession) throws HornetQAMQPException
-   {
-      ProtonSession protonSession = sessions.get(realSession);
-      if (protonSession == null)
-      {
-         protonSession = protonProtocolManager.createSession(this, protonTransport);
-         sessions.put(realSession, protonSession);
-      }
-      return protonSession;
-
-   }
-
-   private void callFailureListeners(final HornetQException me)
-   {
-      final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
-
-      for (final FailureListener listener : listenersClone)
-      {
-         try
-         {
-            listener.connectionFailed(me, false);
-         }
-         catch (final Throwable t)
-         {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            HornetQServerLogger.LOGGER.errorCallingFailureListener(t);
-         }
-      }
-   }
-
-   private void callClosingListeners()
-   {
-      final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
-
-      for (final CloseListener listener : listenersClone)
-      {
-         try
-         {
-            listener.connectionClosed();
-         }
-         catch (final Throwable t)
-         {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            HornetQServerLogger.LOGGER.errorCallingFailureListener(t);
-         }
-      }
-   }
-
-   public Object getDeliveryLock()
-   {
-      return deliveryLock;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java
deleted file mode 100644
index 6c46f2f..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.persistence.OperationContext;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPInternalErrorException;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServerLogger;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.ServerSession;
-import org.hornetq.spi.core.protocol.SessionCallback;
-import org.hornetq.spi.core.remoting.ReadyListener;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         4/10/13
- */
-public class ProtonSession implements SessionCallback
-{
-   private final String name;
-
-   private final ProtonRemotingConnection connection;
-
-   private final HornetQServer server;
-
-   private final TransportImpl protonTransport;
-
-   private final ProtonProtocolManager protonProtocolManager;
-
-   private ServerSession serverSession;
-
-   private OperationContext context;
-
-   //todo make this configurable
-   private int tagCacheSize = 1000;
-
-   private long currentTag = 0;
-
-   private final List<byte[]> tagCache = new ArrayList<byte[]>();
-
-   private Map<Object, ProtonProducer> producers = new HashMap<Object, ProtonProducer>();
-
-   private Map<Long, ProtonConsumer> consumers = new HashMap<Long, ProtonConsumer>();
-
-   private boolean closed = false;
-
-   public ProtonSession(String name, ProtonRemotingConnection connection, ProtonProtocolManager protonProtocolManager, OperationContext operationContext, HornetQServer server, TransportImpl protonTransport)
-   {
-      this.name = name;
-      this.connection = connection;
-      context = operationContext;
-      this.server = server;
-      this.protonTransport = protonTransport;
-      this.protonProtocolManager = protonProtocolManager;
-   }
-
-   public ServerSession getServerSession()
-   {
-      return serverSession;
-   }
-
-   /*
-   * we need to initialise the actual server session when we receive the first linkas this tells us whether or not the
-   * session is transactional
-   * */
-   public void initialise(boolean transacted) throws HornetQAMQPInternalErrorException
-   {
-      if (serverSession == null)
-      {
-         try
-         {
-            serverSession = server.createSession(name,
-                                                 connection.getLogin(),
-                                                 connection.getPasscode(),
-                                                 HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                                                 connection,
-                                                 !transacted,
-                                                 !transacted,
-                                                 false,
-                                                 false,
-                                                 null,
-                                                 this);
-         }
-         catch (Exception e)
-         {
-            throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingHornetQSession(e.getMessage());
-         }
-      }
-   }
-
-   @Override
-   public void sendProducerCreditsMessage(int credits, SimpleString address)
-   {
-   }
-
-   @Override
-   public void sendProducerCreditsFailMessage(int credits, SimpleString address)
-   {
-   }
-
-   @Override
-   public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
-   {
-      ProtonConsumer protonConsumer = consumers.get(consumerID);
-      if (protonConsumer != null)
-      {
-         return protonConsumer.handleDelivery(message, deliveryCount);
-      }
-      return 0;
-   }
-
-   @Override
-   public int sendLargeMessage(ServerMessage message, long consumerID, long bodySize, int deliveryCount)
-   {
-      return 0;
-   }
-
-   @Override
-   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
-   {
-      return 0;
-   }
-
-   @Override
-   public void closed()
-   {
-      //To change body of implemented methods use File | Settings | File Templates.
-   }
-
-   @Override
-   public void addReadyListener(ReadyListener listener)
-   {
-      //To change body of implemented methods use File | Settings | File Templates.
-   }
-
-   @Override
-   public void removeReadyListener(ReadyListener listener)
-   {
-      //To change body of implemented methods use File | Settings | File Templates.
-   }
-
-   @Override
-   public void disconnect(long consumerId, String queueName)
-   {
-      ProtonConsumer protonConsumer = consumers.remove(consumerId);
-      if (protonConsumer != null)
-      {
-         try
-         {
-            protonConsumer.close();
-         }
-         catch (HornetQAMQPException e)
-         {
-            protonConsumer.getSender().setTarget(null);
-            protonConsumer.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         }
-         connection.write();
-      }
-   }
-
-   public OperationContext getContext()
-   {
-      return context;
-   }
-
-   public void addProducer(Receiver receiver) throws HornetQAMQPException
-   {
-      try
-      {
-         ProtonProducer producer = new ProtonProducer(connection, this, protonProtocolManager, receiver);
-         producer.init();
-         producers.put(receiver, producer);
-         receiver.setContext(producer);
-         receiver.open();
-      }
-      catch (HornetQAMQPException e)
-      {
-         producers.remove(receiver);
-         receiver.setTarget(null);
-         receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         receiver.close();
-      }
-   }
-
-
-   public void addTransactionHandler(Coordinator coordinator, Receiver receiver)
-   {
-      TransactionHandler transactionHandler = new TransactionHandler(connection, coordinator, protonProtocolManager, this);
-      receiver.setContext(transactionHandler);
-      receiver.open();
-      receiver.flow(100);
-   }
-
-   public void addConsumer(Sender sender) throws HornetQAMQPException
-   {
-      ProtonConsumer protonConsumer = new ProtonConsumer(connection, sender, this, server, protonProtocolManager);
-
-      try
-      {
-         protonConsumer.init();
-         consumers.put(protonConsumer.getConsumerID(), protonConsumer);
-         sender.setContext(protonConsumer);
-         sender.open();
-         protonConsumer.start();
-      }
-      catch (HornetQAMQPException e)
-      {
-         consumers.remove(protonConsumer.getConsumerID());
-         sender.setSource(null);
-         sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         sender.close();
-      }
-   }
-
-   public byte[] getTag()
-   {
-      synchronized (tagCache)
-      {
-         byte[] bytes;
-         if (tagCache.size() > 0)
-         {
-            bytes = tagCache.remove(0);
-         }
-         else
-         {
-            bytes = Long.toHexString(currentTag++).getBytes();
-         }
-         return bytes;
-      }
-   }
-
-   public void replaceTag(byte[] tag)
-   {
-      synchronized (tagCache)
-      {
-         if (tagCache.size() < tagCacheSize)
-         {
-            tagCache.add(tag);
-         }
-      }
-   }
-
-   public void close()
-   {
-      if (closed)
-      {
-         return;
-      }
-
-      for (ProtonProducer protonProducer : producers.values())
-      {
-         try
-         {
-            protonProducer.close();
-         }
-         catch (Exception e)
-         {
-            HornetQServerLogger.LOGGER.errorClosingSession(e);
-         }
-      }
-      producers.clear();
-      for (ProtonConsumer protonConsumer : consumers.values())
-      {
-         try
-         {
-            protonConsumer.close();
-         }
-         catch (Exception e)
-         {
-            HornetQServerLogger.LOGGER.errorClosingConsumer(e);
-         }
-      }
-      consumers.clear();
-      try
-      {
-         getServerSession().rollback(true);
-         getServerSession().close(false);
-      }
-      catch (Exception e)
-      {
-         HornetQServerLogger.LOGGER.errorClosingSession(e);
-      }
-      closed = true;
-   }
-
-   public void removeConsumer(long consumerID) throws HornetQAMQPException
-   {
-      consumers.remove(consumerID);
-      try
-      {
-         getServerSession().closeConsumer(consumerID);
-      }
-      catch (Exception e)
-      {
-         throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorClosingConsumer(consumerID, e.getMessage());
-      }
-   }
-
-   public void removeProducer(Receiver receiver)
-   {
-      producers.remove(receiver);
-   }
-}


Mime
View raw message