activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [08/48] activemq-artemis git commit: ARTEMIS-398 - AMQP protocol idle timeout issue
Date Tue, 16 Feb 2016 20:10:22 GMT
ARTEMIS-398 - AMQP protocol idle timeout issue

Added functionality to tick every n seconds, where n is half the idle timeout.

https://issues.apache.org/jira/browse/ARTEMIS-398


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7702a0a1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7702a0a1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7702a0a1

Branch: refs/heads/refactor-openwire
Commit: 7702a0a1f9455ba41306c9134b23d2cbdbc68fcb
Parents: 7d9cafe
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Mon Feb 15 10:06:49 2016 +0000
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Tue Feb 16 11:24:09 2016 +0000

----------------------------------------------------------------------
 .../protocol/proton/ProtonProtocolManager.java  |  2 +-
 .../plug/AMQPConnectionContextFactory.java      |  7 +++--
 .../plug/context/AbstractConnectionContext.java | 31 +++++++++++++++++---
 .../client/ProtonClientConnectionContext.java   | 11 ++++---
 .../ProtonClientConnectionContextFactory.java   | 12 +++++---
 .../server/ProtonServerConnectionContext.java   | 11 ++++---
 .../ProtonServerConnectionContextFactory.java   | 11 ++++---
 .../org/proton/plug/handler/ProtonHandler.java  |  2 ++
 .../plug/handler/impl/ProtonHandlerImpl.java    | 24 +++++++++++++++
 .../context/AbstractConnectionContextTest.java  |  2 +-
 .../plug/test/invm/InVMTestConnector.java       |  2 +-
 .../proton/plug/test/invm/ProtonINVMSPI.java    |  2 +-
 .../test/minimalclient/SimpleAMQPConnector.java |  2 +-
 .../plug/test/minimalserver/MinimalServer.java  |  2 +-
 14 files changed, 93 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index 952bf12..cdaf34f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -100,7 +100,7 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>,
Noti
       }
 
       AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().
-         createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
+         createConnection(connectionCallback, (int) ttl, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX,
server.getScheduledPool());
 
       Executor executor = server.getExecutorFactory().getExecutor();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
index b1ef185..cafe8f1 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContextFactory.java
@@ -16,6 +16,8 @@
  */
 package org.proton.plug;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 public abstract class AMQPConnectionContextFactory {
 
    /**
@@ -24,10 +26,11 @@ public abstract class AMQPConnectionContextFactory {
    public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
                                                           int idleTimeout,
                                                           int maxFrameSize,
-                                                          int channelMax);
+                                                          int channelMax,
+                                                          ScheduledExecutorService scheduledPool);
 
    /**
     * @return
     */
-   public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback);
+   public abstract AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
ScheduledExecutorService scheduledPool);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index 023ee4f..9486a1b 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -18,8 +18,11 @@ package org.proton.plug.context;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
@@ -40,28 +43,32 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME
 
 public abstract class AbstractConnectionContext extends ProtonInitializable implements AMQPConnectionContext
{
 
+   public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
+
    protected ProtonHandler handler = ProtonHandler.Factory.create();
 
    protected AMQPConnectionCallback connectionCallback;
+   private final ScheduledExecutorService scheduledPool;
 
    private final Map<Session, AbstractProtonSessionContext> sessions = new ConcurrentHashMap<>();
 
    protected LocalListener listener = new LocalListener();
 
-   public AbstractConnectionContext(AMQPConnectionCallback connectionCallback) {
-      this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX);
+   public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService
scheduledPool) {
+      this(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE, DEFAULT_CHANNEL_MAX,
scheduledPool);
    }
 
    public AbstractConnectionContext(AMQPConnectionCallback connectionCallback,
                                     int idleTimeout,
                                     int maxFrameSize,
-                                    int channelMax) {
+                                    int channelMax,
+                                    ScheduledExecutorService scheduledPool) {
       this.connectionCallback = connectionCallback;
+      this.scheduledPool = scheduledPool;
       connectionCallback.setConnection(this);
       Transport transport = handler.getTransport();
       if (idleTimeout > 0) {
          transport.setIdleTimeout(idleTimeout);
-         transport.tick(idleTimeout / 2);
       }
       transport.setChannelMax(channelMax);
       transport.setMaxFrameSize(maxFrameSize);
@@ -172,6 +179,22 @@ public abstract class AbstractConnectionContext extends ProtonInitializable
impl
             connection.open();
          }
          initialise();
