activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-711 Fix handling of 'bare' header
Date Thu, 01 Sep 2016 19:28:44 GMT
ARTEMIS-711 Fix handling of 'bare' header


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1cf96f4f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1cf96f4f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1cf96f4f

Branch: refs/heads/master
Commit: 1cf96f4fcf77720412fcdf00049e7551f8712867
Parents: 117a4c3
Author: jbertram <jbertram@apache.org>
Authored: Mon Jun 13 13:37:24 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Sep 1 15:23:48 2016 -0400

----------------------------------------------------------------------
 .../plug/ActiveMQProtonConnectionCallback.java  |  34 ++-
 .../org/proton/plug/AMQPConnectionCallback.java |   4 +
 .../plug/context/AbstractConnectionContext.java |  13 +-
 .../org/proton/plug/handler/EventHandler.java   |   2 +-
 .../plug/handler/impl/ProtonHandlerImpl.java    |  13 +-
 .../context/AbstractConnectionContextTest.java  |  10 +
 .../proton/plug/test/invm/ProtonINVMSPI.java    |  20 ++
 .../plug/test/minimalclient/AMQPClientSPI.java  |  10 +
 .../minimalserver/MinimalConnectionSPI.java     |  10 +
 .../tests/integration/openwire/util/Wait.java   |  55 -----
 .../integration/proton/ProtonTestForHeader.java | 222 +++++++++++++++++++
 .../activemq/artemis/tests/util/Wait.java       |  55 +++++
 12 files changed, 375 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
index 1255b13..e3de595 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.protocol.proton.ActiveMQProtonRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.proton.ProtonProtocolManager;
@@ -48,7 +49,9 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
 
    private final Executor closeExecutor;
 
