activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [07/15] activemq-artemis git commit: ARTEMIS-751 Simplification of the AMQP implementation
Date Tue, 27 Sep 2016 13:54:34 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
deleted file mode 100644
index eb2a778..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-
-public abstract class AMQPConnectionContextFactory {
-
-   /**
-    * @return
-    */
-   public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
-                                                          String containerId,
-                                                          int idleTimeout,
-                                                          int maxFrameSize,
-                                                          int channelMax,
-                                                          Executor dispatchExecutor,
-                                                          ScheduledExecutorService scheduledPool);
-
-   /**
-    * @return
-    */
-   public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java
deleted file mode 100644
index 518c79e..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPServerConnectionContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-public interface AMQPServerConnectionContext extends AMQPConnectionContext {
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
deleted file mode 100644
index 5f3b6dd..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionCallback.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.proton.plug.context.ProtonPlugSender;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-
-/**
- * These are the methods through which the Proton Plug component calls into your server.
- */
-public interface AMQPSessionCallback {
-
-   void init(AMQPSessionContext session, SASLResult saslResult) throws Exception;
-
-   void start();
-
-   void onFlowConsumer(Object consumer, int credits, boolean drain);
-
-   Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception;
-
-   void startSender(Object brokerConsumer) throws Exception;
-
-   void createTemporaryQueue(String queueName) throws Exception;
-
-   void createTemporaryQueue(String address, String queueName, String filter) throws Exception;
-
-   void createDurableQueue(String address, String queueName, String filter) throws Exception;
-
-   void offerProducerCredit(String address, int credits, int threshold, Receiver receiver);
-
-   void deleteQueue(String address) throws Exception;
-
-   /**
-    * Returns true if a queue is found with matching name, if autoCreate=true and autoCreateJMSQueues is switched on then
-    * this method will auto create the queue, with name=queueName, address=queueName, filter=null.
-    *
-    * @param queueName
-    * @param autoCreate
-    * @return
-    * @throws Exception
-    */
-   QueueQueryResult queueQuery(String queueName, boolean autoCreate) throws Exception;
-
-   boolean bindingQuery(String address) throws Exception;
-
-   void closeSender(Object brokerConsumer) throws Exception;
-
-   // This one could be improved a lot
-   ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception;
-
-   String tempQueueName();
-
-
-   Transaction getTransaction(Binary txid) throws ActiveMQAMQPException;
-
-   Binary newTransaction();
-
-   void commitTX(Binary txid) throws Exception;
-
-   void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception;
-
-   void close() throws Exception;
-
-   void ack(Transaction transaction, Object brokerConsumer, Object message) throws Exception;
-
-   /**
-    * @param brokerConsumer
-    * @param message
-    * @param updateCounts   indicates whether the cancel was caused by a failure or is just cleaning up the
-    *                       client's cache.
-    *                       In some implementations you could call this "failed".
-    */
-   void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception;
-
-   void resumeDelivery(Object consumer);
-
-   /**
-    * @param delivery
-    * @param address
-    * @param messageFormat
-    * @param messageEncoded a heap-backed ByteBuf (safe to convert into byte[])
-    */
-   void serverSend(Transaction transaction,
-                   Receiver receiver,
-                   Delivery delivery,
-                   String address,
-                   int messageFormat,
-                   ByteBuf messageEncoded) throws Exception;
-
-   String getPubSubPrefix();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java
deleted file mode 100644
index 66e9c5a..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPSessionContext.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-
-public interface AMQPSessionContext {
-
-   byte[] getTag();
-
-   void replaceTag(byte[] tag);
-
-   void close();
-
-   void removeSender(Sender sender) throws ActiveMQAMQPException;
-
-   void removeReceiver(Receiver receiver);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
deleted file mode 100644
index 4ddbbcc..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-import org.apache.qpid.proton.amqp.DescribedType;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-
-import java.util.AbstractMap;
-import java.util.Map;
-
-/**
- * Set of useful methods and definitions used in the AMQP protocol handling
- */
-public class AmqpSupport {
-
-   // Identification values used to locate JMS selector types.
-   public static final UnsignedLong JMS_SELECTOR_CODE = UnsignedLong.valueOf(0x0000468C00000004L);
-   public static final Symbol JMS_SELECTOR_NAME = Symbol.valueOf("apache.org:selector-filter:string");
-   public static final Object[] JMS_SELECTOR_FILTER_IDS = new Object[]{JMS_SELECTOR_CODE, JMS_SELECTOR_NAME};
-   public static final UnsignedLong NO_LOCAL_CODE = UnsignedLong.valueOf(0x0000468C00000003L);
-   public static final Symbol NO_LOCAL_NAME = Symbol.valueOf("apache.org:no-local-filter:list");
-   public static final Object[] NO_LOCAL_FILTER_IDS = new Object[]{NO_LOCAL_CODE, NO_LOCAL_NAME};
-
-   // Capabilities used to identify destination type in some requests.
-   public static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
-   public static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
-
-   // Symbols used to announce connection information to remote peer.
-   public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
-   public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
-
-   // Symbols used to announce connection information to remote peer.
-   public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
-   public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
-   public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
-   public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
-   public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
-   public static final Symbol PRODUCT = Symbol.valueOf("product");
-   public static final Symbol VERSION = Symbol.valueOf("version");
-   public static final Symbol PLATFORM = Symbol.valueOf("platform");
-
-   // Symbols used in configuration of newly opened links.
-   public static final Symbol COPY = Symbol.getSymbol("copy");
-
-   // Lifetime policy symbols
-   public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
-
-   public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
-   /**
-    * Search for a given Symbol in a given array of Symbol objects.
-    *
-    * @param symbols
-    *        the set of Symbols to search.
-    * @param key
-    *        the value to try and find in the Symbol array.
-    *
-    * @return true if the key is found in the given Symbol array.
-    */
-   public static boolean contains(Symbol[] symbols, Symbol key) {
-      if (symbols == null || symbols.length == 0) {
-         return false;
-      }
-
-      for (Symbol symbol : symbols) {
-         if (symbol.equals(key)) {
-            return true;
-         }
-      }
-
-      return false;
-   }
-
-   /**
-    * Search for a particular filter using a set of known identification values
-    * in the Map of filters.
-    *
-    * @param filters
-    *        The filters map that should be searched.
-    * @param filterIds
-    *        The aliases for the target filter to be located.
-    *
-    * @return the filter if found in the mapping or null if not found.
-    */
-   public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) {
-
-      if (filterIds == null || filterIds.length == 0) {
-         StringBuilder ids = new StringBuilder();
-         if (filterIds != null) {
-            for (Object filterId : filterIds) {
-               ids.append(filterId).append(" ");
-            }
-         }
-         throw new IllegalArgumentException("Invalid Filter Ids array passed: " + ids);
-      }
-
-      if (filters == null || filters.isEmpty()) {
-         return null;
-      }
-
-      for (Map.Entry<Symbol, Object> filter : filters.entrySet()) {
-         if (filter.getValue() instanceof DescribedType) {
-            DescribedType describedType = ((DescribedType) filter.getValue());
-            Object descriptor = describedType.getDescriptor();
-
-            for (Object filterId : filterIds) {
-               if (descriptor.equals(filterId)) {
-                  return new AbstractMap.SimpleImmutableEntry<>(filter.getKey(), describedType);
-               }
-            }
-         }
-      }
-
-      return null;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ClientSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ClientSASL.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ClientSASL.java
deleted file mode 100644
index c36fd19..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ClientSASL.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-public interface ClientSASL {
-
-   byte[] getBytes();
-
-   String getName();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/SASLResult.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/SASLResult.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/SASLResult.java
deleted file mode 100644
index f7ff671..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/SASLResult.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-public interface SASLResult {
-
-   String getUser();
-
-   boolean isSuccess();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ServerSASL.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ServerSASL.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ServerSASL.java
deleted file mode 100644
index ed1c361..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/ServerSASL.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug;
-
-public interface ServerSASL {
-
-   String getName();
-
-   SASLResult processSASL(byte[] bytes);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AMQPConstants.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AMQPConstants.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AMQPConstants.java
deleted file mode 100644
index 6287c06..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AMQPConstants.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-/**
- * Constants derived from the AMQP spec
- */
-public class AMQPConstants {
-
-   /*
-   * Connection Properties
-   * http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-complete-v1.0.pdf#subsection.2.7.1
-   * */
-   public static class Connection {
-
-      public static final int DEFAULT_IDLE_TIMEOUT = -1;
-
-      public static final int DEFAULT_MAX_FRAME_SIZE = -1;//it should be according to the spec 4294967295l;
-
-      public static final int DEFAULT_CHANNEL_MAX = 65535;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
deleted file mode 100644
index 120a37b..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import static org.proton.plug.AmqpSupport.PRODUCT;
-import static org.proton.plug.AmqpSupport.VERSION;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.activemq.artemis.utils.VersionLoader;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Transport;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.SASLResult;
-import org.proton.plug.context.server.ProtonServerSenderContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.handler.ProtonHandler;
-import org.proton.plug.handler.impl.DefaultEventHandler;
-import org.proton.plug.util.ByteUtil;
-
-import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
-import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
-import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
-
-public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext {
-   private static final Logger log = Logger.getLogger(AbstractConnectionContext.class);
-
-   public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
-   public static final String AMQP_CONTAINER_ID = "amqp-container-id";
-
-   protected final ProtonHandler handler;
-
-   protected AMQPConnectionCallback connectionCallback;
-   private final String containerId;
-   private final Map<Symbol, Object> connectionProperties = new HashMap<>();
-   private final ScheduledExecutorService scheduledPool;
-
-   private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>();
-
-   protected LocalListener listener = new LocalListener();
-
-   public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
-      this(connectionCallback, null, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX, dispatchExecutor, scheduledPool);
-   }
-
-   public AbstractConnectionContext(AMQPConnectionCallback connectionCallback,
-                                    String containerId,
-                                    int idleTimeout,
-                                    int maxFrameSize,
-                                    int channelMax,
-                                    Executor dispatchExecutor,
-                                    ScheduledExecutorService scheduledPool) {
-      this.connectionCallback = connectionCallback;
-      this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
-
-      connectionProperties.put(PRODUCT, "apache-activemq-artemis");
-      connectionProperties.put(VERSION, VersionLoader.getVersion().getFullVersion());
-
-      this.scheduledPool = scheduledPool;
-      connectionCallback.setConnection(this);
-      this.handler = ProtonHandler.Factory.create(dispatchExecutor);
-      Transport transport = handler.getTransport();
-      transport.setEmitFlowEventOnSend(false);
-      if (idleTimeout > 0) {
-         transport.setIdleTimeout(idleTimeout);
-      }
-      transport.setChannelMax(channelMax);
-      transport.setMaxFrameSize(maxFrameSize);
-      handler.addEventHandler(listener);
-   }
-
-   @Override
-   public SASLResult getSASLResult() {
-      return handler.getSASLResult();
-   }
-
-   @Override
-   public void inputBuffer(ByteBuf buffer) {
-      if (log.isTraceEnabled()) {
-         ByteUtil.debugFrame(log, "Buffer Received ", buffer);
-      }
-
-      handler.inputBuffer(buffer);
-   }
-
-   public void destroy() {
-      connectionCallback.close();
-   }
-
-   /**
-    * See comment at {@link org.proton.plug.AMQPConnectionContext#isSyncOnFlush()}
-    */
-   @Override
-   public boolean isSyncOnFlush() {
-      return false;
-   }
-
-   @Override
-   public Object getLock() {
-      return handler.getLock();
-   }
-
-   @Override
-   public int capacity() {
-      return handler.capacity();
-   }
-
-   @Override
-   public void outputDone(int bytes) {
-      handler.outputDone(bytes);
-   }
-
-   @Override
-   public void flush() {
-      handler.flush();
-   }
-
-   @Override
-   public void close() {
-      handler.close();
-   }
-
-   protected AbstractProtonSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException {
-      AbstractProtonSessionContext sessionExtension = sessions.get(realSession);
-      if (sessionExtension == null) {
-         // How is this possible? Log a warning here
-         sessionExtension = newSessionExtension(realSession);
-         realSession.setContext(sessionExtension);
-         sessions.put(realSession, sessionExtension);
-      }
-      return sessionExtension;
-   }
-
-   protected abstract void remoteLinkOpened(Link link) throws Exception;
-
-   protected abstract AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException;
-
-   @Override
-   public boolean checkDataReceived() {
-      return handler.checkDataReceived();
-   }
-
-   @Override
-   public long getCreationTime() {
-      return handler.getCreationTime();
-   }
-
-   protected void flushBytes() {
-      ByteBuf bytes;
-      // handler.outputBuffer has the lock
-      while ((bytes = handler.outputBuffer()) != null) {
-         connectionCallback.onTransport(bytes, AbstractConnectionContext.this);
-      }
-   }
-
-   public String getRemoteContainer() {
-      return handler.getConnection().getRemoteContainer();
-   }
-
-   public String getPubSubPrefix() {
-      return null;
-   }
-
-   protected boolean validateConnection(Connection connection) {
-      return true;
-   }
-
-   protected void initInternal() throws Exception {
-   }
-
-   // This listener will perform a bunch of things here
-   class LocalListener extends DefaultEventHandler {
-
-      @Override
-      public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
-         if (sasl) {
-            handler.createServerSASL(connectionCallback.getSASLMechnisms());
-         }
-         else {
-            if (!connectionCallback.isSupportsAnonymous()) {
-               connectionCallback.sendSASLSupported();
-               connectionCallback.close();
-               handler.close();
-            }
-         }
-      }
-
-      @Override
-      public void onTransport(Transport transport) {
-         flushBytes();
-      }
-
-      @Override
-      public void onRemoteOpen(Connection connection) throws Exception {
-         synchronized (getLock()) {
-            try {
-               initInternal();
-            }
-            catch (Exception e) {
-               log.error("Error init connection", e);
-            }
-            if (!validateConnection(connection)) {
-               connection.close();
-            }
-            else {
-               connection.setContext(AbstractConnectionContext.this);
-               connection.setContainer(containerId);
-               connection.setProperties(connectionProperties);
-               connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-               connection.open();
-            }
-         }
-         initialise();
-
-         /*
-         * This can be null, which is in effect an empty map. We don't really need to check this for inbound connections,
-         * but it's here in case we add support for outbound connections.
-         * */
-         if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
-            long nextKeepAliveTime = handler.tick(true);
-            flushBytes();
-            if (nextKeepAliveTime > 0 && scheduledPool != null) {
-               scheduledPool.schedule(new Runnable() {
-                  @Override
-                  public void run() {
-                     long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
-                     flushBytes();
-                     if (rescheduleAt > 0) {
-                        scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
-                     }
-                  }
-               }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
-            }
-         }
-      }
-
-      @Override
-      public void onRemoteClose(Connection connection) {
-         synchronized (getLock()) {
-            connection.close();
-            for (AbstractProtonSessionContext protonSession : sessions.values()) {
-               protonSession.close();
-            }
-            sessions.clear();
-         }
-         // We must force write the channel before we actually destroy the connection
-         onTransport(handler.getTransport());
-         destroy();
-      }
-
-      @Override
-      public void onLocalOpen(Session session) throws Exception {
-         getSessionExtension(session);
-      }
-
-      @Override
-      public void onRemoteOpen(Session session) throws Exception {
-         getSessionExtension(session).initialise();
-         synchronized (getLock()) {
-            session.open();
-         }
-      }
-
-      @Override
-      public void onLocalClose(Session session) throws Exception {
-      }
-
-      @Override
-      public void onRemoteClose(Session session) throws Exception {
-         synchronized (getLock()) {
-            session.close();
-         }
-
-         AbstractProtonSessionContext sessionContext = (AbstractProtonSessionContext) session.getContext();
-         if (sessionContext != null) {
-            sessionContext.close();
-            sessions.remove(session);
-            session.setContext(null);
-         }
-      }
-
-      @Override
-      public void onRemoteOpen(Link link) throws Exception {
-         remoteLinkOpened(link);
-      }
-
-      @Override
-      public void onFlow(Link link) throws Exception {
-         ((ProtonDeliveryHandler) link.getContext()).onFlow(link.getCredit(), link.getDrain());
-      }
-
-      @Override
-      public void onRemoteClose(Link link) throws Exception {
-         link.close();
-         ProtonDeliveryHandler linkContext = (ProtonDeliveryHandler) link.getContext();
-         if (linkContext != null) {
-            linkContext.close(true);
-         }
-      }
-
-      @Override
-      public void onRemoteDetach(Link link) throws Exception {
-         link.detach();
-      }
-
-      @Override
-      public void onDetach(Link link) throws Exception {
-         Object context = link.getContext();
-         if (context instanceof ProtonServerSenderContext) {
-            ProtonServerSenderContext senderContext = (ProtonServerSenderContext) context;
-            senderContext.close(false);
-         }
-      }
-
-      @Override
-      public void onDelivery(Delivery delivery) throws Exception {
-         ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext();
-         if (handler != null) {
-            handler.onMessage(delivery);
-         }
-         else {
-            // TODO: logs
-
-            System.err.println("Handler is null, can't delivery " + delivery);
-         }
-      }
-   }
-
-   @Override
-   public Symbol[] getConnectionCapabilitiesOffered() {
-      return null;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
deleted file mode 100644
index 29e3459..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonContextSender.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.util.CreditsSemaphore;
-import org.proton.plug.util.NettyWritable;
-
-/**
- * This is a wrapper around an ActiveMQ Artemis ServerConsumer for handling outgoing messages and incoming acks via a Proton Sender
- */
-public abstract class AbstractProtonContextSender extends ProtonInitializable implements ProtonDeliveryHandler {
-
-   protected final AbstractProtonSessionContext protonSession;
-   protected final Sender sender;
-   protected final AbstractConnectionContext connection;
-   protected boolean closed = false;
-   protected final AMQPSessionCallback sessionSPI;
-   protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
-
-   public AbstractProtonContextSender(AbstractConnectionContext connection,
-                                      Sender sender,
-                                      AbstractProtonSessionContext protonSession,
-                                      AMQPSessionCallback server) {
-      this.connection = connection;
-      this.sender = sender;
-      this.protonSession = protonSession;
-      this.sessionSPI = server;
-   }
-
-   @Override
-   public void onFlow(int credits, boolean drain) {
-      this.creditsSemaphore.setCredits(credits);
-   }
-
-   /*
-   * start the session
-   * */
-   public void start() throws ActiveMQAMQPException {
-      sessionSPI.start();
-      // protonSession.getServerSession().start();
-   }
-
-   /*
-   * close the sender link and remove it from the session
-   * */
-   @Override
-   public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
-      closed = true;
-      protonSession.removeSender(sender);
-      synchronized (connection.getLock()) {
-         sender.close();
-      }
-
-      connection.flush();
-   }
-
-   /*
-   * set the error condition on the sender, then close it
-   * */
-   @Override
-   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
-      closed = true;
-      sender.setCondition(condition);
-      close(false);
-   }
-
-   @Override
-   /*
-   * handle an incoming Ack from Proton, basically pass to ActiveMQ Artemis to handle
-   * */ public abstract void onMessage(Delivery delivery) throws ActiveMQAMQPException;
-
-   /*
-   * check the state of the consumer, i.e. are there any more messages. only really needed for browsers?
-   * */
-   public void checkState() {
-   }
-
-   public Sender getSender() {
-      return sender;
-   }
-
-   protected int performSend(ProtonJMessage serverMessage, Object context) {
-      if (!creditsSemaphore.tryAcquire()) {
-         try {
-            creditsSemaphore.acquire();
-         }
-         catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            // the interrupt flag has been restored; abort the send
-            throw new IllegalStateException(e.getMessage(), e);
-         }
-      }
-
-      //presettle means we can ack the message on the dealer side before we send it, i.e. for browsers
-      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
-
-      //we only need a tag if we are going to ack later
-      byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
-
-      ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-      try {
-         serverMessage.encode(new NettyWritable(nettyBuffer));
-
-         int size = nettyBuffer.writerIndex();
-
-         synchronized (connection.getLock()) {
-            final Delivery delivery;
-            delivery = sender.delivery(tag, 0, tag.length);
-            delivery.setContext(context);
-
-            // this will avoid a copy.. patch provided by Norman using buffer.array()
-            sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
-
-            if (preSettle) {
-               delivery.settle();
-            }
-            else {
-               sender.advance();
-            }
-         }
-
-         connection.flush();
-
-         return size;
-      }
-      finally {
-         nettyBuffer.release();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
deleted file mode 100644
index c210950..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Receiver;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-
-/**
- * handles incoming messages via a Proton Receiver and forwards them to ActiveMQ
- */
-public abstract class AbstractProtonReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler {
-
-   protected final AbstractConnectionContext connection;
-
-   protected final AbstractProtonSessionContext protonSession;
-
-   protected final Receiver receiver;
-
-   protected String address;
-
-   protected final AMQPSessionCallback sessionSPI;
-
-   public AbstractProtonReceiverContext(AMQPSessionCallback sessionSPI,
-                                        AbstractConnectionContext connection,
-                                        AbstractProtonSessionContext protonSession,
-                                        Receiver receiver) {
-      this.connection = connection;
-      this.protonSession = protonSession;
-      this.receiver = receiver;
-      this.sessionSPI = sessionSPI;
-   }
-
-   @Override
-   public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
-      protonSession.removeReceiver(receiver);
-   }
-
-   @Override
-   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
-      receiver.setCondition(condition);
-      close(false);
-   }
-
-   public void flow(int credits, int threshold) {
-      // Use the SessionSPI to allocate producer credits, or default, always allocate credit.
-      if (sessionSPI != null) {
-         sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
-      }
-      else {
-         synchronized (connection.getLock()) {
-            receiver.flow(credits);
-            connection.flush();
-         }
-      }
-
-   }
-
-   public void drain(int credits) {
-      synchronized (connection.getLock()) {
-         receiver.drain(credits);
-      }
-      connection.flush();
-   }
-
-   public int drained() {
-      return receiver.drained();
-   }
-
-   public boolean isDraining() {
-      return receiver.draining();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
deleted file mode 100644
index 5c0a626..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonSessionContext.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.AMQPSessionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.exceptions.ActiveMQAMQPInternalErrorException;
-
-/**
- * ProtonSession is a direct representation of the session on the broker.
- * It has a link between a ProtonSession and a Broker or Client Session
- * The Broker Session is linked through the ProtonSessionSPI
- */
-public abstract class AbstractProtonSessionContext extends ProtonInitializable implements AMQPSessionContext {
-
-   private static final Logger log = Logger.getLogger(AbstractProtonSessionContext.class);
-   protected final AbstractConnectionContext connection;
-
-   protected final AMQPSessionCallback sessionSPI;
-
-   protected final Session session;
-
-   private long currentTag = 0;
-
-   protected Map<Receiver, AbstractProtonReceiverContext> receivers = new HashMap<>();
-
-   protected Map<Sender, AbstractProtonContextSender> senders = new HashMap<>();
-
-   protected boolean closed = false;
-
-   public AbstractProtonSessionContext(AMQPSessionCallback sessionSPI,
-                                       AbstractConnectionContext connection,
-                                       Session session) {
-      this.connection = connection;
-      this.sessionSPI = sessionSPI;
-      this.session = session;
-   }
-
-   @Override
-   public void initialise() throws Exception {
-      if (!isInitialized()) {
-         super.initialise();
-
-         if (sessionSPI != null) {
-            try {
-               sessionSPI.init(this, connection.getSASLResult());
-            }
-            catch (Exception e) {
-               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
-            }
-         }
-      }
-   }
-
-   /**
-    * TODO: maybe it needs to go?
-    *
-    * @param consumer
-    * @param queueName
-    */
-   public void disconnect(Object consumer, String queueName) {
-      AbstractProtonContextSender protonConsumer = senders.remove(consumer);
-      if (protonConsumer != null) {
-         try {
-            protonConsumer.close(false);
-         }
-         catch (ActiveMQAMQPException e) {
-            protonConsumer.getSender().setTarget(null);
-            protonConsumer.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage()));
-         }
-      }
-   }
-
-   @Override
-   public byte[] getTag() {
-      return Long.toHexString(currentTag++).getBytes();
-   }
-
-   @Override
-   public void replaceTag(byte[] tag) {
-      // TODO: do we need to reuse this?
-   }
-
-   @Override
-   public void close() {
-      if (closed) {
-         return;
-      }
-
-      // Making a copy to avoid ConcurrentModificationException during the iteration
-      Set<AbstractProtonReceiverContext> receiversCopy = new HashSet<>();
-      receiversCopy.addAll(receivers.values());
-
-      for (AbstractProtonReceiverContext protonProducer : receiversCopy) {
-         try {
-            protonProducer.close(false);
-         }
-         catch (Exception e) {
-            log.warn(e.getMessage(), e);
-         }
-      }
-      receivers.clear();
-
-      Set<AbstractProtonContextSender> protonSendersClone = new HashSet<>();
-      protonSendersClone.addAll(senders.values());
-
-      for (AbstractProtonContextSender protonConsumer : protonSendersClone) {
-         try {
-            protonConsumer.close(false);
-         }
-         catch (Exception e) {
-            log.warn(e.getMessage(), e);
-         }
-      }
-      senders.clear();
-      try {
-         if (sessionSPI != null) {
-            sessionSPI.close();
-         }
-      }
-      catch (Exception e) {
-         log.warn(e.getMessage(), e);
-      }
-      closed = true;
-   }
-
-   @Override
-   public void removeSender(Sender sender) throws ActiveMQAMQPException {
-      senders.remove(sender);
-   }
-
-   @Override
-   public void removeReceiver(Receiver receiver) {
-      receivers.remove(receiver);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
deleted file mode 100644
index d861394..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonDeliveryHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-
-/**
- * An interface to handle deliveries, either messages, acks or transaction calls
- */
-public interface ProtonDeliveryHandler {
-
-   void onFlow(int currentCredits, boolean drain);
-
-   void onMessage(Delivery delivery) throws ActiveMQAMQPException;
-
-   /*
-   * we have to distinguish between a remote close on the link and a close via a connection or session as the latter means
-   * that a link reattach can happen and we need to keep the underlying resource (queue/subscription) around for pub subs
-   * */
-   void close(boolean remoteLinkClose) throws ActiveMQAMQPException;
-
-   void close(ErrorCondition condition) throws ActiveMQAMQPException;
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
deleted file mode 100644
index 266e8b2..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import java.util.concurrent.TimeUnit;
-
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.exceptions.ActiveMQAMQPIllegalStateException;
-import org.proton.plug.exceptions.ActiveMQAMQPTimeoutException;
-import org.proton.plug.util.FutureRunnable;
-
-public class ProtonInitializable {
-
-   private Runnable afterInit;
-
-   private boolean initialized = false;
-
-   public void afterInit(Runnable afterInit) {
-      this.afterInit = afterInit;
-   }
-
-   public boolean isInitialized() {
-      return initialized;
-   }
-
-   public void initialise() throws Exception {
-      if (!initialized) {
-         initialized = true;
-         try {
-            if (afterInit != null) {
-               afterInit.run();
-            }
-         }
-         finally {
-            afterInit = null;
-         }
-      }
-   }
-
-   public void waitWithTimeout(FutureRunnable latch) throws ActiveMQAMQPException {
-      try {
-         // TODO Configure this
-         if (!latch.await(30, TimeUnit.SECONDS)) {
-            throw new ActiveMQAMQPTimeoutException("Timed out waiting for response");
-         }
-      }
-      catch (InterruptedException e) {
-         Thread.currentThread().interrupt();
-         throw new ActiveMQAMQPIllegalStateException(e.getMessage());
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java
deleted file mode 100644
index 40232ec..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonPlugSender.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import org.apache.qpid.proton.engine.Sender;
-
-public interface ProtonPlugSender {
-
-   int deliverMessage(Object message, int deliveryCount) throws Exception;
-
-   Sender getSender();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
deleted file mode 100644
index 263d3e6..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.transaction.Declare;
-import org.apache.qpid.proton.amqp.transaction.Declared;
-import org.apache.qpid.proton.amqp.transaction.Discharge;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.jboss.logging.Logger;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
-
-import static org.proton.plug.util.DeliveryUtil.decodeMessageImpl;
-import static org.proton.plug.util.DeliveryUtil.readDelivery;
-
-/**
- * handles an amqp Coordinator to deal with transaction boundaries etc
- */
-public class ProtonTransactionHandler implements ProtonDeliveryHandler {
-
-   private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
-
-   final AMQPSessionCallback sessionSPI;
-
-   public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
-      this.sessionSPI = sessionSPI;
-   }
-
-   @Override
-   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
-      final Receiver receiver;
-      try {
-         receiver = ((Receiver) delivery.getLink());
-
-         if (!delivery.isReadable()) {
-            return;
-         }
-
-         readDelivery(receiver, buffer);
-
-         receiver.advance();
-
-         MessageImpl msg = decodeMessageImpl(buffer);
-
-         Object action = ((AmqpValue) msg.getBody()).getValue();
-
-         if (action instanceof Declare) {
-            Binary txID = sessionSPI.newTransaction();
-            Declared declared = new Declared();
-            declared.setTxnId(txID);
-            delivery.disposition(declared);
-            delivery.settle();
-         }
-         else if (action instanceof Discharge) {
-            Discharge discharge = (Discharge) action;
-
-            Binary txID = discharge.getTxnId();
-            if (discharge.getFail()) {
-               try {
-                  sessionSPI.rollbackTX(txID, true);
-                  delivery.disposition(new Accepted());
-               }
-               catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
-               }
-            }
-            else {
-               try {
-                  sessionSPI.commitTX(txID);
-                  delivery.disposition(new Accepted());
-               }
-               catch (ActiveMQAMQPException amqpE) {
-                  throw amqpE;
-               }
-               catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
-               }
-            }
-         }
-      }
-      catch (ActiveMQAMQPException amqpE) {
-         delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
-      }
-      catch (Exception e) {
-         log.warn(e.getMessage(), e);
-         delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
-      }
-      finally {
-         delivery.settle();
-         buffer.release();
-      }
-   }
-
-   private Rejected createRejected(Symbol amqpError, String message) {
-      Rejected rejected = new Rejected();
-      ErrorCondition condition = new ErrorCondition();
-      condition.setCondition(amqpError);
-      condition.setDescription(message);
-      rejected.setError(condition);
-      return rejected;
-   }
-
-   @Override
-   public void onFlow(int credits, boolean drain) {
-   }
-
-   @Override
-   public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
-      // no op
-   }
-
-   @Override
-   public void close(ErrorCondition condition) throws ActiveMQAMQPException {
-      // no op
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
deleted file mode 100644
index 49d42f9..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.client;
-
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Session;
-import org.proton.plug.AMQPClientConnectionContext;
-import org.proton.plug.AMQPClientSessionContext;
-import org.proton.plug.ClientSASL;
-import org.proton.plug.AMQPConnectionCallback;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.context.ProtonInitializable;
-import org.proton.plug.util.FutureRunnable;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext {
-
-   public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
-      super(connectionCallback, dispatchExecutor, scheduledPool);
-   }
-
-   public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback,
-                                        String containerId,
-                                        int idleTimeout,
-                                        int maxFrameSize,
-                                        int channelMax,
-                                        Executor dispatchExecutor,
-                                        ScheduledExecutorService scheduledPool) {
-      super(connectionCallback, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
-   }
-
-   // Maybe a client interface?
-   @Override
-   public void clientOpen(ClientSASL sasl) throws Exception {
-      FutureRunnable future = new FutureRunnable(1);
-      synchronized (handler.getLock()) {
-         this.afterInit(future);
-         if (sasl != null) {
-            handler.createClientSasl(sasl);
-         }
-         handler.getConnection().open();
-      }
-
-      flush();
-
-      waitWithTimeout(future);
-   }
-
-   @Override
-   public AMQPClientSessionContext createClientSession() throws ActiveMQAMQPException {
-
-      FutureRunnable futureRunnable = new FutureRunnable(1);
-      ProtonClientSessionContext sessionImpl;
-      synchronized (handler.getLock()) {
-         Session session = handler.getConnection().session();
-         sessionImpl = (ProtonClientSessionContext) getSessionExtension(session);
-         sessionImpl.afterInit(futureRunnable);
-         session.open();
-      }
-
-      flush();
-      waitWithTimeout(futureRunnable);
-
-      return sessionImpl;
-   }
-
-   @Override
-   public void setContainer(String containerID) {
-      handler.getConnection().setContainer(containerID);
-   }
-
-   @Override
-   protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
-      AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this);
-      AbstractProtonSessionContext protonSession = new ProtonClientSessionContext(sessionSPI, this, realSession);
-
-      return protonSession;
-
-   }
-
-   @Override
-   protected void remoteLinkOpened(Link link) throws Exception {
-      Object context = link.getContext();
-      if (context != null && context instanceof ProtonInitializable) {
-         ((ProtonInitializable) context).initialise();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
deleted file mode 100644
index c0c0716..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.client;
-
-import org.proton.plug.AMQPConnectionContext;
-import org.proton.plug.AMQPConnectionContextFactory;
-import org.proton.plug.AMQPConnectionCallback;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-
-public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory {
-
-   private static final AMQPConnectionContextFactory theInstance = new ProtonClientConnectionContextFactory();
-
-   public static AMQPConnectionContextFactory getFactory() {
-      return theInstance;
-   }
-
-   @Override
-   public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,  Executor dispatchExecutor, ScheduledExecutorService scheduledPool) {
-      return new ProtonClientConnectionContext(connectionCallback, dispatchExecutor, scheduledPool);
-   }
-
-
-   @Override
-   public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
-                                                 String containerId,
-                                                 int idleTimeout,
-                                                 int maxFrameSize,
-                                                 int channelMax,
-                                                 Executor dispatchExecutor,
-                                                 ScheduledExecutorService scheduledPool) {
-      return new ProtonClientConnectionContext(connectionCallback, containerId, idleTimeout, maxFrameSize, channelMax, dispatchExecutor, scheduledPool);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
deleted file mode 100644
index f442b9e..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.client;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.proton.plug.AMQPClientSenderContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonContextSender;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.util.FutureRunnable;
-
-public class ProtonClientContext extends AbstractProtonContextSender implements AMQPClientSenderContext {
-
-   FutureRunnable catchUpRunnable = new FutureRunnable();
-
-   public ProtonClientContext(AbstractConnectionContext connection,
-                              Sender sender,
-                              AbstractProtonSessionContext protonSession,
-                              AMQPSessionCallback server) {
-      super(connection, sender, protonSession, server);
-   }
-
-   @Override
-   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      if (delivery.getRemoteState() instanceof Accepted) {
-         if (delivery.getContext() instanceof FutureRunnable) {
-            ((FutureRunnable) delivery.getContext()).countDown();
-         }
-      }
-   }
-
-   @Override
-   public void send(ProtonJMessage message) {
-      if (sender.getSenderSettleMode() != SenderSettleMode.SETTLED) {
-         catchUpRunnable.countUp();
-      }
-      performSend(message, catchUpRunnable);
-   }
-
-   public boolean sync(long timeout, TimeUnit unit) {
-      try {
-         return catchUpRunnable.await(timeout, unit);
-      }
-      catch (InterruptedException e) {
-         Thread.currentThread().interrupt();
-         return false;
-      }
-   }
-
-   @Override
-   public String getAddress() {
-      return sender.getRemoteTarget().getAddress();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
deleted file mode 100644
index c06ae58..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.client;
-
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.message.ProtonJMessage;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.proton.plug.AMQPClientReceiverContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonReceiverContext;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-
-import static org.proton.plug.util.DeliveryUtil.readDelivery;
-import static org.proton.plug.util.DeliveryUtil.decodeMessageImpl;
-
-public class ProtonClientReceiverContext extends AbstractProtonReceiverContext implements AMQPClientReceiverContext {
-
-   public ProtonClientReceiverContext(AMQPSessionCallback sessionSPI,
-                                      AbstractConnectionContext connection,
-                                      AbstractProtonSessionContext protonSession,
-                                      Receiver receiver) {
-      super(sessionSPI, connection, protonSession, receiver);
-   }
-
-   @Override
-   public void onFlow(int credits, boolean drain) {
-   }
-
-   LinkedBlockingDeque<MessageImpl> queues = new LinkedBlockingDeque<>();
-
-   /*
-   * called when Proton receives a message to be delivered via a Delivery.
-   *
-   * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
-   *
-   * */
-   @Override
-   public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-      try {
-         synchronized (connection.getLock()) {
-            readDelivery(receiver, buffer);
-            MessageImpl clientMessage = decodeMessageImpl(buffer);
-
-            // This second method could be better
-            //            clientMessage.decode(buffer.nioBuffer());
-
-            receiver.advance();
-            delivery.disposition(Accepted.getInstance());
-            queues.add(clientMessage);
-
-         }
-      }
-      finally {
-         buffer.release();
-      }
-   }
-
-   @Override
-   public ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception {
-      return queues.poll(time, unit);
-   }
-
-   @Override
-   public void flow(int credits) {
-      flow(credits, Integer.MAX_VALUE);
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
deleted file mode 100644
index 9079dc3..0000000
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.proton.plug.context.client;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.Target;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
-import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.engine.Receiver;
-import org.apache.qpid.proton.engine.Sender;
-import org.apache.qpid.proton.engine.Session;
-import org.proton.plug.AMQPClientReceiverContext;
-import org.proton.plug.AMQPClientSenderContext;
-import org.proton.plug.AMQPClientSessionContext;
-import org.proton.plug.AMQPSessionCallback;
-import org.proton.plug.AmqpSupport;
-import org.proton.plug.context.AbstractConnectionContext;
-import org.proton.plug.context.AbstractProtonSessionContext;
-import org.proton.plug.exceptions.ActiveMQAMQPException;
-import org.proton.plug.util.FutureRunnable;
-
-public class ProtonClientSessionContext extends AbstractProtonSessionContext implements AMQPClientSessionContext {
-
-   public ProtonClientSessionContext(AMQPSessionCallback sessionSPI,
-                                     AbstractConnectionContext connection,
-                                     Session session) {
-      super(sessionSPI, connection, session);
-   }
-
-   @Override
-   public AMQPClientSenderContext createSender(String address, boolean preSettled) throws ActiveMQAMQPException {
-      FutureRunnable futureRunnable = new FutureRunnable(1);
-
-      ProtonClientContext amqpSender;
-      synchronized (connection.getLock()) {
-         Sender sender = session.sender(address);
-         sender.setSenderSettleMode(SenderSettleMode.SETTLED);
-         Target target = new Target();
-         target.setAddress(address);
-         sender.setTarget(target);
-         amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI);
-         amqpSender.afterInit(futureRunnable);
-         sender.setContext(amqpSender);
-         sender.open();
-      }
-
-      connection.flush();
-
-      waitWithTimeout(futureRunnable);
-      return amqpSender;
-   }
-
-   @Override
-   public AMQPClientSenderContext createDynamicSender(boolean preSettled) throws ActiveMQAMQPException {
-      FutureRunnable futureRunnable = new FutureRunnable(1);
-
-      ProtonClientContext amqpSender;
-      synchronized (connection.getLock()) {
-         final String senderName = "Dynamic-" + UUID.randomUUID().toString();
-
-         Sender sender = session.sender(senderName);
-         sender.setSenderSettleMode(SenderSettleMode.SETTLED);
-
-         Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
-         Source source = new Source();
-         source.setAddress(senderName);
-         source.setOutcomes(outcomes);
-
-         Target target = new Target();
-         target.setDynamic(true);
-         target.setDurable(TerminusDurability.NONE);
-         target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-
-         // Set the dynamic node lifetime-policy
-         Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
-         dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance());
-         target.setDynamicNodeProperties(dynamicNodeProperties);
-
-         amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI);
-         amqpSender.afterInit(futureRunnable);
-         sender.setSource(source);
-         sender.setTarget(target);
-         sender.setContext(amqpSender);
-         sender.open();
-      }
-
-      connection.flush();
-
-      waitWithTimeout(futureRunnable);
-      return amqpSender;
-   }
-
-   @Override
-   public AMQPClientReceiverContext createReceiver(String address) throws ActiveMQAMQPException {
-      return createReceiver(address, address);
-   }
-
-   @Override
-   public AMQPClientReceiverContext createReceiver(String name, String address) throws ActiveMQAMQPException {
-      FutureRunnable futureRunnable = new FutureRunnable(1);
-
-      ProtonClientReceiverContext amqpReceiver;
-
-      synchronized (connection.getLock()) {
-         Receiver receiver = session.receiver(name);
-         Source source = new Source();
-         source.setAddress(address);
-         receiver.setSource(source);
-         amqpReceiver = new ProtonClientReceiverContext(sessionSPI, connection, this, receiver);
-         receiver.setContext(amqpReceiver);
-         amqpReceiver.afterInit(futureRunnable);
-         receiver.open();
-      }
-
-      connection.flush();
-
-      waitWithTimeout(futureRunnable);
-
-      return amqpReceiver;
-
-   }
-}


Mime
View raw message