+         if (!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
+            long nextKeepAliveTime = handler.tick(true);
+            flushBytes();
+            if (nextKeepAliveTime > 0) {
+               scheduledPool.schedule(new Runnable() {
+                  @Override
+                  public void run() {
+                     long rescheduleAt = (handler.tick(false) - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+                     flushBytes();
+                     if (rescheduleAt > 0) {
+                        scheduledPool.schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
+                     }
+                  }
+               }, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())),
TimeUnit.MILLISECONDS);
+            }
+         }
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
index 63e9250..531b182 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java
@@ -29,17 +29,20 @@ import org.proton.plug.exceptions.ActiveMQAMQPException;
 import org.proton.plug.context.ProtonInitializable;
 import org.proton.plug.util.FutureRunnable;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext
{
 
-   public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback) {
-      super(connectionCallback);
+   public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService
scheduledPool) {
+      super(connectionCallback, scheduledPool);
    }
 
    public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback,
                                         int idleTimeout,
                                         int maxFrameSize,
-                                        int channelMax) {
-      super(connectionCallback, idleTimeout, maxFrameSize, channelMax);
+                                        int channelMax,
+                                        ScheduledExecutorService scheduledPool) {
+      super(connectionCallback, idleTimeout, maxFrameSize, channelMax, scheduledPool);
    }
 
    // Maybe a client interface?

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
index 8bc54c5..2beb95c 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java
@@ -20,6 +20,8 @@ import org.proton.plug.AMQPConnectionContext;
 import org.proton.plug.AMQPConnectionContextFactory;
 import org.proton.plug.AMQPConnectionCallback;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory {
 
    private static final AMQPConnectionContextFactory theInstance = new ProtonClientConnectionContextFactory();
@@ -29,15 +31,17 @@ public class ProtonClientConnectionContextFactory extends AMQPConnectionContextF
    }
 
    @Override
-   public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback)
{
-      return new ProtonClientConnectionContext(connectionCallback);
+   public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
ScheduledExecutorService scheduledPool) {
+      return new ProtonClientConnectionContext(connectionCallback, scheduledPool);
    }
 
+
    @Override
    public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
                                                  int idleTimeout,
                                                  int maxFrameSize,
-                                                 int channelMax) {
-      return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize,
channelMax);
+                                                 int channelMax,
+                                                 ScheduledExecutorService scheduledPool)
{
+      return new ProtonClientConnectionContext(connectionCallback, idleTimeout, maxFrameSize,
channelMax, scheduledPool);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
index e4e554d..606a3a3 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java
@@ -28,17 +28,20 @@ import org.proton.plug.context.AbstractConnectionContext;
 import org.proton.plug.context.AbstractProtonSessionContext;
 import org.proton.plug.exceptions.ActiveMQAMQPException;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext
{
 
-   public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP) {
-      super(connectionSP);
+   public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP, ScheduledExecutorService
scheduledPool) {
+      super(connectionSP, scheduledPool);
    }
 
    public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP,
                                         int idleTimeout,
                                         int maxFrameSize,
-                                        int channelMax) {
-      super(connectionSP, idleTimeout, maxFrameSize, channelMax);
+                                        int channelMax,
+                                        ScheduledExecutorService scheduledPool) {
+      super(connectionSP, idleTimeout, maxFrameSize, channelMax, scheduledPool);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
index 0c5c95f..893c479 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java
@@ -20,6 +20,8 @@ import org.proton.plug.AMQPConnectionContextFactory;
 import org.proton.plug.AMQPConnectionCallback;
 import org.proton.plug.AMQPServerConnectionContext;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_IDLE_TIMEOUT;
 import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_CHANNEL_MAX;
 import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME_SIZE;
@@ -33,15 +35,16 @@ public class ProtonServerConnectionContextFactory extends AMQPConnectionContextF
    }
 
    @Override