-   public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection, Executor closeExecutor) {
+   public ActiveMQProtonConnectionCallback(ProtonProtocolManager manager,
+                                           Connection connection,
+                                           Executor closeExecutor) {
       this.manager = manager;
       this.connection = connection;
       this.closeExecutor = closeExecutor;
@@ -56,18 +59,10 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
 
    @Override
    public ServerSASL[] getSASLMechnisms() {
-      boolean supportsAnonymous = false;
-      try {
-         manager.getServer().getSecurityStore().authenticate(null, null, null);
-         supportsAnonymous = true;
-      }
-      catch (Exception e) {
-         // authentication failed so no anonymous support
-      }
 
       ServerSASL[] result;
 
-      if (supportsAnonymous) {
+      if (isSupportsAnonymous()) {
          result = new ServerSASL[]{new ActiveMQPlainSASL(manager.getServer().getSecurityStore()), new AnonymousServerSASL()};
       }
       else {
@@ -77,9 +72,22 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
       return result;
    }
 
+   public boolean isSupportsAnonymous() {
+      boolean supportsAnonymous = false;
+      try {
+         manager.getServer().getSecurityStore().authenticate(null, null, null);
+         supportsAnonymous = true;
+      }
+      catch (Exception e) {
+         // authentication failed so no anonymous support
+      }
+      return supportsAnonymous;
+   }
+
    @Override
    public void close() {
-
+      connection.close();
+      amqpConnection.close();
    }
 
    public Executor getExeuctor() {
@@ -138,4 +146,8 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
       return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection, closeExecutor);
    }
 
+   @Override
+   public void sendSASLSupported() {
+      connection.write(ActiveMQBuffers.wrappedBuffer(new byte[]{'A', 'M', 'Q', 'P', 3, 1, 0, 0}));
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
index c274469..199d68d 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionCallback.java
@@ -37,4 +37,8 @@ public interface AMQPConnectionCallback {
    AMQPConnectionContext getConnection();
 
    ServerSASL[] getSASLMechnisms();
+
+   boolean isSupportsAnonymous();
+
+   void sendSASLSupported();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
index b1c8bc7..c881031 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java
@@ -187,8 +187,17 @@ public abstract class AbstractConnectionContext extends ProtonInitializable impl
    class LocalListener extends DefaultEventHandler {
 
       @Override
-      public void onSASLInit(ProtonHandler handler, Connection connection) {
-         handler.createServerSASL(connectionCallback.getSASLMechnisms());
+      public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
+         if (sasl) {
+            handler.createServerSASL(connectionCallback.getSASLMechnisms());
+         }
+         else {
+            if (!connectionCallback.isSupportsAnonymous()) {
+               connectionCallback.sendSASLSupported();
+               connectionCallback.close();
+               handler.close();
+            }
+         }
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
index e96169f..c020cbb 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java
@@ -27,7 +27,7 @@ import org.apache.qpid.proton.engine.Transport;
  */
 public interface EventHandler {
 
-   void onSASLInit(ProtonHandler handler, Connection connection);
+   void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl);
 
    void onInit(Connection connection) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
index cbc993a..7208d16 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java
@@ -50,6 +50,10 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
 
    private static final Logger log = Logger.getLogger(ProtonHandlerImpl.class);
 
+   private static final byte SASL = 0x03;
+
+   private static final byte BARE = 0x00;
+
    private final Transport transport = Proton.transport();
 
    private final Connection connection = Proton.connection();
@@ -170,8 +174,9 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
 
             if (!receivedFirstPacket) {
                try {
-                  if (buffer.getByte(4) == 0x03) {
-                     dispatchSASL();
+                  byte auth = buffer.getByte(4);
+                  if (auth == SASL || auth == BARE) {
+                     dispatchAuth(auth == SASL);
                      /*
                      * there is a chance that if SASL Handshake has been carried out that the capacity may change.
                      * */
@@ -343,9 +348,9 @@ public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHand
       }
    }
 
-   private void dispatchSASL() {
+   private void dispatchAuth(boolean sasl) {
       for (EventHandler h : handlers) {
-         h.onSASLInit(this, getConnection());
+         h.onAuthInit(this, getConnection(), sasl);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
index bfa0bad..193a2fa 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/context/AbstractConnectionContextTest.java
@@ -100,5 +100,15 @@ public class AbstractConnectionContextTest {
       public ServerSASL[] getSASLMechnisms() {
          return null;
       }
+
+      @Override
+      public boolean isSupportsAnonymous() {
+         return false;
+      }
+
+      @Override
+      public void sendSASLSupported() {
+
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
index b8008f4..456b566 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -69,6 +69,16 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
    }
 
    @Override
+   public boolean isSupportsAnonymous() {
+      return false;
+   }
+
+   @Override
+   public void sendSASLSupported() {
+
+   }
+
+   @Override
    public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
       if (log.isTraceEnabled()) {
          ByteUtil.debugFrame(log, "InVM->", bytes);
@@ -126,6 +136,16 @@ public class ProtonINVMSPI implements AMQPConnectionCallback {
       }
 
       @Override
+      public boolean isSupportsAnonymous() {
+         return false;
+      }
+
+      @Override
+      public void sendSASLSupported() {
+
+      }
+
+      @Override
       public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
 
          final int size = bytes.writerIndex();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
index 6497321..63c49eb 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
@@ -62,6 +62,16 @@ public class AMQPClientSPI implements AMQPConnectionCallback {
       return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
    }
 
+   @Override
+   public boolean isSupportsAnonymous() {
+      return false;
+   }
+
+   @Override
+   public void sendSASLSupported() {
+
+   }
+
    final ReusableLatch latch = new ReusableLatch(0);
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
index 87c337c..7d49dd6 100644
--- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
+++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
@@ -71,6 +71,16 @@ public class MinimalConnectionSPI implements AMQPConnectionCallback {
    }
 
    @Override
+   public boolean isSupportsAnonymous() {
+      return false;
+   }
+
+   @Override
+   public void sendSASLSupported() {
+
+   }
+
+   @Override
    public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) {
       final int bufferSize = bytes.writerIndex();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/util/Wait.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/util/Wait.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/util/Wait.java
deleted file mode 100644
index 0275a68..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/util/Wait.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.openwire.util;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility adapted from: org.apache.activemq.util.Wait
- */
-public class Wait {
-
-   public static final long MAX_WAIT_MILLIS = 30 * 1000;
-   public static final int SLEEP_MILLIS = 1000;
-
-   public interface Condition {
-
-      boolean isSatisified() throws Exception;
-   }
-
-   public static boolean waitFor(Condition condition) throws Exception {
-      return waitFor(condition, MAX_WAIT_MILLIS);
-   }
-
-   public static boolean waitFor(final Condition condition, final long duration) throws Exception {
-      return waitFor(condition, duration, SLEEP_MILLIS);
-   }
-
-   public static boolean waitFor(final Condition condition,
-                                 final long duration,
-                                 final int sleepMillis) throws Exception {
-
-      final long expiry = System.currentTimeMillis() + duration;
-      boolean conditionSatisified = condition.isSatisified();
-      while (!conditionSatisified && System.currentTimeMillis() < expiry) {
-         TimeUnit.MILLISECONDS.sleep(sleepMillis);
-         conditionSatisified = condition.isSatisified();
-      }
-      return conditionSatisified;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java
new file mode 100644
index 0000000..b4a6a34
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTestForHeader.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.proton;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ProtonTestForHeader extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      disableCheckThread();
+      server = this.createServer(true, true);
+      HashMap<String, Object> params = new HashMap<>();
+      params.put(TransportConstants.PORT_PROP_NAME, "5672");
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
+      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
+
+      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+      server.getConfiguration().setSecurityEnabled(true);
+      server.start();
+      ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
+      securityManager.getConfiguration().addUser("auser", "pass");
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         Thread.sleep(250);
+
+         server.stop();
+      }
+      finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testSimpleBytes() throws Exception {
+      final AmqpHeader header = new AmqpHeader();
+
+      header.setProtocolId(0);
+      header.setMajor(1);
+      header.setMinor(0);
+      header.setRevision(0);
+
+      final ClientConnection connection = new ClientConnection();
+      connection.open("localhost", 5672);
+      connection.send(header);
+
+      AmqpHeader response = connection.readAmqpHeader();
+      assertNotNull(response);
+      IntegrationTestLogger.LOGGER.info("Broker responded with: " + response);
+
+      assertTrue("Broker should have closed client connection", Wait.waitFor(new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            try {
+               connection.send(header);
+               return false;
+            }
+            catch (Exception e) {
+               return true;
+            }
+         }
+      }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
+   }
+
+   private class ClientConnection {
+
+      protected static final long RECEIVE_TIMEOUT = 10000;
+      protected Socket clientSocket;
+
+      public void open(String host, int port) throws IOException {
+         clientSocket = new Socket(host, port);
+         clientSocket.setTcpNoDelay(true);
+      }
+
+      public void send(AmqpHeader header) throws Exception {
+         IntegrationTestLogger.LOGGER.info("Client sending header: " + header);
+         OutputStream outputStream = clientSocket.getOutputStream();
+         header.getBuffer().writeTo(outputStream);
+         outputStream.flush();
+      }
+
+      public AmqpHeader readAmqpHeader() throws Exception {
+         clientSocket.setSoTimeout((int) RECEIVE_TIMEOUT);
+         InputStream is = clientSocket.getInputStream();
+
+         byte[] header = new byte[8];
+         int read = is.read(header);
+         if (read == header.length) {
+            return new AmqpHeader(new Buffer(header));
+         }
+         else {
+            return null;
+         }
+      }
+   }
+
+   private class AmqpHeader {
+
+      final Buffer PREFIX = new Buffer(new byte[]{'A', 'M', 'Q', 'P'});
+
+      private Buffer buffer;
+
+      AmqpHeader() {
+         this(new Buffer(new byte[]{'A', 'M', 'Q', 'P', 0, 1, 0, 0}));
+      }
+
+      AmqpHeader(Buffer buffer) {
+         this(buffer, true);
+      }
+
+      AmqpHeader(Buffer buffer, boolean validate) {
+         setBuffer(buffer, validate);
+      }
+
+      public int getProtocolId() {
+         return buffer.get(4) & 0xFF;
+      }
+
+      public void setProtocolId(int value) {
+         buffer.data[buffer.offset + 4] = (byte) value;
+      }
+
+      public int getMajor() {
+         return buffer.get(5) & 0xFF;
+      }
+
+      public void setMajor(int value) {
+         buffer.data[buffer.offset + 5] = (byte) value;
+      }
+
+      public int getMinor() {
+         return buffer.get(6) & 0xFF;
+      }
+
+      public void setMinor(int value) {
+         buffer.data[buffer.offset + 6] = (byte) value;
+      }
+
+      public int getRevision() {
+         return buffer.get(7) & 0xFF;
+      }
+
+      public void setRevision(int value) {
+         buffer.data[buffer.offset + 7] = (byte) value;
+      }
+
+      public Buffer getBuffer() {
+         return buffer;
+      }
+
+      public void setBuffer(Buffer value) {
+         setBuffer(value, true);
+      }
+
+      public void setBuffer(Buffer value, boolean validate) {
+         if (validate && !value.startsWith(PREFIX) || value.length() != 8) {
+            throw new IllegalArgumentException("Not an AMQP header buffer");
+         }
+         buffer = value.buffer();
+      }
+
+      public boolean hasValidPrefix() {
+         return buffer.startsWith(PREFIX);
+      }
+
+      @Override
+      public String toString() {
+         StringBuilder builder = new StringBuilder();
+         for (int i = 0; i < buffer.length(); ++i) {
+            char value = (char) buffer.get(i);
+            if (Character.isLetter(value)) {
+               builder.append(value);
+            }
+            else {
+               builder.append(",");
+               builder.append((int) value);
+            }
+         }
+         return builder.toString();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1cf96f4f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java
new file mode 100644
index 0000000..795a478
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/Wait.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility adapted from: org.apache.activemq.util.Wait
+ */
+public class Wait {
+
+   public static final long MAX_WAIT_MILLIS = 30 * 1000;
+   public static final int SLEEP_MILLIS = 1000;
+
+   public interface Condition {
+
+      boolean isSatisfied() throws Exception;
+   }
+
+   public static boolean waitFor(Condition condition) throws Exception {
+      return waitFor(condition, MAX_WAIT_MILLIS);
+   }
+
+   public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+      return waitFor(condition, duration, SLEEP_MILLIS);
+   }
+
+   public static boolean waitFor(final Condition condition,
+                                 final long duration,
+                                 final long sleepMillis) throws Exception {
+
+      final long expiry = System.currentTimeMillis() + duration;
+      boolean conditionSatisified = condition.isSatisfied();
+      while (!conditionSatisified && System.currentTimeMillis() < expiry) {
+         TimeUnit.MILLISECONDS.sleep(sleepMillis);
+         conditionSatisified = condition.isSatisfied();
+      }
+      return conditionSatisified;
+   }
+
+}


Mime
View raw message