activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-728 Broker doesn't support unique jms client-id (qpid-jms client)
Date Thu, 15 Sep 2016 15:17:10 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 1f392da88 -> 646a89198


ARTEMIS-728 Broker doesn't support unique jms client-id (qpid-jms client)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/406d09d9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/406d09d9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/406d09d9

Branch: refs/heads/master
Commit: 406d09d986f19fd8f5f0aa2fb0973fa425106dcc
Parents: 1f392da
Author: Howard Gao <howard.gao@gmail.com>
Authored: Mon Sep 12 16:54:06 2016 +0800
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Sep 15 11:17:02 2016 -0400

----------------------------------------------------------------------
 .../plug/ActiveMQProtonConnectionCallback.java  | 79 ++++++++++++++++++++
 .../org/proton/plug/AMQPConnectionCallback.java |  5 ++
 .../main/java/org/proton/plug/AmqpSupport.java  |  1 +
 .../plug/context/AbstractConnectionContext.java | 29 +++++--
 .../plug/context/ProtonInitializable.java       |  2 +-
 .../server/ProtonServerConnectionContext.java   | 16 +++-
 .../org/proton/plug/handler/ExtCapability.java  | 46 ++++++++++++
 .../plug/handler/impl/ProtonHandlerImpl.java    |  1 -
 .../context/AbstractConnectionContextTest.java  | 10 +++
 .../proton/plug/test/invm/ProtonINVMSPI.java    | 20 +++++
 .../plug/test/minimalclient/AMQPClientSPI.java  | 11 +++
 .../minimalserver/MinimalConnectionSPI.java     | 11 +++
 .../tests/integration/proton/ProtonTest.java    | 52 +++++++++----
 13 files changed, 258 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index 6e6a405..707b312 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.proton.plug;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -23,19 +25,35 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
 import org.apache.activemq.artemis.core.protocol.proton.sasl.ActiveMQPlainSASL;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.jboss.logging.Logger;
 import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
+import org.proton.plug.handler.ExtCapability;
 import org.proton.plug.sasl.AnonymousServerSASL;
+import org.proton.plug.sasl.PlainSASLResult;
+
+import static org.proton.plug.AmqpSupport.CONTAINER_ID;
+import static org.proton.plug.AmqpSupport.INVALID_FIELD;
+import static org.proton.plug.context.AbstractConnectionContext.CONNECTION_OPEN_FAILED;
 
 public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback {
+   private static final Logger log = Logger.getLogger(ActiveMQProtonConnectionCallback.class);
 
    private final ProtonProtocolManager manager;
 
@@ -49,6 +67,8 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
 
    private final Executor closeExecutor;
 
+   private ServerSession internalSession;
+
    public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
                                            Connection connection,
                                            Executor closeExecutor) {
@@ -86,7 +106,42 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
    }
 
    @Override
+   public void init() throws Exception {
+      //This internal core session is used to represent the connection
+      //in core server. It is used to identify unique clientIDs.
+      //Note the Qpid-JMS client does create a initial session
+      //for each connection. However is comes in as a Begin
+      //After Open. This makes it unusable for this purpose
+      //as we need to decide the uniqueness in response to
+      //Open, and the checking Uniqueness and adding the unique
+      //client-id to server need to be atomic.
+      if (internalSession == null) {
+         SASLResult saslResult = amqpConnection.getSASLResult();
+         String user = null;
+         String passcode = null;
+         if (saslResult != null) {
+            user = saslResult.getUser();
+            if (saslResult instanceof PlainSASLResult) {
+               passcode = ((PlainSASLResult) saslResult).getPassword();
+            }
+         }
+         internalSession = manager.getServer().createSession(createInternalSessionName(), user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonConnectionDelegate, // RemotingConnection remotingConnection,
+                 false,
+                 false,
+                 false,
+                 false,
+                 null, (SessionCallback) createSessionCallback(this.amqpConnection), true);
+      }
+   }
+
+   @Override
    public void close() {
+      try {
+         internalSession.close(false);
+      }
+      catch (Exception e) {
+         log.error("error closing internal session", e);
+      }
       connection.close();
       amqpConnection.close();
    }
@@ -151,4 +206,28 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
    public void sendSASLSupported() {
       connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
    }