-   public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback)
{
-      return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE,
DEFAULT_CHANNEL_MAX);
+   public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
ScheduledExecutorService scheduledPool) {
+      return createConnection(connectionCallback, DEFAULT_IDLE_TIMEOUT, DEFAULT_MAX_FRAME_SIZE,
DEFAULT_CHANNEL_MAX, scheduledPool);
    }
 
    @Override
    public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback,
                                                        int idleTimeout,
                                                        int maxFrameSize,
-                                                       int channelMax) {
-      return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize,
channelMax);
+                                                       int channelMax,
+                                                       ScheduledExecutorService scheduledPool)
{
+      return new ProtonServerConnectionContext(connectionCallback, idleTimeout, maxFrameSize,
channelMax, scheduledPool);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
index 366663c..1ae0dff 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java
@@ -29,6 +29,8 @@ import org.proton.plug.handler.impl.ProtonHandlerImpl;
  */
 public interface ProtonHandler {
 
+   long tick(boolean firstTick);
+
    public static final class Factory {
 
       public static ProtonHandler create() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
index 1fac0dd..cd5d157 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
@@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -27,6 +28,7 @@ import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
@@ -45,6 +47,7 @@ import org.proton.plug.util.DebugInfo;
  */
 public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHandler {
 
+
    private final Transport transport = Proton.transport();
 
    private final Connection connection = Proton.connection();
@@ -83,6 +86,27 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
    }
 
    @Override
+   public long tick(boolean firstTick) {
+      if (!firstTick) {
+         try {
+            if (connection.getLocalState() != EndpointState.CLOSED) {
+               long rescheduleAt = transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+               if (transport.isClosed()) {
+                  throw new IllegalStateException("Channel was inactive for to long");
+               }
+               return rescheduleAt;
+            }
+         }
+         catch (Exception e) {
+            transport.close();
+            connection.setCondition(new ErrorCondition());
+         }
+         return 0;
+      }
+      return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
+   }
+
+   @Override
    public int capacity() {
       synchronized (lock) {
          return transport.capacity();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
index 10be09f..46d7c64 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
@@ -48,7 +48,7 @@ public class AbstractConnectionContextTest {
    private class TestConnectionContext extends AbstractConnectionContext {
 
       public TestConnectionContext(AMQPConnectionCallback connectionCallback) {
-         super(connectionCallback);
+         super(connectionCallback, null);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
index 3085b40..6a84a95 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
@@ -32,6 +32,6 @@ public class InVMTestConnector implements Connector {
 
    @Override
    public AMQPClientConnectionContext connect(String host, int port) throws Exception {
-      return new ProtonClientConnectionContext(new ProtonINVMSPI());
+      return new ProtonClientConnectionContext(new ProtonINVMSPI(), null);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
index aff0fa1..68a2789 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -35,7 +35,7 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
 
    AMQPConnectionContext returningConnection;
 
-   ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new
ReturnSPI());
+   ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new
ReturnSPI(), null);
 
    final ExecutorService mainExecutor = Executors.newSingleThreadExecutor();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
index bb7d0d4..1e12410 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
@@ -59,7 +59,7 @@ public class SimpleAMQPConnector implements Connector {
 
       AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
 
-      final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI);
+      final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI,
null);
 
       future.channel().pipeline().addLast(new ChannelDuplexHandler() {
             @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7702a0a1/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
index 1ed3474..a1b1462 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalServer.java
@@ -124,7 +124,7 @@ public class MinimalServer {
       @Override
       public void channelActive(ChannelHandlerContext ctx) throws Exception {
          super.channelActive(ctx);
-         connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new
MinimalConnectionSPI(ctx.channel()));
+         connection = ProtonServerConnectionContextFactory.getFactory().createConnection(new
MinimalConnectionSPI(ctx.channel()), null);
          //ctx.read();
       }
 


Mime
View raw message