activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [06/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation
Date Tue, 27 Sep 2016 13:54:33 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
deleted file mode 100644
index 3386732..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.server;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPServerConnectionContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.handler.ExtCapability;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext {
-
-   /** Creates a server connection context; every argument is forwarded unchanged to the superclass. */
-   public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
-      super(connectionSP, dispatchExecutor, scheduledPool);
-   }
-
-   /** Creates a server connection context with explicit connection settings, all forwarded to the superclass. */
-   public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, String containerId, int idleTimeout,
-                                        int maxFrameSize, int channelMax,
-                                        Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
-      super(connectionSP, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
-   }
-
-   /** Wraps {@code realSession} in a server session context using a session callback obtained from the connection callback. */
-   @Override
-   protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
-      final AMQPSessionCallback callback = connectionCallback.createSessionCallback(this);
-      return new ProtonServerSessionContext(callback, this, realSession);
-   }
-
-   /** Delegates validation to the connection callback, passing along the handler's SASL result. */
-   @Override
-   protected boolean validateConnection(Connection connection) {
-      return connectionCallback.validateConnection(connection, handler.getSASLResult());
-   }
-
-   /**
-    * Handles a link opened by the remote peer: copies the remote source and target onto the local link, then
-    * registers the link with its session as a sender, a transaction handler (coordinator target) or a receiver.
-    */
-   @Override
-   protected void remoteLinkOpened(Link link) throws Exception {
-      final ProtonServerSessionContext session = (ProtonServerSessionContext) getSessionExtension(link.getSession());
-      link.setSource(link.getRemoteSource());
-      link.setTarget(link.getRemoteTarget());
-
-      if (!(link instanceof Receiver)) {
-         final Sender outbound = (Sender) link;
-         session.addSender(outbound);
-         outbound.offer(1);
-         return;
-      }
-      final Receiver inbound = (Receiver) link;
-      if (link.getRemoteTarget() instanceof Coordinator) {
-         session.addTransactionHandler((Coordinator) link.getRemoteTarget(), inbound);
-      }
-      else {
-         session.addReceiver(inbound);
-      }
-   }
-
-   /** Returns the capabilities reported by {@link ExtCapability#getCapabilities()}. */
-   @Override
-   public Symbol[] getConnectionCapabilitiesOffered() {
-      return ExtCapability.getCapabilities();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
deleted file mode 100644
index d5ab9ea..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.server;
-
-import org.proton.plug.AMQPConnectionContextFactory;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPServerConnectionContext;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
-import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
-import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
-
-public class ProtonServerConnectionContextFactory extends AMQPConnectionContextFactory {
-
-   /** The single shared factory, created when this class is initialized. */
-   private static final ProtonServerConnectionContextFactory INSTANCE = new ProtonServerConnectionContextFactory();
-
-   /** Returns the shared factory instance. */
-   public static ProtonServerConnectionContextFactory getFactory() {
-      return INSTANCE;
-   }
-
-   /** Creates a connection context with a {@code null} container id and the default idle timeout, frame size and channel max. */
-   @Override
-   public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
-      return createConnection(connectionCallback, null, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool);
-   }
-
-   /** Creates a {@link ProtonServerConnectionContext} from the supplied settings. */
-   @Override
-   public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback, String containerId, int idleTimeout,
-                                                       int maxFrameSize, int channelMax,
-                                                       Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
-      return new ProtonServerConnectionContext(connectionCallback, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
deleted file mode 100644
index 173ff28..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.server;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.transaction.TransactionalState;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonReceiverContext;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
-import org.proton.plug.exceptions.ActiveMQAMQPNotFoundException;
-import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
-
-import static org.proton.plug.util.DeliveryUtil.readDelivery;
-
-public class ProtonServerReceiverContext extends AbstractProtonReceiverContext {
-
-   private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
-
-   /*
-    The maximum number of credits we will allocate to clients.
-    This number is also used by the broker when refresh client credits.
-     */
-   private static int maxCreditAllocation = 100;
-
-   // Used by the broker to decide when to refresh clients credit.  This is not used when client requests credit.
-   private static int minCreditRefresh = 30;
-
-   public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
-                                      AbstractConnectionContext connection,
-                                      AbstractProtonSessionContext protonSession,
-                                      Receiver receiver) {
-      super(sessionSPI, connection, protonSession, receiver);
-   }
-
-   @Override
-   public void onFlow(int credits, boolean drain) {
-      flow(Math.min(credits, maxCreditAllocation), maxCreditAllocation);
-   }
-
-   @Override
-   public void initialise() throws Exception {
-      super.initialise();
-      org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
-
-      if (target != null) {
-         if (target.getDynamic()) {
-            //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and
-            // will be deleted on closing of the session
-            address = sessionSPI.tempQueueName();
-
-            try {
-               sessionSPI.createTemporaryQueue(address);
-            }
-            catch (Exception e) {
-               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-            }
-            target.setAddress(address);
-         }
-         else {
-            //if not dynamic then we use the targets address as the address to forward the messages to, however there has to
-            //be a queue bound to it so we nee to check this.
-            address = target.getAddress();
-            if (address == null) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet();
-            }
-
-            try {
-               if (!sessionSPI.bindingQuery(address)) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
-               }
-            }
-            catch (ActiveMQAMQPNotFoundException e) {
-               throw e;
-            }
-            catch (Exception e) {
-               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-            }
-         }
-      }
-      flow(maxCreditAllocation, minCreditRefresh);
-   }
-
-   /*
-   * called when Proton receives a message to be delivered via a Delivery.
-   *
-   * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
-   *
-   * */
-   @Override
-   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      Receiver receiver;
-      try {
-         receiver = ((Receiver) delivery.getLink());
-
-         if (!delivery.isReadable()) {
-            return;
-         }
-
-         if (delivery.isPartial()) {
-            return;
-         }
-
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
-         try {
-            synchronized (connection.getLock()) {
-               readDelivery(receiver, buffer);
-
-               receiver.advance();
-
-               Transaction tx = null;
-               if (delivery.getRemoteState() instanceof TransactionalState) {
-
-                  TransactionalState txState = (TransactionalState) delivery.getRemoteState();
-                  tx = this.sessionSPI.getTransaction(txState.getTxnId());
-               }
-               sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer);
-
-               flow(maxCreditAllocation, minCreditRefresh);
-            }
-         }
-         finally {
-            buffer.release();
-         }
-      }
-      catch (Exception e) {
-         log.warn(e.getMessage(), e);
-         Rejected rejected = new Rejected();
-         ErrorCondition condition = new ErrorCondition();
-         condition.setCondition(Symbol.valueOf("failed"));
-         condition.setDescription(e.getMessage());
-         rejected.setError(condition);
-         delivery.disposition(rejected);
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
deleted file mode 100644
index e9bd123..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.server;
-
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
-import org.apache.activemq.artemis.selector.filter.FilterException;
-import org.apache.activemq.artemis.selector.impl.SelectorParser;
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
-import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.proton.amqp.transaction.TransactionalState;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.AmqpSupport;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonContextSender;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.context.ProtonPlugSender;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
-import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
-import org.proton.plug.exceptions.ActiveMQAMQPNotFoundException;
-import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
-
-import static org.proton.plug.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
-import static org.proton.plug.AmqpSupport.findFilter;
-
-public class ProtonServerSenderContext extends AbstractProtonContextSender implements ProtonPlugSender {
-
-   private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
-
-   private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector");
-   private static final Symbol COPY = Symbol.valueOf("copy");
-   private static final Symbol TOPIC = Symbol.valueOf("topic");
-
-   // Broker-side consumer feeding this link; stays null until initialise() creates it via createSender.
-   private Object brokerConsumer;
-
-   public ProtonServerSenderContext(AbstractConnectionContext connection,
-                                    Sender sender,
-                                    AbstractProtonSessionContext protonSession,
-                                    AMQPSessionCallback server) {
-      super(connection, sender, protonSession, server);
-   }
-
-   public Object getBrokerConsumer() {
-      return brokerConsumer;
-   }
-
-   @Override
-   public void onFlow(int currentCredits, boolean drain) {
-      super.onFlow(currentCredits, drain);
-      sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
-   }
-
-   /**
-    * Starts this sender and, when a broker consumer exists, asks the session callback to start it.
-    */
-   @Override
-   public void start() throws ActiveMQAMQPException {
-      super.start();
-
-      //todo add flow control
-      try {
-         // do whatever is needed to make the broker start sending messages to the consumer;
-         // the consumer can be null if a link reattach has happened
-         if (brokerConsumer != null) {
-            sessionSPI.startSender(brokerConsumer);
-         }
-      }
-      catch (Exception e) {
-         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
-      }
-   }
-
-   /**
-    * create the actual underlying ActiveMQ Artemis Server Consumer
-    */
-   @Override
-   public void initialise() throws Exception {
-      super.initialise();
-
-      Source source = (Source) sender.getRemoteSource();
-
-      String queue;
-
-      String selector = null;
-
-      /*
-      * even though the filter is a map it will only return a single filter unless a no-local filter is also provided
-      * */
-      if (source != null) {
-         Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
-         if (filter != null) {
-            selector = filter.getValue().getDescribed().toString();
-            // Validate the Selector.
-            try {
-               SelectorParser.parse(selector);
-            }
-            catch (FilterException e) {
-               close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
-               return;
-            }
-         }
-      }
-
-      /*
-      * if we have a capability for a topic (qpid-jms) or we are configured on this address to act like a topic then act
-      * like a subscription.
-      * */
-      boolean isPubSub = hasCapabilities(TOPIC, source) || isPubSub(source);
-
-      if (isPubSub) {
-         if (findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS) != null) {
-            String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
-            String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
-            if (selector != null) {
-               selector += " AND " + noLocalFilter;
-            }
-            else {
-               selector = noLocalFilter;
-            }
-         }
-      }
-
-      if (source == null) {
-         // Attempt to recover a previous subscription; this happens when a link reattach happens on a subscription queue
-         String clientId = connection.getRemoteContainer();
-         String pubId = sender.getName();
-         queue = clientId + ":" + pubId;
-         boolean exists = sessionSPI.queueQuery(queue, false).isExists();
-
-         /*
-         * If it exists then we know it is a subscription so we set the capabilities on the source so we can delete on a
-         * link remote close.
-         * */
-         if (exists) {
-            source = new org.apache.qpid.proton.amqp.messaging.Source();
-            source.setAddress(queue);
-            source.setDurable(TerminusDurability.UNSETTLED_STATE);
-            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
-            source.setDistributionMode(COPY);
-            source.setCapabilities(TOPIC);
-            sender.setSource(source);
-         }
-         else {
-            throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
-         }
-      }
-      else {
-         if (source.getDynamic()) {
-            //if dynamic we have to create the node (queue) and set the address on the source, the node is temporary and
-            // will be deleted on closing of the session
-            queue = java.util.UUID.randomUUID().toString();
-            try {
-               sessionSPI.createTemporaryQueue(queue);
-            }
-            catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
-            }
-            source.setAddress(queue);
-         }
-         else {
-            //if not dynamic then we use the source's address as the queue to consume from, however there has to
-            //be a queue bound to it so we need to check this.
-            if (isPubSub) {
-               // if we are a subscription and durable create a durable queue using the container id and link name
-               if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
-                                TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
-                  String clientId = connection.getRemoteContainer();
-                  String pubId = sender.getName();
-                  queue = clientId + ":" + pubId;
-                  QueueQueryResult result = sessionSPI.queueQuery(queue, false);
-
-                  if (result.isExists()) {
-                     // If a client reattaches to a durable subscription with a different no-local filter value, selector
-                     // or address then we must recreate the queue (JMS semantics).
-
-                     if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) ||
-                        (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) {
-                        if (result.getConsumerCount() == 0) {
-                           sessionSPI.deleteQueue(queue);
-                           sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
-                        }
-                        else {
-                           throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
-                        }
-                     }
-                  }
-                  else {
-                     sessionSPI.createDurableQueue(source.getAddress(), queue, selector);
-                  }
-                  source.setAddress(queue);
-               }
-               //otherwise we are a volatile subscription
-               else {
-                  queue = java.util.UUID.randomUUID().toString();
-                  try {
-                     sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector);
-                  }
-                  catch (Exception e) {
-                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
-                  }
-                  source.setAddress(queue);
-               }
-            }
-            else {
-               queue = source.getAddress();
-            }
-            if (queue == null) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
-            }
-
-            try {
-               if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
-               }
-            }
-            catch (ActiveMQAMQPNotFoundException e) {
-               throw e;
-            }
-            catch (Exception e) {
-               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-            }
-         }
-
-         boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
-         try {
-            brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly);
-         }
-         catch (Exception e) {
-            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage());
-         }
-      }
-   }
-
-   /** Returns true when the source address is non-null and starts with the session's pub/sub prefix (if one is set). */
-   private boolean isPubSub(Source source) {
-      String pubSubPrefix = sessionSPI.getPubSubPrefix();
-      return source != null && pubSubPrefix != null && source.getAddress() != null && source.getAddress().startsWith(pubSubPrefix);
-   }
-
-   /**
-    * close the sender with the given error condition, then close the broker consumer behind it
-    */
-   @Override
-   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
-      super.close(condition);
-      try {
-         sessionSPI.closeSender(brokerConsumer);
-      }
-      catch (Exception e) {
-         log.warn(e.getMessage(), e);
-         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-      }
-   }
-
-   /**
-    * close the sender; a remote link close also deletes the queue backing a topic-capable source
-    */
-   @Override
-   public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
-      super.close(remoteLinkClose);
-      try {
-         sessionSPI.closeSender(brokerConsumer);
-         //if this is a link close rather than a connection close or detach, we need to delete any durable resources for
-         // say pub subs
-         if (remoteLinkClose) {
-            Source source = (Source) sender.getSource();
-            if (source != null && source.getAddress() != null && hasCapabilities(TOPIC, source)) {
-               String queueName = source.getAddress();
-               QueueQueryResult result = sessionSPI.queueQuery(queueName, false);
-               if (result.isExists() && source.getDynamic()) {
-                  sessionSPI.deleteQueue(queueName);
-               }
-               else {
-                  String clientId = connection.getRemoteContainer();
-                  String pubId = sender.getName();
-                  String queue = clientId + ":" + pubId;
-                  result = sessionSPI.queueQuery(queue, false);
-                  if (result.isExists()) {
-                     if (result.getConsumerCount() > 0) {
-                        log.warn("Deleting subscription queue " + queue + " which still has " + result.getConsumerCount() + " consumer(s)");
-                     }
-                     sessionSPI.deleteQueue(queue);
-                  }
-               }
-            }
-         }
-      }
-      catch (Exception e) {
-         log.warn(e.getMessage(), e);
-         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-      }
-   }
-
-   /** Applies the remote delivery state, if any, to the broker (ack or cancel) and then settles the delivery. */
-   @Override
-   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      Object message = delivery.getContext();
-
-      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
-
-      DeliveryState remoteState = delivery.getRemoteState();
-
-      if (remoteState != null) {
-         // If we are transactional then we need to ack if the msg has been accepted
-         if (remoteState instanceof TransactionalState) {
-
-            TransactionalState txState = (TransactionalState) remoteState;
-            Transaction tx = this.sessionSPI.getTransaction(txState.getTxnId());
-            if (txState.getOutcome() != null) {
-               Outcome outcome = txState.getOutcome();
-               if (outcome instanceof Accepted) {
-                  if (!delivery.remotelySettled()) {
-                     TransactionalState txAccepted = new TransactionalState();
-                     txAccepted.setOutcome(Accepted.getInstance());
-                     txAccepted.setTxnId(txState.getTxnId());
-
-                     delivery.disposition(txAccepted);
-                  }
-                  //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
-                  // from dealer, a perf hit but a must
-                  try {
-                     sessionSPI.ack(tx, brokerConsumer, message);
-                  }
-                  catch (Exception e) {
-                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(String.valueOf(message), e.getMessage());
-                  }
-               }
-            }
-         }
-         else if (remoteState instanceof Accepted) {
-            //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order
-            // from dealer, a perf hit but a must
-            try {
-               sessionSPI.ack(null, brokerConsumer, message);
-            }
-            catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(String.valueOf(message), e.getMessage());
-            }
-         }
-         else if (remoteState instanceof Released) {
-            try {
-               sessionSPI.cancel(brokerConsumer, message, false);
-            }
-            catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(String.valueOf(message), e.getMessage());
-            }
-         }
-         else if (remoteState instanceof Rejected || remoteState instanceof Modified) {
-            try {
-               sessionSPI.cancel(brokerConsumer, message, true);
-            }
-            catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(String.valueOf(message), e.getMessage());
-            }
-         }
-         //todo add tag caching
-         if (!preSettle) {
-            protonSession.replaceTag(delivery.getTag());
-         }
-
-         synchronized (connection.getLock()) {
-            delivery.settle();
-            sender.offer(1);
-         }
-
-      }
-      else {
-         //todo not sure if we need to do anything here
-      }
-   }
-
-   @Override
-   public synchronized void checkState() {
-      super.checkState();
-      sessionSPI.resumeDelivery(brokerConsumer);
-   }
-
-   /**
-    * handle an outgoing message from ActiveMQ Artemis, send via the Proton Sender
-    */
-   @Override
-   public int deliverMessage(Object message, int deliveryCount) throws Exception {
-      if (closed) {
-         log.warn("Message can't be delivered as it's closed");
-         return 0;
-      }
-
-      //encode the message
-      ProtonJMessage serverMessage;
-      try {
-         // This can be done a lot better here
-         serverMessage = sessionSPI.encodeMessage(message, deliveryCount);
-      }
-      catch (Throwable e) {
-         log.warn(e.getMessage(), e);
-         throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-      }
-
-      return performSend(serverMessage, message);
-   }
-
-   /** Returns true when {@code source} is non-null and lists {@code symbol} among its capabilities. */
-   private static boolean hasCapabilities(Symbol symbol, Source source) {
-      if (source == null || source.getCapabilities() == null) {
-         return false;
-      }
-      for (Symbol cap : source.getCapabilities()) {
-         if (symbol.equals(cap)) {
-            return true;
-         }
-      }
-      return false;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
deleted file mode 100644
index 983fa4e..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.server;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonContextSender;
-import org.proton.plug.context.AbstractProtonReceiverContext;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.context.ProtonTransactionHandler;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-
-public class ProtonServerSessionContext extends AbstractProtonSessionContext {
-
-   public ProtonServerSessionContext(AMQPSessionCallback sessionSPI,
-                                     AbstractConnectionContext connection,
-                                     Session session) {
-      super(sessionSPI, connection, session);
-   }
-
-   protected Map<Object, AbstractProtonContextSender> serverSenders = new HashMap<>();
-
-   /**
-    * The consumer object from the broker or the key used to store the sender
-    *
-    * @param message
-    * @param consumer
-    * @param deliveryCount
-    * @return the number of bytes sent
-    */
-   public int serverDelivery(Object message, Object consumer, int deliveryCount) throws Exception {
-      ProtonServerSenderContext protonSender = (ProtonServerSenderContext) serverSenders.get(consumer);
-      if (protonSender != null) {
-         return protonSender.deliverMessage(message, deliveryCount);
-      }
-      return 0;
-   }
-
-   public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
-      ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
-
-      coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"),
-                                  Symbol.getSymbol("amqp:multi-txns-per-ssn"),
-                                  Symbol.getSymbol("amqp:multi-ssns-per-txn"));
-
-      receiver.setContext(transactionHandler);
-      receiver.open();
-      receiver.flow(100);
-   }
-
-   public void addSender(Sender sender) throws Exception {
-      ProtonServerSenderContext protonSender = new ProtonServerSenderContext(connection, sender, this, sessionSPI);
-
-      try {
-         protonSender.initialise();
-         senders.put(sender, protonSender);
-         serverSenders.put(protonSender.getBrokerConsumer(), protonSender);
-         sender.setContext(protonSender);
-         sender.open();
-         protonSender.start();
-      }
-      catch (ActiveMQAMQPException e) {
-         senders.remove(sender);
-         sender.setSource(null);
-         sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         sender.close();
-      }
-   }
-
-   @Override
-   public void removeSender(Sender sender) throws ActiveMQAMQPException {
-      ProtonServerSenderContext senderRemoved = (ProtonServerSenderContext) senders.remove(sender);
-      if (senderRemoved != null) {
-         serverSenders.remove(senderRemoved.getBrokerConsumer());
-      }
-   }
-
-   public void addReceiver(Receiver receiver) throws Exception {
-      try {
-         AbstractProtonReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver);
-         protonReceiver.initialise();
-         receivers.put(receiver, protonReceiver);
-         receiver.setContext(protonReceiver);
-         receiver.open();
-      }
-      catch (ActiveMQAMQPException e) {
-         receivers.remove(receiver);
-         receiver.setTarget(null);
-         receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         receiver.close();
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
deleted file mode 100644
index 4838d55..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.Symbol;
-
-public class ActiveMQAMQPException extends ActiveMQException {
-
-   private static final String ERROR_PREFIX = "amqp:";
-
-   public Symbol getAmqpError() {
-      return amqpError;
-   }
-
-   private final Symbol amqpError;
-
-   public ActiveMQAMQPException(Symbol amqpError, String message, Throwable e, ActiveMQExceptionType t) {
-      super(message, e, t);
-      this.amqpError = amqpError;
-   }
-
-   public ActiveMQAMQPException(Symbol amqpError, String message, ActiveMQExceptionType t) {
-      super(message, t);
-      this.amqpError = amqpError;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
deleted file mode 100644
index 7818ef9..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-public class ActiveMQAMQPIllegalStateException extends ActiveMQAMQPException {
-
-   public ActiveMQAMQPIllegalStateException(String message) {
-      super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.ILLEGAL_STATE);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
deleted file mode 100644
index 2c0b0ae..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-public class ActiveMQAMQPInternalErrorException extends ActiveMQAMQPException {
-
-   public ActiveMQAMQPInternalErrorException(String message, Throwable e) {
-      super(AmqpError.INTERNAL_ERROR, message, e, ActiveMQExceptionType.INTERNAL_ERROR);
-   }
-
-   public ActiveMQAMQPInternalErrorException(String message) {
-      super(AmqpError.INTERNAL_ERROR, message, ActiveMQExceptionType.INTERNAL_ERROR);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
deleted file mode 100644
index f5dd168..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-public class ActiveMQAMQPInvalidFieldException extends ActiveMQAMQPException {
-
-   public ActiveMQAMQPInvalidFieldException(String message) {
-      super(AmqpError.INVALID_FIELD, message, ActiveMQExceptionType.ILLEGAL_STATE);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
deleted file mode 100644
index 02cc15c..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-public class ActiveMQAMQPNotFoundException extends ActiveMQAMQPException {
-
-   public ActiveMQAMQPNotFoundException(String message) {
-      super(AmqpError.NOT_FOUND, message, ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
deleted file mode 100644
index 861e236..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-public class ActiveMQAMQPNotImplementedException extends ActiveMQAMQPException {
-
-   public ActiveMQAMQPNotImplementedException(String message) {
-      super(AmqpError.NOT_IMPLEMENTED, message, ActiveMQExceptionType.NOT_IMPLEMTNED_EXCEPTION);
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
deleted file mode 100644
index 2c64a8d..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-public class ActiveMQAMQPResourceLimitExceededException extends ActiveMQAMQPException {
-
-   public ActiveMQAMQPResourceLimitExceededException(String message) {
-      super(AmqpError.RESOURCE_LIMIT_EXCEEDED, message, ActiveMQExceptionType.ADDRESS_FULL);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
deleted file mode 100644
index c86c25d..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.exceptions;
-
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-public class ActiveMQAMQPTimeoutException extends ActiveMQAMQPException {
-
-   public ActiveMQAMQPTimeoutException(String message) {
-      super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.CONNECTION_TIMEDOUT);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
deleted file mode 100644
index c020cbb..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.handler;
-
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Transport;
-
-/**
- * EventHandler
- */
-public interface EventHandler {
-
-   void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl);
-
-   void onInit(Connection connection) throws Exception;
-
-   void onLocalOpen(Connection connection) throws Exception;
-
-   void onRemoteOpen(Connection connection) throws Exception;
-
-   void onLocalClose(Connection connection) throws Exception;
-
-   void onRemoteClose(Connection connection) throws Exception;
-
-   void onFinal(Connection connection) throws Exception;
-
-   void onInit(Session session) throws Exception;
-
-   void onLocalOpen(Session session) throws Exception;
-
-   void onRemoteOpen(Session session) throws Exception;
-
-   void onLocalClose(Session session) throws Exception;
-
-   void onRemoteClose(Session session) throws Exception;
-
-   void onFinal(Session session) throws Exception;
-
-   void onInit(Link link) throws Exception;
-
-   void onLocalOpen(Link link) throws Exception;
-
-   void onRemoteOpen(Link link) throws Exception;
-
-   void onLocalClose(Link link) throws Exception;
-
-   void onRemoteClose(Link link) throws Exception;
-
-   void onFlow(Link link) throws Exception;
-
-   void onFinal(Link link) throws Exception;
-
-   void onRemoteDetach(Link link) throws Exception;
-
-   void onDetach(Link link) throws Exception;
-
-   void onDelivery(Delivery delivery) throws Exception;
-
-   void onTransport(Transport transport) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/Events.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/Events.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/Events.java
deleted file mode 100644
index 2f978c7..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/Events.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.handler;
-
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Transport;
-
-/**
- * TODO : this needs a better home
- */
-public final class Events {
-
-   public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception {
-      handler.onTransport(transport);
-   }
-
-   public static void dispatch(Event event, EventHandler handler) throws Exception {
-      switch (event.getType()) {
-         case CONNECTION_INIT:
-            handler.onInit(event.getConnection());
-            break;
-         case CONNECTION_LOCAL_OPEN:
-            handler.onLocalOpen(event.getConnection());
-            break;
-         case CONNECTION_REMOTE_OPEN:
-            handler.onRemoteOpen(event.getConnection());
-            break;
-         case CONNECTION_LOCAL_CLOSE:
-            handler.onLocalClose(event.getConnection());
-            break;
-         case CONNECTION_REMOTE_CLOSE:
-            handler.onRemoteClose(event.getConnection());
-            break;
-         case CONNECTION_FINAL:
-            handler.onFinal(event.getConnection());
-            break;
-         case SESSION_INIT:
-            handler.onInit(event.getSession());
-            break;
-         case SESSION_LOCAL_OPEN:
-            handler.onLocalOpen(event.getSession());
-            break;
-         case SESSION_REMOTE_OPEN:
-            handler.onRemoteOpen(event.getSession());
-            break;
-         case SESSION_LOCAL_CLOSE:
-            handler.onLocalClose(event.getSession());
-            break;
-         case SESSION_REMOTE_CLOSE:
-            handler.onRemoteClose(event.getSession());
-            break;
-         case SESSION_FINAL:
-            handler.onFinal(event.getSession());
-            break;
-         case LINK_INIT:
-            handler.onInit(event.getLink());
-            break;
-         case LINK_LOCAL_OPEN:
-            handler.onLocalOpen(event.getLink());
-            break;
-         case LINK_REMOTE_OPEN:
-            handler.onRemoteOpen(event.getLink());
-            break;
-         case LINK_LOCAL_CLOSE:
-            handler.onLocalClose(event.getLink());
-            break;
-         case LINK_REMOTE_CLOSE:
-            handler.onRemoteClose(event.getLink());
-            break;
-         case LINK_FLOW:
-            handler.onFlow(event.getLink());
-            break;
-         case LINK_FINAL:
-            handler.onFinal(event.getLink());
-            break;
-         case LINK_LOCAL_DETACH:
-            handler.onDetach(event.getLink());
-            break;
-         case LINK_REMOTE_DETACH:
-            handler.onRemoteDetach(event.getLink());
-            break;
-         case TRANSPORT:
-            handler.onTransport(event.getTransport());
-            break;
-         case DELIVERY:
-            handler.onDelivery(event.getDelivery());
-            break;
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
deleted file mode 100644
index cbb96fd..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.handler;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.engine.Connection;
-
-import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
-import static org.proton.plug.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
-
-public class ExtCapability {
-
-   public static final Symbol[] capabilities = new Symbol[] {
-      SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY
-   };
-
-   public static Symbol[] getCapabilities() {
-      return capabilities;
-   }
-
-   public static boolean needUniqueConnection(Connection connection) {
-      Symbol[] extCapabilities = connection.getRemoteDesiredCapabilities();
-      if (extCapabilities != null) {
-         for (Symbol sym : extCapabilities) {
-            if (sym.compareTo(SOLE_CONNECTION_CAPABILITY) == 0) {
-               return true;
-            }
-         }
-      }
-      return false;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
deleted file mode 100644
index d02546b..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.handler;
-
-import java.util.concurrent.Executor;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Transport;
-import org.proton.plug.ClientSASL;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.SASLResult;
-import org.proton.plug.handler.impl.ProtonHandlerImpl;
-
-/**
- * This is a definition of the public interface for {@link org.proton.plug.handler.impl.ProtonHandlerImpl}
- */
-public interface ProtonHandler {
-
-   long tick(boolean firstTick);
-
-   final class Factory {
-      public static ProtonHandler create(Executor dispatchExecutor) {
-         return new ProtonHandlerImpl(dispatchExecutor);
-      }
-   }
-
-   /**
-    * It returns true if the transport connection has any capacity available
-    *
-    * @return
-    */
-   int capacity();
-
-   Transport getTransport();
-
-   Connection getConnection();
-
-   /**
-    * Add an event handler to the chain
-    *
-    * @param handler
-    * @return
-    */
-   ProtonHandler addEventHandler(EventHandler handler);
-
-   void createClientSasl(ClientSASL clientSASL);
-
-   /**
-    * To be used on server connections. To define SASL integration.
-    *
-    * @param handlers
-    */
-   void createServerSASL(ServerSASL[] handlers);
-
-   /**
-    * To return the SASL Mechanism that was successful with the connection.
-    * This should contain any state such as user and password
-    *
-    * @return
-    */
-   SASLResult getSASLResult();
-
-   /**
-    * The input on the Handler.
-    * Notice that buffer will be positioned up to where we needed
-    *
-    * @param buffer
-    */
-   void inputBuffer(ByteBuf buffer);
-
-   /**
-    * To be used at your discretion to verify if the client was active since you last checked
-    * it can be used to implement server TTL cleanup and verifications
-    *
-    * @return
-    */
-   boolean checkDataReceived();
-
-   /**
-    * Return the creation time of the handler
-    *
-    * @return
-    */
-   long getCreationTime();
-
-   /**
-    * To be called after you used the outputBuffer
-    *
-    * @param bytes number of bytes you used already on the output
-    */
-   void outputDone(int bytes);
-
-   /**
-    * it will return pending bytes you have on the Transport
-    * after you are done with it you must call {@link #outputDone(int)}
-    *
-    * @return
-    */
-   ByteBuf outputBuffer();
-
-   /**
-    * It will process the transport and cause events to be called
-    */
-   void flush();
-
-   /**
-    * It will close the connection and flush events
-    */
-   void close();
-
-   /**
-    * Get the object used to lock transport, connection and events operations
-    *
-    * @return
-    */
-   Object getLock();
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java
deleted file mode 100644
index 45d5b67..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.handler.impl;
-
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Transport;
-import org.proton.plug.handler.EventHandler;
-
-/**
- * This is useful for cases where you only want to implement a few methods
- */
-public abstract class DefaultEventHandler implements EventHandler {
-
-   @Override
-   public void onInit(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalOpen(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onRemoteOpen(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalClose(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onRemoteClose(Connection connection) throws Exception {
-   }
-
-   @Override
-   public void onFinal(Connection connection) throws Exception {
-
-   }
-
-   @Override
-   public void onInit(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalOpen(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onRemoteOpen(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalClose(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onRemoteClose(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onFinal(Session session) throws Exception {
-
-   }
-
-   @Override
-   public void onInit(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalOpen(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onRemoteOpen(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onLocalClose(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onRemoteClose(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onFlow(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onFinal(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onRemoteDetach(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onDetach(Link link) throws Exception {
-
-   }
-
-   @Override
-   public void onDelivery(Delivery delivery) throws Exception {
-
-   }
-
-   @Override
-   public void onTransport(Transport transport) throws Exception {
-
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
deleted file mode 100644
index b2f6406..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.handler.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
-import org.jboss.logging.Logger;
-import org.proton.plug.ClientSASL;
-import org.proton.plug.ServerSASL;
-import org.proton.plug.handler.EventHandler;
-import org.proton.plug.handler.Events;
-import org.proton.plug.handler.ProtonHandler;
-import org.proton.plug.context.ProtonInitializable;
-import org.proton.plug.SASLResult;
-import org.proton.plug.util.ByteUtil;
-
-/**
- * Clebert Suconic
- */
-public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHandler {
-
-   private static final Logger log = Logger.getLogger(ProtonHandlerImpl.class);
-
-   private static final byte SASL = 0x03;
-
-   private static final byte BARE = 0x00;
-
-   private final Transport transport = Proton.transport();
-
-   private final Connection connection = Proton.connection();
-
-   private final Collector collector = Proton.collector();
-
-   private final Executor dispatchExecutor;
-
-   private final Runnable dispatchRunnable = new Runnable() {
-      @Override
-      public void run() {
-         dispatch();
-      }
-   };
-
-   private ArrayList<EventHandler> handlers = new ArrayList<>();
-
-   private Sasl serverSasl;
-
-   private Sasl clientSasl;
-
-   private final Object lock = new Object();
-
-   private final long creationTime;
-
-   private Map<String, ServerSASL> saslHandlers;
-
-   private SASLResult saslResult;
-
-   protected volatile boolean dataReceived;
-
-   protected boolean receivedFirstPacket = false;
-
-   private int offset = 0;
-
-   public ProtonHandlerImpl(Executor dispatchExecutor) {
-      this.dispatchExecutor = dispatchExecutor;
-      this.creationTime = System.currentTimeMillis();
-      transport.bind(connection);
-      connection.collect(collector);
-   }
-
-   @Override
-   public long tick(boolean firstTick) {
-      if (!firstTick) {
-         try {
-            if (connection.getLocalState() != EndpointState.CLOSED) {
-               long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-               if (transport.isClosed()) {
-                  throw new IllegalStateException("Channel was inactive for to long");
-               }
-               return rescheduleAt;
-            }
-         }
-         catch (Exception e) {
-            transport.close();
-            connection.setCondition(new ErrorCondition());
-         }
-         return 0;
-      }
-      return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-   }
-
-   @Override
-   public int capacity() {
-      synchronized (lock) {
-         return transport.capacity();
-      }
-   }
-
-   @Override
-   public Object getLock() {
-      return lock;
-   }
-
-   @Override
-   public Transport getTransport() {
-      return transport;
-   }
-
-   @Override
-   public Connection getConnection() {
-      return connection;
-   }
-
-   @Override
-   public ProtonHandler addEventHandler(EventHandler handler) {
-      handlers.add(handler);
-      return this;
-   }
-
-   @Override
-   public void createServerSASL(ServerSASL[] handlers) {
-      this.serverSasl = transport.sasl();
-      saslHandlers = new HashMap<>();
-      String[] names = new String[handlers.length];
-      int count = 0;
-      for (ServerSASL handler : handlers) {
-         saslHandlers.put(handler.getName(), handler);
-         names[count++] = handler.getName();
-      }
-      this.serverSasl.server();
-      serverSasl.setMechanisms(names);
-
-   }
-
-   @Override
-   public SASLResult getSASLResult() {
-      return saslResult;
-   }
-
-   @Override
-   public void inputBuffer(ByteBuf buffer) {
-      dataReceived = true;
-      synchronized (lock) {
-         while (buffer.readableBytes() > 0) {
-            int capacity = transport.capacity();
-
-            if (!receivedFirstPacket) {
-               try {
-                  byte auth = buffer.getByte(4);
-                  if (auth == SASL || auth == BARE) {
-                     dispatchAuth(auth == SASL);
-                     /*
-                     * there is a chance that if SASL Handshake has been carried out that the capacity may change.
-                     * */
-                     capacity = transport.capacity();
-                  }
-               }
-               catch (Throwable e) {
-                  log.debug(e.getMessage(), e);
-               }
-
-               receivedFirstPacket = true;
-            }
-
-            if (capacity > 0) {
-               ByteBuffer tail = transport.tail();
-               int min = Math.min(capacity, buffer.readableBytes());
-               tail.limit(min);
-               buffer.readBytes(tail);
-
-               flush();
-            }
-            else {
-               if (capacity == 0) {
-                  log.debugf("abandoning: readableBytes=%d", buffer.readableBytes());
-               }
-               else {
-                  log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity());
-               }
-               break;
-            }
-         }
-      }
-   }
-
-   @Override
-   public boolean checkDataReceived() {
-      boolean res = dataReceived;
-
-      dataReceived = false;
-
-      return res;
-   }
-
-   @Override
-   public long getCreationTime() {
-      return creationTime;
-   }
-
-   @Override
-   public void outputDone(int bytes) {
-      synchronized (lock) {
-         transport.pop(bytes);
-         offset -= bytes;
-
-         if (offset < 0) {
-            throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes +
-                                               ", outcome result=" + offset);
-         }
-      }
-
-      flush();
-   }
-
-   @Override
-   public ByteBuf outputBuffer() {
-
-      synchronized (lock) {
-         int pending = transport.pending();
-
-         if (pending < 0) {
-            return null;//throw new IllegalStateException("xxx need to close the connection");
-         }
-
-         int size = pending - offset;
-
-         if (size < 0) {
-            throw new IllegalStateException("negative size: " + pending);
-         }
-
-         if (size == 0) {
-            return null;
-         }
-
-         // For returning PooledBytes
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size);
-         ByteBuffer head = transport.head();
-         head.position(offset);
-         head.limit(offset + size);
-         buffer.writeBytes(head);
-         offset += size; // incrementing offset for future calls
-         return buffer;
-      }
-   }
-
-   @Override
-   public void createClientSasl(ClientSASL clientSASL) {
-      if (clientSASL != null) {
-         clientSasl = transport.sasl();
-         clientSasl.setMechanisms(clientSASL.getName());
-         byte[] initialSasl = clientSASL.getBytes();
-         clientSasl.send(initialSasl, 0, initialSasl.length);
-      }
-   }
-
-   @Override
-   public void flush() {
-      synchronized (lock) {
-         transport.process();
-
-         checkServerSASL();
-
-      }
-
-      dispatchExecutor.execute(dispatchRunnable);
-   }
-
-   @Override
-   public void close() {
-      synchronized (lock) {
-         connection.close();
-      }
-      flush();
-   }
-
-   protected void checkServerSASL() {
-      if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) {
-         // TODO: should we look at the first only?
-         ServerSASL mechanism = saslHandlers.get(serverSasl.getRemoteMechanisms()[0]);
-         if (mechanism != null) {
-
-            byte[] dataSASL = new byte[serverSasl.pending()];
-            serverSasl.recv(dataSASL, 0, dataSASL.length);
-
-            if (log.isTraceEnabled()) {
-               log.trace("Working on sasl::" + ByteUtil.bytesToHex(dataSASL, 2));
-            }
-
-            saslResult = mechanism.processSASL(dataSASL);
-
-            if (saslResult != null && saslResult.isSuccess()) {
-               serverSasl.done(Sasl.SaslOutcome.PN_SASL_OK);
-               serverSasl = null;
-               saslHandlers.clear();
-               saslHandlers = null;
-            }
-            else {
-               serverSasl.done(Sasl.SaslOutcome.PN_SASL_AUTH);
-            }
-            serverSasl = null;
-         }
-         else {
-            // no auth available, system error
-            serverSasl.done(Sasl.SaslOutcome.PN_SASL_SYS);
-         }
-      }
-   }
-
-   private Event popEvent() {
-      synchronized (lock) {
-         Event ev = collector.peek();
-         if (ev != null) {
-            // pop will invalidate the event
-            // for that reason we make a new one
-            // Events are reused inside the collector, so we need to make a new one here
-            ev = ev.copy();
-            collector.pop();
-         }
-         return ev;
-      }
-   }
-
-   private void dispatchAuth(boolean sasl) {
-      for (EventHandler h : handlers) {
-         h.onAuthInit(this, getConnection(), sasl);
-      }
-   }
-
-   private void dispatch() {
-      Event ev;
-      // We don't hold a lock on the entire event processing
-      // because we could have a distributed deadlock
-      // while processing events (for instance onTransport)
-      // while a client is also trying to write here
-      while ((ev = popEvent()) != null) {
-         for (EventHandler h : handlers) {
-            if (log.isTraceEnabled()) {
-               log.trace("Handling " + ev + " towards " + h);
-            }
-            try {
-               Events.dispatch(ev, h);
-            }
-            catch (Exception e) {
-               log.warn(e.getMessage(), e);
-               connection.setCondition(new ErrorCondition());
-            }
-         }
-      }
-
-      for (EventHandler h : handlers) {
-         try {
-            h.onTransport(transport);
-         }
-         catch (Exception e) {
-            log.warn(e.getMessage(), e);
-            connection.setCondition(new ErrorCondition());
-         }
-      }
-
-   }
-}


Mime
View raw message