+
+   @Override
+   public boolean validateConnection(org.apache.qpid.proton.engine.Connection connection, SASLResult saslResult) {
+      String remote = connection.getRemoteContainer();
+
+      if (ExtCapability.needUniqueConnection(connection)) {
+         if (!internalSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, remote)) {
+            //https://issues.apache.org/jira/browse/ARTEMIS-728
+            Map<Symbol, Object> connProp = new HashMap<>();
+            connProp.put(CONNECTION_OPEN_FAILED, "true");
+            connection.setProperties(connProp);
+            connection.getCondition().setCondition(AmqpError.INVALID_FIELD);
+            Map<Symbol, Symbol> info = new HashMap<>();
+            info.put(INVALID_FIELD, CONTAINER_ID);
+            connection.getCondition().setInfo(info);
+            return false;
+         }
+      }
+      return true;
+   }
+
+   private String createInternalSessionName() {
+      return "amqp:" + UUIDGenerator.getInstance().generateStringUUID();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
index 199d68d..df14b0f 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
@@ -17,9 +17,12 @@
 package org.proton.plug;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.engine.Connection;
 
 public interface AMQPConnectionCallback {
 
+   void init() throws Exception;
+
    void close();
 
    /**
@@ -41,4 +44,6 @@ public interface AMQPConnectionCallback {
    boolean isSupportsAnonymous();
 
    void sendSASLSupported();
+
+   boolean validateConnection(Connection connection, SASLResult saslResult);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
index 1580855..4ddbbcc 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java
@@ -60,6 +60,7 @@ public class AmqpSupport {
    // Lifetime policy symbols
    public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
 
+   public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
    /**
     * Search for a given Symbol in a given array of Symbol object.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index 9ece790..120a37b 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -188,6 +188,13 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
       return null;
    }
 
+   protected boolean validateConnection(Connection connection) {
+      return true;
+   }
+
+   protected void initInternal() throws Exception {
+   }
+
    // This listener will perform a bunch of things here
    class LocalListener extends DefaultEventHandler {
 
@@ -213,13 +220,25 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
       @Override
       public void onRemoteOpen(Connection connection) throws Exception {
          synchronized (getLock()) {
-            connection.setContext(AbstractConnectionContext.this);
-            connection.setContainer(containerId);
-            connection.setProperties(connectionProperties);
-            connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-            connection.open();
+            try {
+               initInternal();
+            }
+            catch (Exception e) {
+               log.error("Error init connection", e);
+            }
+            if (!validateConnection(connection)) {
+               connection.close();
+            }
+            else {
+               connection.setContext(AbstractConnectionContext.this);
+               connection.setContainer(containerId);
+               connection.setProperties(connectionProperties);
+               connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+               connection.open();
+            }
          }
          initialise();
+
          /*
          * This can be null which is in effect an empty map, also we really dont need to check this for in bound connections
          * but its here in case we add support for outbound connections.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
index c065527..266e8b2 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonInitializable.java
@@ -39,7 +39,7 @@ public class ProtonInitializable {
 
    public void initialise() throws Exception {
       if (!initialized) {
-         initialized = false;
+         initialized = true;
          try {
             if (afterInit != null) {
                afterInit.run();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
index 83048d1..efaaed4 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
@@ -16,10 +16,9 @@
  */
 package org.proton.plug.context.server;
 
-import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
-
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sender;
@@ -30,6 +29,7 @@ import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.context.AbstractConnectionContext;
 import org.proton.plug.context.AbstractProtonSessionContext;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
+import org.proton.plug.handler.ExtCapability;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -59,6 +59,16 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
    }
 
    @Override
+   protected boolean validateConnection(Connection connection) {
+      return connectionCallback.validateConnection(connection, handler.getSASLResult());
+   }
+
+   @Override
+   protected void initInternal() throws Exception {
+      connectionCallback.init();
+   }
+
+   @Override
    protected void remoteLinkOpened(Link link) throws Exception {
 
       ProtonServerSessionContext protonSession = (ProtonServerSessionContext) getSessionExtension(link.getSession());
@@ -84,6 +94,6 @@ public class ProtonServerConnectionContext extends AbstractConnectionContext imp
 
    @Override
    public Symbol[] getConnectionCapabilitiesOffered() {
-      return new Symbol[]{DELAYED_DELIVERY};
+      return ExtCapability.getCapabilities();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
new file mode 100644
index 0000000..cbb96fd
--- /dev/null
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ExtCapability.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.proton.plug.handler;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.engine.Connection;
+
+import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY;
+import static org.proton.plug.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
+
+public class ExtCapability {
+
+   public static final Symbol[] capabilities = new Symbol[] {
+      SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY
+   };
+
+   public static Symbol[] getCapabilities() {
+      return capabilities;
+   }
+
+   public static boolean needUniqueConnection(Connection connection) {
+      Symbol[] extCapabilities = connection.getRemoteDesiredCapabilities();
+      if (extCapabilities != null) {
+         for (Symbol sym : extCapabilities) {
+            if (sym.compareTo(SOLE_CONNECTION_CAPABILITY) == 0) {
+               return true;
+            }
+         }
+      }
+      return false;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
index 7208d16..b2f6406 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
@@ -386,5 +386,4 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
       }
 
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
index a9bb152..91af8f5 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.handler.EventHandler;
@@ -72,6 +73,10 @@ public class AbstractConnectionContextTest {
    private class TestConnectionCallback implements AMQPConnectionCallback {
 
       @Override
+      public void init() throws Exception {
+      }
+
+      @Override
       public void close() {
 
       }
@@ -110,5 +115,10 @@ public class AbstractConnectionContextTest {
       public void sendSASLSupported() {
 
       }
+
+      @Override
+      public boolean validateConnection(Connection connection, SASLResult saslResult) {
+         return true;
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
index f9b2e9a..bf83f8a 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -21,10 +21,12 @@ import java.util.concurrent.Executors;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.engine.Connection;
 import org.jboss.logging.Logger;
 import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
 import org.proton.plug.context.server.ProtonServerConnectionContext;
 import org.proton.plug.sasl.AnonymousServerSASL;
@@ -59,6 +61,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
    }
 
    @Override
+   public void init() throws Exception {
+   }
+
+   @Override
    public void close() {
       mainExecutor.shutdown();
    }
@@ -79,6 +85,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
    }
 
    @Override
+   public boolean validateConnection(Connection connection, SASLResult saslResult) {
+      return true;
+   }
+
+   @Override
    public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
       if (log.isTraceEnabled()) {
          ByteUtil.debugFrame(log, "InVM->", bytes);
@@ -126,6 +137,10 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
    class ReturnSPI implements AMQPConnectionCallback {
 
       @Override
+      public void init() throws Exception {
+      }
+
+      @Override
       public void close() {
 
       }
@@ -146,6 +161,11 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
       }
 
       @Override
+      public boolean validateConnection(Connection connection, SASLResult saslResult) {
+         return true;
+      }
+
+      @Override
       public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
 
          final int size = bytes.writerIndex();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
index 17e51c7..fbdee59 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
@@ -22,10 +22,12 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.qpid.proton.engine.Connection;
 import org.jboss.logging.Logger;
 import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
 import org.proton.plug.sasl.AnonymousServerSASL;
 import org.proton.plug.sasl.ServerSASLPlain;
@@ -53,6 +55,10 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
    }
 
    @Override
+   public void init() throws Exception {
+   }
+
+   @Override
    public void close() {
 
    }
@@ -72,6 +78,11 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
 
    }
 
+   @Override
+   public boolean validateConnection(Connection connection, SASLResult saslResult) {
+      return true;
+   }
+
    final ReusableLatch latch = new ReusableLatch(0);
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
index 02cb06e..055b29d 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
@@ -25,10 +25,12 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
+import org.apache.qpid.proton.engine.Connection;
 import org.jboss.logging.Logger;
 import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.SASLResult;
 import org.proton.plug.ServerSASL;
 import org.proton.plug.sasl.AnonymousServerSASL;
 import org.proton.plug.sasl.ServerSASLPlain;
@@ -49,6 +51,10 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
    ExecutorService executorService = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
 
    @Override
+   public void init() throws Exception {
+   }
+
+   @Override
    public void close() {
       executorService.shutdown();
    }
@@ -81,6 +87,11 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
    }
 
    @Override
+   public boolean validateConnection(Connection connection, SASLResult saslResult) {
+      return true;
+   }
+
+   @Override
    public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
       final int bufferSize = bytes.writerIndex();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/406d09d9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index b3d9a5f..245c6b9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -27,6 +27,7 @@ import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
@@ -1533,6 +1534,35 @@ public class ProtonTest extends ProtonTestBase {
       connection.close();
    }
 
+   @Test
+   public void testClientID() throws Exception {
+      Connection testConn1 = createConnection(false);
+      Connection testConn2 = createConnection(false);
+      try {
+         testConn1.setClientID("client-id1");
+         try {
+            testConn1.setClientID("client-id2");
+            fail("didn't get expected exception");
+         }
+         catch (javax.jms.IllegalStateException e) {
+            //expected
+         }
+
+         try {
+            testConn2.setClientID("client-id1");
+            fail("didn't get expected exception");
+         }
+         catch (InvalidClientIDException e) {
+            //expected
+         }
+      }
+      finally {
+         testConn1.close();
+         testConn2.close();
+      }
+
+   }
+
    private javax.jms.Queue createQueue(String address) throws Exception {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       try {
@@ -1543,29 +1573,19 @@ public class ProtonTest extends ProtonTestBase {
       }
    }
 
-   private javax.jms.Connection createConnection() throws JMSException {
+   private Connection createConnection() throws JMSException {
+      return this.createConnection(true);
+   }
+
+   private javax.jms.Connection createConnection(boolean isStart) throws JMSException {
       Connection connection;
       if (protocol == 3) {
          factory = new JmsConnectionFactory(amqpConnectionUri);
          connection = factory.createConnection();
-         connection.setExceptionListener(new ExceptionListener() {
-            @Override
-            public void onException(JMSException exception) {
-               exception.printStackTrace();
-            }
-         });
-         connection.start();
       }
       else if (protocol == 0) {
          factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
          connection = factory.createConnection();
-         connection.setExceptionListener(new ExceptionListener() {
-            @Override
-            public void onException(JMSException exception) {
-               exception.printStackTrace();
-            }
-         });
-         connection.start();
       }
       else {
          TransportConfiguration transport;
@@ -1579,6 +1599,8 @@ public class ProtonTest extends ProtonTestBase {
          }
 
          connection = factory.createConnection(userName, password);
+      }
+      if (isStart) {
          connection.setExceptionListener(new ExceptionListener() {
             @Override
             public void onException(JMSException exception) {


Mime
View raw message