activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [29/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:00:59 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java
new file mode 100644
index 0000000..781a11c
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.sasl;
+
+import org.proton.plug.ClientSASL;
+
+/**
+ * Client side of the SASL PLAIN mechanism: a simple implementation provided with just
+ * user/password.
+ * <p>
+ * TODO: this interface will probably change as we are challenged with more SASL cases where
+ * there is a communication between client and server to determine the authentication
+ *
+ * @author Clebert Suconic
+ */
+public class ClientSASLPlain implements ClientSASL
+{
+   private final String username;
+   private final String password;
+
+   /**
+    * @param user     the user name to send; {@code null} is sent as an empty string
+    * @param password the password to send; {@code null} is sent as an empty string
+    */
+   public ClientSASLPlain(String user, String password)
+   {
+      this.username = user;
+      this.password = password;
+   }
+
+   /**
+    * @return the mechanism name, always {@code "PLAIN"}
+    */
+   public String getName()
+   {
+      return "PLAIN";
+   }
+
+   /**
+    * Builds the PLAIN response: a zero byte, the user name, a zero byte and the password.
+    * The leading zero byte leaves the authorization identity empty.
+    *
+    * @return a newly allocated array holding the encoded credentials
+    */
+   public byte[] getBytes()
+   {
+      byte[] usernameBytes = encode(username);
+      byte[] passwordBytes = encode(password);
+
+      // A new array is zero-filled, so both NUL separators are already in place.
+      byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2];
+      System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
+      System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length);
+      return data;
+   }
+
+   /**
+    * Encodes a credential, treating {@code null} as the empty string.
+    * UTF-8 is used explicitly because RFC 4616 requires it; the platform default charset
+    * would garble non-ASCII credentials on hosts configured differently.
+    */
+   private static byte[] encode(String value)
+   {
+      return value == null ? new byte[0] : value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java
new file mode 100644
index 0000000..85e9aac
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.sasl;
+
+import org.proton.plug.SASLResult;
+
+/**
+ * Outcome of a SASL PLAIN exchange: the success flag together with the user name and
+ * password that were supplied. Instances never change after construction.
+ *
+ * @author Clebert Suconic
+ */
+public class PlainSASLResult implements SASLResult
+{
+   private final String user;
+   private final String password;
+   private final boolean success;
+
+   /**
+    * @param success  whether the authentication was accepted
+    * @param user     the user name taken from the exchange, possibly {@code null}
+    * @param password the password taken from the exchange, possibly {@code null}
+    */
+   public PlainSASLResult(boolean success, String user, String password)
+   {
+      this.user = user;
+      this.password = password;
+      this.success = success;
+   }
+
+   /**
+    * @return {@code true} when the authentication was accepted
+    */
+   @Override
+   public boolean isSuccess()
+   {
+      return this.success;
+   }
+
+   /**
+    * @return the user name given to the constructor
+    */
+   @Override
+   public String getUser()
+   {
+      return this.user;
+   }
+
+   /**
+    * @return the password given to the constructor
+    */
+   public String getPassword()
+   {
+      return this.password;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java
new file mode 100644
index 0000000..490c89c
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.sasl;
+
+import org.proton.plug.SASLResult;
+import org.proton.plug.ServerSASL;
+
+/**
+ * Server side of the SASL PLAIN mechanism. It extracts a user name and a password from the
+ * client response and hands them to {@link #authenticate(String, String)}.
+ *
+ * @author Clebert Suconic
+ */
+public class ServerSASLPlain implements ServerSASL
+{
+   public static final String NAME = "PLAIN";
+
+   /**
+    * @return the mechanism name, {@link #NAME}
+    */
+   @Override
+   public String getName()
+   {
+      return NAME;
+   }
+
+   /**
+    * Splits the response on zero bytes and authenticates the resulting credentials.
+    * <p>
+    * An empty first token (empty authorization identity) is skipped. A missing or empty
+    * user name or password is passed on as {@code null}.
+    * NOTE(review): a non-empty first token is taken as the user name, so a response that
+    * carries an authorization identity is not interpreted per RFC 4616 - confirm intent.
+    *
+    * @param data the raw response sent by the client
+    * @return the result carrying the success flag and the extracted credentials
+    */
+   @Override
+   public SASLResult processSASL(byte[] data)
+   {
+      String username = null;
+      String password = null;
+
+      // RFC 4616 mandates UTF-8; do not depend on the platform default charset.
+      String bytes = new String(data, java.nio.charset.StandardCharsets.UTF_8);
+
+      // split() drops trailing empty strings, so an empty password just yields a shorter array.
+      String[] credentials = bytes.split(Character.toString((char) 0));
+      if (credentials.length > 0)
+      {
+         int offSet = credentials[0].length() == 0 ? 1 : 0;
+
+         // Strict comparisons: index offSet exists only when length > offSet.
+         if (credentials.length > offSet)
+         {
+            username = credentials[offSet];
+         }
+         if (credentials.length > (offSet + 1))
+         {
+            password = credentials[offSet + 1];
+         }
+      }
+
+      boolean success = authenticate(username, password);
+
+      return new PlainSASLResult(success, username, password);
+   }
+
+
+   /**
+    * Hook for subclasses to perform the authentication here.
+    * This default implementation accepts every credential pair.
+    *
+    * @param user     the extracted user name, possibly {@code null}
+    * @param password the extracted password, possibly {@code null}
+    * @return {@code true} when the credentials are accepted
+    */
+   protected boolean authenticate(String user, String password)
+   {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java
new file mode 100644
index 0000000..7d12f31
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+/**
+ * Static helpers for dumping and converting bytes, mostly to and from hexadecimal text.
+ *
+ * @author Clebert Suconic
+ */
+public class ByteUtil
+{
+   /**
+    * Prints the readable bytes of a buffer to standard output as grouped hexadecimal,
+    * preceded by a message. The reader index of the buffer is left untouched.
+    *
+    * @param message text printed on the line before the dump
+    * @param byteIn  the buffer to dump
+    */
+   public static void debugFrame(String message, ByteBuf byteIn)
+   {
+      // Copy only the readable bytes: an array sized by writerIndex() cannot be filled
+      // whenever the reader index is not zero.
+      byte[] frame = new byte[byteIn.readableBytes()];
+      // Absolute get, so the reader index never moves and needs no restoring.
+      byteIn.getBytes(byteIn.readerIndex(), frame);
+
+      try
+      {
+         System.out.println(message + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 8, 16));
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+
+   /**
+    * Formats a string as Java source: quoted pieces of {@code groupSize} characters joined
+    * with {@code +}, each output line prefixed with a numbered comment. A new line is
+    * started whenever the number of characters consumed is a multiple of {@code lineBreak}.
+    *
+    * @param str       the text to format
+    * @param groupSize number of characters per quoted piece; must be positive
+    * @param lineBreak number of characters per output line
+    * @return the formatted text, terminated by {@code ";}
+    */
+   public static String formatGroup(String str, int groupSize, int lineBreak)
+   {
+      StringBuilder buffer = new StringBuilder();
+
+      int line = 1;
+      buffer.append("/*  1 */ \"");
+      for (int i = 0; i < str.length(); i += groupSize)
+      {
+         buffer.append(str.substring(i, Math.min(str.length(), i + groupSize)));
+
+         if ((i + groupSize) % lineBreak == 0)
+         {
+            buffer.append("\" +\n/* ");
+            line++;
+            // pad single-digit line numbers so the comments stay aligned
+            if (line < 10)
+            {
+               buffer.append(" ");
+            }
+            buffer.append(line).append(" */ \"");
+         }
+         else if (str.length() - i > groupSize)
+         {
+            // more text follows on the same line
+            buffer.append("\" + \"");
+         }
+      }
+
+      buffer.append("\";");
+
+      return buffer.toString();
+
+   }
+
+   protected static final char[] hexArray = "0123456789ABCDEF".toCharArray();
+
+   /**
+    * @param bytes the bytes to convert
+    * @return two upper-case hexadecimal digits per byte, with no separators
+    */
+   public static String bytesToHex(byte[] bytes)
+   {
+      char[] hexChars = new char[bytes.length * 2];
+      for (int j = 0; j < bytes.length; j++)
+      {
+         int v = bytes[j] & 0xFF;
+         hexChars[j * 2] = hexArray[v >>> 4];
+         hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+      }
+      return new String(hexChars);
+   }
+
+   /**
+    * Parses pairs of hexadecimal digits into bytes. The input is not validated: an odd
+    * length fails with {@link StringIndexOutOfBoundsException}.
+    *
+    * @param s the hexadecimal text, two digits per byte
+    * @return the decoded bytes
+    */
+   public static byte[] hexStringToByteArray(String s)
+   {
+      int len = s.length();
+      byte[] data = new byte[len / 2];
+      for (int i = 0; i < len; i += 2)
+      {
+         data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+            + Character.digit(s.charAt(i + 1), 16));
+      }
+      return data;
+   }
+
+   /**
+    * @param x the value to encode
+    * @return the backing array of an 8-byte heap buffer into which {@code x} was written
+    */
+   public static byte[] longToBytes(long x)
+   {
+      ByteBuf buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8);
+      buffer.writeLong(x);
+      return buffer.array();
+   }
+
+   /**
+    * Shortens a string for display by keeping its head and tail.
+    *
+    * @param value the text to shorten
+    * @param size  length below which the text is returned unchanged
+    * @return {@code value} itself, or its first and last {@code size / 2} characters
+    *         joined by {@code " ... "}
+    */
+   public static String maxString(String value, int size)
+   {
+      if (value.length() < size)
+      {
+         return value;
+      }
+      else
+      {
+         return value.substring(0, size / 2) + " ... " + value.substring(value.length() - size / 2);
+      }
+   }
+
+   /**
+    * Converts bytes to upper-case hexadecimal with a space after every {@code groupSize}
+    * bytes. An empty array gives an empty string.
+    *
+    * @param bytes     the bytes to convert
+    * @param groupSize number of bytes per group; must be positive
+    * @return the grouped hexadecimal text
+    */
+   public static String bytesToHex(byte[] bytes, int groupSize)
+   {
+      char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)];
+      int outPos = 0;
+      for (int j = 0; j < bytes.length; j++)
+      {
+         if (j > 0 && j % groupSize == 0)
+         {
+            hexChars[outPos++] = ' ';
+         }
+         int v = bytes[j] & 0xFF;
+         hexChars[outPos++] = hexArray[v >>> 4];
+         hexChars[outPos++] = hexArray[v & 0x0F];
+      }
+      return new String(hexChars);
+   }
+
+   /**
+    * Counts the separators needed between groups of {@code groupSize} bytes.
+    */
+   private static int numberOfGroups(byte[] bytes, int groupSize)
+   {
+      // Nothing to separate; without this guard the count below would be -1.
+      if (bytes.length == 0)
+      {
+         return 0;
+      }
+
+      int groups = bytes.length / groupSize;
+
+      // an exact multiple has no separator after its last group
+      if (bytes.length % groupSize == 0)
+      {
+         groups--;
+      }
+
+      return groups;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java
new file mode 100644
index 0000000..bb10c5a
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+import org.apache.qpid.proton.codec.AMQPDefinedTypes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+
+/**
+ * Keeps one Proton decoder/encoder pair per thread and hands them out through static
+ * accessors.
+ *
+ * @author Clebert Suconic
+ */
+public class CodecCache
+{
+
+   /**
+    * A decoder and the encoder built on top of it, with all AMQP defined types registered.
+    */
+   private static class EncoderDecoderPair
+   {
+      final DecoderImpl decoder;
+      final EncoderImpl encoder;
+
+      EncoderDecoderPair()
+      {
+         decoder = new DecoderImpl();
+         encoder = new EncoderImpl(decoder);
+         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
+      }
+   }
+
+   // the pair is created on a thread's first access
+   private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>()
+   {
+      @Override
+      protected EncoderDecoderPair initialValue()
+      {
+         return new EncoderDecoderPair();
+      }
+   };
+
+   /**
+    * @return the decoder belonging to the calling thread
+    */
+   public static DecoderImpl getDecoder()
+   {
+      EncoderDecoderPair pair = tlsCodec.get();
+      return pair.decoder;
+   }
+
+   /**
+    * @return the encoder belonging to the calling thread
+    */
+   public static EncoderImpl getEncoder()
+   {
+      EncoderDecoderPair pair = tlsCodec.get();
+      return pair.encoder;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java
new file mode 100644
index 0000000..36deee2
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * A counting semaphore of credits built on {@link AbstractQueuedSynchronizer}. Besides
+ * acquiring and releasing, the number of credits can be overwritten with
+ * {@link #setCredits(int)}.
+ *
+ * @author Clebert Suconic
+ */
+public class CreditsSemaphore
+{
+
+   /**
+    * Synchronizer whose state is the number of available credits.
+    */
+   @SuppressWarnings("serial")
+   private static class Sync extends AbstractQueuedSynchronizer
+   {
+      public Sync(int initial)
+      {
+         setState(initial);
+      }
+
+      public int getCredits()
+      {
+         return getState();
+      }
+
+      /**
+       * Takes credits when enough are available.
+       *
+       * @return the credits left after the acquire, or -1 when there are not enough
+       */
+      @Override
+      public int tryAcquireShared(final int numberOfAcquires)
+      {
+         while (true)
+         {
+            final int current = getState();
+            final int remaining = current - numberOfAcquires;
+
+            if (remaining >= 0)
+            {
+               if (compareAndSetState(current, remaining))
+               {
+                  return remaining;
+               }
+            }
+            else if (current == getState())
+            {
+               // still short of credits and the state has not moved in the meantime
+               return -1;
+            }
+            // otherwise the state changed underneath us: try again
+         }
+      }
+
+      /**
+       * Adds credits, retrying until the update is applied.
+       *
+       * @return always {@code true}
+       */
+      @Override
+      public boolean tryReleaseShared(final int numberOfReleases)
+      {
+         int current;
+         do
+         {
+            current = getState();
+         }
+         while (!compareAndSetState(current, current + numberOfReleases));
+
+         return true;
+      }
+
+      /**
+       * Replaces the number of credits, retrying until the update is applied.
+       */
+      public void setCredits(final int credits)
+      {
+         int current;
+         do
+         {
+            current = getState();
+         }
+         while (!compareAndSetState(current, credits));
+
+         // Releasing zero credits adds nothing; it is done to wake up any queued threads.
+         releaseShared(0);
+      }
+   }
+
+   private final Sync sync;
+
+
+   /**
+    * @param initialCredits the number of credits available at the start
+    */
+   public CreditsSemaphore(int initialCredits)
+   {
+      this.sync = new Sync(initialCredits);
+   }
+
+   /**
+    * Takes one credit, waiting until one is available.
+    *
+    * @throws InterruptedException if the thread is interrupted while waiting
+    */
+   public void acquire() throws InterruptedException
+   {
+      this.sync.acquireSharedInterruptibly(1);
+   }
+
+   /**
+    * Takes one credit without waiting.
+    *
+    * @return {@code true} when a credit was taken
+    */
+   public boolean tryAcquire()
+   {
+      final int remaining = this.sync.tryAcquireShared(1);
+      return remaining >= 0;
+   }
+
+   /**
+    * Gives back one credit.
+    */
+   public void release() throws InterruptedException
+   {
+      this.sync.releaseShared(1);
+   }
+
+   /**
+    * Gives back several credits at once.
+    *
+    * @param credits the number of credits to add
+    */
+   public void release(int credits) throws InterruptedException
+   {
+      this.sync.releaseShared(credits);
+   }
+
+   /**
+    * Overwrites the number of available credits.
+    *
+    * @param credits the new number of credits
+    */
+   public void setCredits(int credits)
+   {
+      this.sync.setCredits(credits);
+   }
+
+   /**
+    * @return the number of credits currently available
+    */
+   public int getCredits()
+   {
+      return this.sync.getCredits();
+   }
+
+   /**
+    * @return {@code true} when threads may be waiting for credits
+    */
+   public boolean hasQueuedThreads()
+   {
+      return this.sync.hasQueuedThreads();
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java
new file mode 100644
index 0000000..6481413
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+/**
+ * Holds a single constant, {@link #debug}, which is {@code false}.
+ * NOTE(review): presumably other classes read this flag to switch debug output on or
+ * off - no use of it is visible here; confirm against the callers.
+ *
+ * @author Clebert Suconic
+ */
+
+public class DebugInfo
+{
+   public static final boolean debug = false;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java
new file mode 100644
index 0000000..7430c44
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+
+/**
+ * Static helpers that move delivery bytes between a Proton receiver or message and a
+ * Netty buffer. Both methods go through {@code buffer.array()}, so the buffer has to
+ * expose a backing array.
+ *
+ * @author Clebert Suconic
+ */
+public class DeliveryUtil
+{
+
+   /**
+    * Reads from the receiver straight into the buffer's backing array, starting at the
+    * writer index, until the receiver returns no more bytes.
+    *
+    * @param receiver the link to read from
+    * @param buffer   the destination; its writer index is advanced by the bytes read
+    * @return the total number of bytes added to the buffer
+    */
+   public static int readDelivery(Receiver receiver, ByteBuf buffer)
+   {
+      final int start = buffer.writerIndex();
+
+      for (;;)
+      {
+         int received = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes());
+         if (received <= 0)
+         {
+            break;
+         }
+
+         // recv wrote behind the buffer's back, so move the writer index past the new bytes
+         buffer.writerIndex(buffer.writerIndex() + received);
+      }
+
+      return buffer.writerIndex() - start;
+   }
+
+
+   /**
+    * Decodes the readable bytes of the buffer into a new message. The reader index of
+    * the buffer is not moved.
+    *
+    * @param buffer the encoded message
+    * @return the decoded message
+    */
+   public static MessageImpl decodeMessageImpl(ByteBuf buffer)
+   {
+      final MessageImpl decoded = (MessageImpl) Message.Factory.create();
+      final byte[] backing = buffer.array();
+      final int from = buffer.arrayOffset() + buffer.readerIndex();
+      final int length = buffer.readableBytes();
+      decoded.decode(backing, from, length);
+      return decoded;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java
new file mode 100644
index 0000000..3a36f7d
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link Runnable} wrapped around a {@code ReusableLatch}: every run counts the latch
+ * down once, and callers can wait on the latch through the {@code await} methods.
+ *
+ * @author Clebert Suconic
+ */
+public class FutureRunnable implements Runnable
+{
+   private final ReusableLatch latch;
+
+   /**
+    * Creates the runnable with a latch starting at zero.
+    */
+   public FutureRunnable()
+   {
+      this.latch = new ReusableLatch(0);
+   }
+
+   /**
+    * @param initialIterations the initial count of the latch
+    */
+   public FutureRunnable(final int initialIterations)
+   {
+      this.latch = new ReusableLatch(initialIterations);
+   }
+
+   /**
+    * Counts the latch down once.
+    */
+   @Override
+   public void run()
+   {
+      this.latch.countDown();
+   }
+
+   /**
+    * Counts the latch up once.
+    */
+   public void countUp()
+   {
+      this.latch.countUp();
+   }
+
+   /**
+    * Counts the latch down once, exactly as {@link #run()} does.
+    */
+   public void countDown()
+   {
+      this.latch.countDown();
+   }
+
+   /**
+    * @return the current count of the latch
+    */
+   public int getCount()
+   {
+      return this.latch.getCount();
+   }
+
+   /**
+    * Waits on the latch without a time limit.
+    *
+    * @throws InterruptedException if the thread is interrupted while waiting
+    */
+   public void await() throws InterruptedException
+   {
+      this.latch.await();
+   }
+
+   /**
+    * Waits on the latch for a limited time, presumably in milliseconds as the parameter
+    * name says.
+    *
+    * @param milliseconds the time to wait
+    * @return whatever the latch reports for the timed wait
+    * @throws InterruptedException if the thread is interrupted while waiting
+    */
+   public boolean await(long milliseconds) throws InterruptedException
+   {
+      return this.latch.await(milliseconds);
+   }
+
+   /**
+    * Waits on the latch for a limited time.
+    *
+    * @param timeWait the time to wait
+    * @param timeUnit the unit of {@code timeWait}
+    * @return whatever the latch reports for the timed wait
+    * @throws InterruptedException if the thread is interrupted while waiting
+    */
+   public boolean await(long timeWait, TimeUnit timeUnit) throws InterruptedException
+   {
+      return this.latch.await(timeWait, timeUnit);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java
new file mode 100644
index 0000000..464bc2c
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+/**
+ * @author Clebert Suconic
+ */
+
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+/**
+ * Adapts a Netty {@link ByteBuf} to Proton's {@link WritableBuffer}. The writer index of
+ * the Netty buffer plays the role of the position and its capacity the role of the limit.
+ *
+ * @author Clebert Suconic
+ */
+public class NettyWritable implements WritableBuffer
+{
+
+   final ByteBuf nettyBuffer;
+
+   /**
+    * @param nettyBuffer the buffer every write is forwarded to
+    */
+   public NettyWritable(ByteBuf nettyBuffer)
+   {
+      this.nettyBuffer = nettyBuffer;
+   }
+
+
+   /** Writes one byte at the writer index. */
+   @Override
+   public void put(byte b)
+   {
+      this.nettyBuffer.writeByte(b);
+   }
+
+   /** Writes {@code length} bytes of {@code src}, starting at {@code offset}. */
+   @Override
+   public void put(byte[] src, int offset, int length)
+   {
+      this.nettyBuffer.writeBytes(src, offset, length);
+   }
+
+   /** Writes the content of the given NIO buffer. */
+   @Override
+   public void put(ByteBuffer payload)
+   {
+      this.nettyBuffer.writeBytes(payload);
+   }
+
+   /** Writes a short at the writer index. */
+   @Override
+   public void putShort(short s)
+   {
+      this.nettyBuffer.writeShort(s);
+   }
+
+   /** Writes an int at the writer index. */
+   @Override
+   public void putInt(int i)
+   {
+      this.nettyBuffer.writeInt(i);
+   }
+
+   /** Writes a long at the writer index. */
+   @Override
+   public void putLong(long l)
+   {
+      this.nettyBuffer.writeLong(l);
+   }
+
+   /** Writes a float at the writer index. */
+   @Override
+   public void putFloat(float f)
+   {
+      this.nettyBuffer.writeFloat(f);
+   }
+
+   /** Writes a double at the writer index. */
+   @Override
+   public void putDouble(double d)
+   {
+      this.nettyBuffer.writeDouble(d);
+   }
+
+   /**
+    * @return {@code true} while the writer index is below the capacity
+    */
+   @Override
+   public boolean hasRemaining()
+   {
+      final int written = this.nettyBuffer.writerIndex();
+      return written < this.nettyBuffer.capacity();
+   }
+
+   /**
+    * @return the capacity minus the writer index
+    */
+   @Override
+   public int remaining()
+   {
+      final int total = this.nettyBuffer.capacity();
+      return total - this.nettyBuffer.writerIndex();
+   }
+
+   /**
+    * @return the current writer index
+    */
+   @Override
+   public int position()
+   {
+      return this.nettyBuffer.writerIndex();
+   }
+
+   /**
+    * @param position the new writer index
+    */
+   @Override
+   public void position(int position)
+   {
+      this.nettyBuffer.writerIndex(position);
+   }
+
+   /**
+    * @return the capacity of the Netty buffer
+    */
+   @Override
+   public int limit()
+   {
+      return this.nettyBuffer.capacity();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java
new file mode 100644
index 0000000..ccebe3f
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java
@@ -0,0 +1,596 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.message.MessageError;
+import org.apache.qpid.proton.message.MessageFormat;
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+/**
+ * This is a serverMessage that won't deal with the body
+ *
+ * @author Clebert Suconic
+ */
+public class ProtonServerMessage implements ProtonJMessage
+{
+   private Header header;
+   private DeliveryAnnotations deliveryAnnotations;
+   private MessageAnnotations messageAnnotations;
+   private Properties properties;
+   private ApplicationProperties applicationProperties;
+
+   // This should include a raw body of both footer and body
+   private byte[] rawBody;
+
+   private Section parsedBody;
+   private Footer parsedFooter;
+
+   private final int EOF = 0;
+
+   // TODO: Enumerations maybe?
+   private static final int HEADER_TYPE = 0x070;
+   private static final int DELIVERY_ANNOTATIONS = 0x071;
+   private static final int MESSAGE_ANNOTATIONS = 0x072;
+   private static final int PROPERTIES = 0x073;
+   private static final int APPLICATION_PROPERTIES = 0x074;
+
+
+   /**
+    * This will decode a ByteBuffer that represents the entire message.
+    * Set the limits around the parameter.
+    *
+    * @param buffer a limited buffer for the message
+    */
+   public void decode(ByteBuffer buffer)
+   {
+
+      DecoderImpl decoder = CodecCache.getDecoder();
+
+
+      header = null;
+      deliveryAnnotations = null;
+      messageAnnotations = null;
+      properties = null;
+      applicationProperties = null;
+      rawBody = null;
+
+      decoder.setByteBuffer(buffer);
+      try
+      {
+         int type = readType(buffer, decoder);
+         if (type == HEADER_TYPE)
+         {
+            header = (Header) readSection(buffer, decoder);
+            type = readType(buffer, decoder);
+
+         }
+
+         if (type == DELIVERY_ANNOTATIONS)
+         {
+            deliveryAnnotations = (DeliveryAnnotations) readSection(buffer, decoder);
+            type = readType(buffer, decoder);
+
+         }
+
+         if (type == MESSAGE_ANNOTATIONS)
+         {
+            messageAnnotations = (MessageAnnotations) readSection(buffer, decoder);
+            type = readType(buffer, decoder);
+         }
+
+         if (type == PROPERTIES)
+         {
+            properties = (Properties) readSection(buffer, decoder);
+            type = readType(buffer, decoder);
+
+         }
+
+         if (type == APPLICATION_PROPERTIES)
+         {
+            applicationProperties = (ApplicationProperties) readSection(buffer, decoder);
+            type = readType(buffer, decoder);
+         }
+
+         if (type != EOF)
+         {
+            rawBody = new byte[buffer.limit() - buffer.position()];
+            buffer.get(rawBody);
+         }
+      }
+      finally
+      {
+         decoder.setByteBuffer(null);
+      }
+
+   }
+
+
+   public void encode(ByteBuffer buffer)
+   {
+      WritableBuffer writableBuffer = new WritableBuffer.ByteBufferWrapper(buffer);
+      encode(writableBuffer);
+   }
+
+   public int encode(WritableBuffer writableBuffer)
+   {
+      final int firstPosition = writableBuffer.position();
+
+      EncoderImpl encoder = CodecCache.getEncoder();
+      encoder.setByteBuffer(writableBuffer);
+
+      try
+      {
+         if (header != null)
+         {
+            encoder.writeObject(header);
+         }
+         if (deliveryAnnotations != null)
+         {
+            encoder.writeObject(deliveryAnnotations);
+         }
+         if (messageAnnotations != null)
+         {
+            encoder.writeObject(messageAnnotations);
+         }
+         if (properties != null)
+         {
+            encoder.writeObject(properties);
+         }
+         if (applicationProperties != null)
+         {
+            encoder.writeObject(applicationProperties);
+         }
+
+         // It should write either the parsed one or the rawBody
+         if (parsedBody != null)
+         {
+            encoder.writeObject(parsedBody);
+            if (parsedFooter != null)
+            {
+               encoder.writeObject(parsedFooter);
+            }
+         }
+         else if (rawBody != null)
+         {
+            writableBuffer.put(rawBody, 0, rawBody.length);
+         }
+
+         return writableBuffer.position() - firstPosition;
+      }
+      finally
+      {
+         encoder.setByteBuffer((WritableBuffer) null);
+      }
+   }
+
+
+   private int readType(ByteBuffer buffer, DecoderImpl decoder)
+   {
+
+      int pos = buffer.position();
+
+      if (!buffer.hasRemaining())
+      {
+         return EOF;
+      }
+      try
+      {
+         if (buffer.get() != 0)
+         {
+            return EOF;
+         }
+         else
+         {
+            return ((Number) decoder.readObject()).intValue();
+         }
+      }
+      finally
+      {
+         buffer.position(pos);
+      }
+   }
+
+
+   private Section readSection(ByteBuffer buffer, DecoderImpl decoder)
+   {
+      if (buffer.hasRemaining())
+      {
+         return (Section) decoder.readObject();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+
+   // At the moment we only need encode implemented!!!
+   @Override
+   public boolean isDurable()
+   {
+      return false;
+   }
+
+   @Override
+   public long getDeliveryCount()
+   {
+      return 0;
+   }
+
+   @Override
+   public short getPriority()
+   {
+      return 0;
+   }
+
+   @Override
+   public boolean isFirstAcquirer()
+   {
+      return false;
+   }
+
+   @Override
+   public long getTtl()
+   {
+      return 0;
+   }
+
+   @Override
+   public void setDurable(boolean durable)
+   {
+
+   }
+
+   @Override
+   public void setTtl(long ttl)
+   {
+
+   }
+
+   @Override
+   public void setDeliveryCount(long deliveryCount)
+   {
+
+   }
+
+   @Override
+   public void setFirstAcquirer(boolean firstAcquirer)
+   {
+
+   }
+
+   @Override
+   public void setPriority(short priority)
+   {
+
+   }
+
+   @Override
+   public Object getMessageId()
+   {
+      return null;
+   }
+
+   @Override
+   public long getGroupSequence()
+   {
+      return 0;
+   }
+
+   @Override
+   public String getReplyToGroupId()
+   {
+      return null;
+   }
+
+   @Override
+   public long getCreationTime()
+   {
+      return 0;
+   }
+
+   @Override
+   public String getAddress()
+   {
+      return null;
+   }
+
+   @Override
+   public byte[] getUserId()
+   {
+      return new byte[0];
+   }
+
+   @Override
+   public String getReplyTo()
+   {
+      return null;
+   }
+
+   @Override
+   public String getGroupId()
+   {
+      return null;
+   }
+
+   @Override
+   public String getContentType()
+   {
+      return null;
+   }
+
+   @Override
+   public long getExpiryTime()
+   {
+      return 0;
+   }
+
+   @Override
+   public Object getCorrelationId()
+   {
+      return null;
+   }
+
+   @Override
+   public String getContentEncoding()
+   {
+      return null;
+   }
+
+   @Override
+   public String getSubject()
+   {
+      return null;
+   }
+
+   @Override
+   public void setGroupSequence(long groupSequence)
+   {
+
+   }
+
+   @Override
+   public void setUserId(byte[] userId)
+   {
+
+   }
+
+   @Override
+   public void setCreationTime(long creationTime)
+   {
+
+   }
+
+   @Override
+   public void setSubject(String subject)
+   {
+
+   }
+
+   @Override
+   public void setGroupId(String groupId)
+   {
+
+   }
+
+   @Override
+   public void setAddress(String to)
+   {
+
+   }
+
+   @Override
+   public void setExpiryTime(long absoluteExpiryTime)
+   {
+
+   }
+
+   @Override
+   public void setReplyToGroupId(String replyToGroupId)
+   {
+
+   }
+
+   @Override
+   public void setContentEncoding(String contentEncoding)
+   {
+
+   }
+
+   @Override
+   public void setContentType(String contentType)
+   {
+
+   }
+
+   @Override
+   public void setReplyTo(String replyTo)
+   {
+
+   }
+
+   @Override
+   public void setCorrelationId(Object correlationId)
+   {
+
+   }
+
+   @Override
+   public void setMessageId(Object messageId)
+   {
+
+   }
+
+   @Override
+   public Header getHeader()
+   {
+      return null;
+   }
+
+   @Override
+   public DeliveryAnnotations getDeliveryAnnotations()
+   {
+      return null;
+   }
+
+   @Override
+   public MessageAnnotations getMessageAnnotations()
+   {
+      return null;
+   }
+
+   @Override
+   public Properties getProperties()
+   {
+      return null;
+   }
+
+   @Override
+   public ApplicationProperties getApplicationProperties()
+   {
+      return null;
+   }
+
+   @Override
+   public Section getBody()
+   {
+      return null;
+   }
+
+   @Override
+   public Footer getFooter()
+   {
+      return null;
+   }
+
+   @Override
+   public void setHeader(Header header)
+   {
+
+   }
+
+   @Override
+   public void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations)
+   {
+
+   }
+
+   @Override
+   public void setMessageAnnotations(MessageAnnotations messageAnnotations)
+   {
+
+   }
+
+   @Override
+   public void setProperties(Properties properties)
+   {
+
+   }
+
+   @Override
+   public void setApplicationProperties(ApplicationProperties applicationProperties)
+   {
+
+   }
+
+   @Override
+   public void setBody(Section body)
+   {
+
+   }
+
+   @Override
+   public void setFooter(Footer footer)
+   {
+
+   }
+
+   @Override
+   public int decode(byte[] data, int offset, int length)
+   {
+      return 0;
+   }
+
+   @Override
+   public int encode(byte[] data, int offset, int length)
+   {
+      return 0;
+   }
+
+   @Override
+   public void load(Object data)
+   {
+
+   }
+
+   @Override
+   public Object save()
+   {
+      return null;
+   }
+
+   @Override
+   public String toAMQPFormat(Object value)
+   {
+      return null;
+   }
+
+   @Override
+   public Object parseAMQPFormat(String value)
+   {
+      return null;
+   }
+
+   @Override
+   public void setMessageFormat(MessageFormat format)
+   {
+
+   }
+
+   @Override
+   public MessageFormat getMessageFormat()
+   {
+      return null;
+   }
+
+   @Override
+   public void clear()
+   {
+
+   }
+
+   @Override
+   public MessageError getError()
+   {
+      return null;
+   }
+
+   @Override
+   public int encode2(byte[] data, int offset, int length)
+   {
+      return 0;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java
new file mode 100644
index 0000000..4f1f01e
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.proton.plug.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * <p>This class uses the framework provided by AbstractQueuedSynchronizer.</p>
+ * <p>AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.</p>
+ * <p/>
+ * <p>This class works just like CountDownLatch, with the difference you can also increase the counter</p>
+ * <p/>
+ * <p>It can be used for sync points where one process feeds the latch while another waits until everything is done (e.g. waiting for IO completions to finish).</p>
+ * <p/>
+ * <p>On HornetQ we have the requirement to increment and decrement a counter until the user fires a ready handler (commit). At that point we just act as a regular countDown.</p>
+ * <p/>
+ * <p>Note: This latch is reusable. Once it reaches zero, you can count up again, and reuse it on further waits.</p>
+ * <p/>
+ * <p>For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.</p>
+ *
+ * @author Clebert Suconic
+ */
+public class ReusableLatch
+{
+   /**
+    * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information
+    *
+    * @see AbstractQueuedSynchronizer
+    */
+   @SuppressWarnings("serial")
+   private static class CountSync extends AbstractQueuedSynchronizer
+   {
+      public CountSync(int count)
+      {
+         setState(count);
+      }
+
+      public int getCount()
+      {
+         return getState();
+      }
+
+      public void setCount(final int count)
+      {
+         setState(count);
+      }
+
+      @Override
+      public int tryAcquireShared(final int numberOfAqcquires)
+      {
+         return getState() == 0 ? 1 : -1;
+      }
+
+      public void add()
+      {
+         for (;;)
+         {
+            int actualState = getState();
+            int newState = actualState + 1;
+            if (compareAndSetState(actualState, newState))
+            {
+               return;
+            }
+         }
+      }
+
+      @Override
+      public boolean tryReleaseShared(final int numberOfReleases)
+      {
+         for (;;)
+         {
+            int actualState = getState();
+            if (actualState == 0)
+            {
+               return true;
+            }
+
+            int newState = actualState - numberOfReleases;
+
+            if (newState < 0)
+            {
+               newState = 0;
+            }
+
+            if (compareAndSetState(actualState, newState))
+            {
+               return newState == 0;
+            }
+         }
+      }
+   }
+
+   private final CountSync control;
+
+   public ReusableLatch()
+   {
+      this(0);
+   }
+
+   public ReusableLatch(final int count)
+   {
+      control = new CountSync(count);
+   }
+
+   public int getCount()
+   {
+      return control.getCount();
+   }
+
+   public void setCount(final int count)
+   {
+      control.setCount(count);
+   }
+
+   public void countUp()
+   {
+      control.add();
+   }
+
+   public void countDown()
+   {
+      control.releaseShared(1);
+   }
+
+
+   public void countDown(final int count)
+   {
+      control.releaseShared(count);
+   }
+
+   public void await() throws InterruptedException
+   {
+      control.acquireSharedInterruptibly(1);
+   }
+
+   public boolean await(final long milliseconds) throws InterruptedException
+   {
+      return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds));
+   }
+
+   public boolean await(final long timeWait, TimeUnit timeUnit) throws InterruptedException
+   {
+      return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java
new file mode 100644
index 0000000..23c158f
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+import java.lang.ref.WeakReference;
+
+//import io.hawtjms.jms.JmsConnectionFactory;
+//import io.hawtjms.jms.JmsQueue;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.proton.plug.test.minimalserver.DumbServer;
+import org.proton.plug.test.minimalserver.MinimalServer;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class AbstractJMSTest
+{
+   protected final boolean useHawtJMS;
+   protected final boolean useSASL;
+
+   protected String address = "exampleQueue";
+   protected MinimalServer server = new MinimalServer();
+
+   public AbstractJMSTest(boolean useHawtJMS, boolean useSASL)
+   {
+      this.useHawtJMS = useHawtJMS;
+      this.useSASL = useSASL;
+   }
+
+   public void tearDown() throws Exception
+   {
+      server.stop();
+      DumbServer.clear();
+   }
+
+   public static void forceGC()
+   {
+      System.out.println("#test forceGC");
+      WeakReference<Object> dumbReference = new WeakReference<Object>(new Object());
+      // A loop that waits for GC to happen, taking as little time as possible
+      while (dumbReference.get() != null)
+      {
+         System.gc();
+         try
+         {
+            Thread.sleep(100);
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
+      System.out.println("#test forceGC Done");
+   }
+
+
+   protected Connection createConnection() throws JMSException
+   {
+      final ConnectionFactory factory = createConnectionFactory();
+      final Connection connection = factory.createConnection();
+      connection.setExceptionListener(new ExceptionListener()
+      {
+         @Override
+         public void onException(JMSException exception)
+         {
+            exception.printStackTrace();
+         }
+      });
+      connection.start();
+      return connection;
+   }
+
+
+   protected ConnectionFactory createConnectionFactory()
+   {
+      if (useSASL)
+      {
+         if (useHawtJMS)
+         {
+//            return new JmsConnectionFactory("aaaaaaaa", "aaaaaaa", "amqp://localhost:" + Constants.PORT);
+            return null;
+         }
+         else
+         {
+            return new ConnectionFactoryImpl("localhost", Constants.PORT, "aaaaaaaa", "aaaaaaa");
+         }
+      }
+      else
+      {
+         if (useHawtJMS)
+         {
+//            return new JmsConnectionFactory("amqp://localhost:" + Constants.PORT);
+            return null;
+         }
+         else
+         {
+            return new ConnectionFactoryImpl("localhost", Constants.PORT, null, null);
+         }
+
+      }
+   }
+
+   protected Queue createQueue()
+   {
+      if (useHawtJMS)
+      {
+//         return new JmsQueue(address);
+         return null;
+      }
+      else
+      {
+         return new QueueImpl(address);
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java
new file mode 100644
index 0000000..27ffea4
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class Constants
+{
+   public static final int PORT = 5672;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
new file mode 100644
index 0000000..cd2638f
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.proton.plug.test;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.proton.plug.AMQPClientConnectionContext;
+import org.proton.plug.AMQPClientSenderContext;
+import org.proton.plug.AMQPClientSessionContext;
+import org.proton.plug.sasl.ClientSASLPlain;
+import org.proton.plug.test.minimalclient.SimpleAMQPConnector;
+import org.proton.plug.test.minimalserver.DumbServer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.proton.plug.util.ByteUtil;
+
+/**
+ * This is simulating a JMS client against a simple server
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author Clebert Suconic
+ */
+@RunWith(Parameterized.class)
+public class ProtonTest extends AbstractJMSTest
+{
+
+   protected Connection connection;
+
+   @Parameterized.Parameters(name = "useHawt={0} sasl={1}")
+   public static Collection<Object[]> data()
+   {
+      List<Object[]> list = Arrays.asList(new Object[][]{
+         {Boolean.FALSE, Boolean.TRUE},
+         {Boolean.FALSE, Boolean.FALSE}});
+
+      System.out.println("Size = " + list.size());
+      return list;
+   }
+
+   public ProtonTest(boolean useHawtJMS, boolean useSASL)
+   {
+      super(useHawtJMS, useSASL);
+   }
+
+
+   @Before
+   public void setUp() throws Exception
+   {
+      DumbServer.clear();
+      AbstractJMSTest.forceGC();
+      server.start("127.0.0.1", Constants.PORT, true);
+      connection = createConnection();
+
+   }
+
+   @After
+   public void tearDown() throws Exception
+   {
+      if (connection != null)
+      {
+         connection.close();
+      }
+
+      super.tearDown();
+   }
+
+   @Test
+   public void testMessagesReceivedInParallel() throws Throwable
+   {
+      final int numMessages = getNumberOfMessages();
+      long time = System.currentTimeMillis();
+      final Queue queue = createQueue();
+
+      final ArrayList<Throwable> exceptions = new ArrayList<>();
+
+      Thread t = new Thread(new Runnable()
+      {
+         @Override
+         public void run()
+         {
+            Connection connectionConsumer = null;
+            try
+            {
+               connectionConsumer = createConnection();
+//               connectionConsumer = connection;
+               connectionConsumer.start();
+               Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+               final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+               int count = numMessages;
+               while (count > 0)
+               {
+                  try
+                  {
+                     BytesMessage m = (BytesMessage) consumer.receive(1000);
+                     if (count % 1000 == 0)
+                     {
+                        System.out.println("Count = " + count + ", property=" + m.getStringProperty("XX"));
+                     }
+                     Assert.assertNotNull("Could not receive message count=" + count + " on consumer", m);
+                     count--;
+                  }
+                  catch (JMSException e)
+                  {
+                     break;
+                  }
+               }
+            }
+            catch (Throwable e)
+            {
+               exceptions.add(e);
+               e.printStackTrace();
+            }
+            finally
+            {
+               try
+               {
+                  // if the createConnection wasn't commented out
+                  if (connectionConsumer != connection)
+                  {
+                     connectionConsumer.close();
+                  }
+               }
+               catch (Throwable ignored)
+               {
+                  // NO OP
+               }
+            }
+         }
+      });
+
+      Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+      t.start();
+
+      MessageProducer p = session.createProducer(queue);
+      p.setDeliveryMode(DeliveryMode.PERSISTENT);
+      for (int i = 0; i < numMessages; i++)
+      {
+         BytesMessage message = session.createBytesMessage();
+         // TODO: this will break stuff if I use a large number
+         message.writeBytes(new byte[5]);
+         message.setIntProperty("count", i);
+         message.setStringProperty("XX", "count" + i);
+         p.send(message);
+      }
+
+      long taken = (System.currentTimeMillis() - time);
+      System.out.println("taken on send = " + taken + " usehawt = " + useHawtJMS + " sasl = " + useSASL);
+      t.join();
+
+      for (Throwable e : exceptions)
+      {
+         throw e;
+      }
+      taken = (System.currentTimeMillis() - time);
+      System.out.println("taken = " + taken + " usehawt = " + useHawtJMS + " sasl = " + useSASL);
+
+      connection.close();
+//      assertEquals(0, q.getMessageCount());
+   }
+
+
+   @Test
+   public void testSimpleCreateSessionAndClose() throws Throwable
+   {
+      final QueueImpl queue = new QueueImpl(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Thread.sleep(1000);
+      session.close();
+      connection.close();
+   }
+
+   @Test
+   public void testSimpleBinary() throws Throwable
+   {
+      final int numMessages = 5;
+      long time = System.currentTimeMillis();
+      final QueueImpl queue = new QueueImpl(address);
+
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      byte[] bytes = new byte[0xf + 1];
+      for (int i = 0; i <= 0xf; i++)
+      {
+         bytes[i] = (byte) i;
+      }
+
+
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < numMessages; i++)
+      {
+         BytesMessage message = session.createBytesMessage();
+
+         message.writeBytes(bytes);
+         message.setIntProperty("count", i);
+         p.send(message);
+      }
+
+
+      session.close();
+
+
+      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         BytesMessage m = (BytesMessage) consumer.receive(5000);
+
+         System.out.println("length " + m.getBodyLength());
+         Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
+
+         m.reset();
+
+         long size = m.getBodyLength();
+         byte[] bytesReceived = new byte[(int) size];
+         m.readBytes(bytesReceived);
+
+
+         System.out.println("Received " + ByteUtil.bytesToHex(bytesReceived, 1));
+
+         Assert.assertArrayEquals(bytes, bytesReceived);
+      }
+
+//      assertEquals(0, q.getMessageCount());
+      long taken = (System.currentTimeMillis() - time) / 1000;
+      System.out.println("taken = " + taken);
+   }
+
+   @Test
+   public void testMapMessage() throws Exception
+   {
+      Queue queue = createQueue();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(queue);
+      for (int i = 0; i < 10; i++)
+      {
+         MapMessage message = session.createMapMessage();
+         message.setInt("x", i);
+         message.setString("str", "str" + i);
+         p.send(message);
+      }
+      MessageConsumer messageConsumer = session.createConsumer(queue);
+      for (int i = 0; i < 10; i++)
+      {
+         MapMessage m = (MapMessage) messageConsumer.receive(5000);
+         Assert.assertNotNull(m);
+         Assert.assertEquals(i, m.getInt("x"));
+         Assert.assertEquals("str" + i, m.getString("str"));
+      }
+
+      Assert.assertNull(messageConsumer.receiveNoWait());
+   }
+
+   @Test
+   public void testProperties() throws Exception
+   {
+      Queue queue = createQueue();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(queue);
+      TextMessage message = session.createTextMessage();
+      message.setText("msg:0");
+      message.setBooleanProperty("true", true);
+      message.setBooleanProperty("false", false);
+      message.setStringProperty("foo", "bar");
+      message.setDoubleProperty("double", 66.6);
+      message.setFloatProperty("float", 56.789f);
+      message.setIntProperty("int", 8);
+      message.setByteProperty("byte", (byte) 10);
+      p.send(message);
+      p.send(message);
+      connection.start();
+      MessageConsumer messageConsumer = session.createConsumer(queue);
+      TextMessage m = (TextMessage) messageConsumer.receive(5000);
+      Assert.assertNotNull(m);
+      Assert.assertEquals("msg:0", m.getText());
+      Assert.assertEquals(m.getBooleanProperty("true"), true);
+      Assert.assertEquals(m.getBooleanProperty("false"), false);
+      Assert.assertEquals(m.getStringProperty("foo"), "bar");
+      Assert.assertEquals(m.getDoubleProperty("double"), 66.6, 0.0001);
+      Assert.assertEquals(m.getFloatProperty("float"), 56.789f, 0.0001);
+      Assert.assertEquals(m.getIntProperty("int"), 8);
+      Assert.assertEquals(m.getByteProperty("byte"), (byte) 10);
+      m = (TextMessage) messageConsumer.receive(5000);
+      Assert.assertNotNull(m);
+      connection.close();
+   }
+
+   //   @Test
+   public void testSendWithSimpleClient() throws Exception
+   {
+      SimpleAMQPConnector connector = new SimpleAMQPConnector();
+      connector.start();
+      AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT);
+
+      clientConnection.clientOpen(new ClientSASLPlain("aa", "aa"));
+
+      AMQPClientSessionContext session = clientConnection.createClientSession();
+      AMQPClientSenderContext clientSender = session.createSender(address, true);
+
+
+      Properties props = new Properties();
+      for (int i = 0; i < 1; i++)
+      {
+         MessageImpl message = (MessageImpl) Message.Factory.create();
+
+         HashMap map = new HashMap();
+
+         map.put("i", i);
+         AmqpValue value = new AmqpValue(map);
+         message.setBody(value);
+         message.setProperties(props);
+         clientSender.send(message);
+      }
+
+      Session clientSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection.start();
+
+      MessageConsumer consumer = clientSession.createConsumer(createQueue());
+      for (int i = 0; i < 1; i++)
+      {
+         MapMessage msg = (MapMessage) consumer.receive(5000);
+         System.out.println("Msg " + msg);
+         Assert.assertNotNull(msg);
+
+         System.out.println("Receive message " + i);
+
+         Assert.assertEquals(0, msg.getInt("i"));
+      }
+   }
+
+
+   protected int getNumberOfMessages()
+   {
+      return 10000;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
new file mode 100644
index 0000000..90ecee3
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.invm;
+
+import org.proton.plug.AMQPClientConnectionContext;
+import org.proton.plug.context.client.ProtonClientConnectionContext;
+import org.proton.plug.test.minimalclient.Connector;
+
+/**
+ * This is used for testing, where we bypass Netty or any networking for test conditions only
+ *
+ * @author Clebert Suconic
+ */
+public class InVMTestConnector implements Connector
+{
+   /**
+    * No-op: this connector has nothing to set up before {@link #connect(String, int)}.
+    */
+   @Override
+   public void start()
+   {
+      // intentionally empty
+   }
+
+   /**
+    * Creates a client connection context backed by a brand new {@link ProtonINVMSPI}.
+    * Neither argument is read; they are present only to satisfy {@link Connector}.
+    *
+    * @param host ignored
+    * @param port ignored
+    * @return a new client connection context wired to the in-VM SPI
+    */
+   @Override
+   public AMQPClientConnectionContext connect(String host, int port) throws Exception
+   {
+      final ProtonINVMSPI inVMCallback = new ProtonINVMSPI();
+      return new ProtonClientConnectionContext(inVMCallback);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
new file mode 100644
index 0000000..45c0703
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.invm;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import io.netty.buffer.ByteBuf;
+import org.proton.plug.AMQPConnectionContext;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.ServerSASL;
+import org.proton.plug.context.server.ProtonServerConnectionContext;
+import org.proton.plug.sasl.AnonymousServerSASL;
+import org.proton.plug.sasl.ServerSASLPlain;
+import org.proton.plug.test.minimalserver.MinimalSessionSPI;
+import org.proton.plug.util.ByteUtil;
+import org.proton.plug.util.DebugInfo;
+
+/**
+ * @author Clebert Suconic
+ */
+
+/**
+ * Connection callback that links a client connection to a {@link ProtonServerConnectionContext}
+ * living in the same JVM. Frames written by the client are handed to the server on
+ * {@link #mainExecutor}; frames written by the server travel back to the client through the
+ * inner {@link ReturnSPI} on {@link #returningExecutor}.
+ */
+public class ProtonINVMSPI implements AMQPConnectionCallback
+{
+
+   // Client-side connection; receives the frames emitted by serverConnection (see ReturnSPI).
+   AMQPConnectionContext returningConnection;
+
+   // Server end of the in-VM link; its output is routed through ReturnSPI.
+   ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI());
+
+   // Single thread delivering client -> server frames.
+   final ExecutorService mainExecutor = Executors.newSingleThreadExecutor();
+
+   // Single thread delivering server -> client frames.
+   final ExecutorService returningExecutor = Executors.newSingleThreadExecutor();
+
+   /**
+    * Submits one task to each executor whose only job is to give the worker thread a
+    * recognisable name. As a side effect both worker threads are started here.
+    */
+   public ProtonINVMSPI()
+   {
+      mainExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            Thread.currentThread().setName("MainExecutor-INVM");
+         }
+      });
+      returningExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            Thread.currentThread().setName("ReturningExecutor-INVM");
+         }
+      });
+   }
+
+   /**
+    * Stops both delivery executors. {@code shutdown()} does not interrupt tasks that are
+    * running or already queued, so frames in flight are still delivered.
+    */
+   @Override
+   public void close()
+   {
+      mainExecutor.shutdown();
+      // The returning executor's worker thread is started by the constructor and is not a
+      // daemon thread; without this call every instance would leave that thread behind.
+      returningExecutor.shutdown();
+   }
+
+   /**
+    * @return a new array holding the anonymous and the plain SASL mechanisms
+    */
+   @Override
+   public ServerSASL[] getSASLMechnisms()
+   {
+      return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
+   }
+
+
+   /**
+    * Forwards bytes produced by the client connection to the server connection.
+    * The hand-off happens asynchronously on {@link #mainExecutor}.
+    *
+    * @param bytes      outgoing frame data
+    * @param connection connection that produced the bytes; told via {@code outputDone} once
+    *                   the server has been fed
+    */
+   @Override
+   public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection)
+   {
+      if (DebugInfo.debug)
+      {
+         ByteUtil.debugFrame("InVM->", bytes);
+      }
+      // Captured now because the task below runs later on another thread.
+      final int size = bytes.writerIndex();
+
+      // Keep the buffer alive until the asynchronous task has consumed it.
+      bytes.retain();
+      mainExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               if (DebugInfo.debug)
+               {
+                  ByteUtil.debugFrame("InVMDone->", bytes);
+               }
+               serverConnection.inputBuffer(bytes);
+               try
+               {
+                  connection.outputDone(size);
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }
+            finally
+            {
+               // Balances the retain() above.
+               bytes.release();
+            }
+         }
+      });
+   }
+
+   /**
+    * Records the client connection that server frames are delivered to.
+    */
+   @Override
+   public void setConnection(AMQPConnectionContext connection)
+   {
+      returningConnection = connection;
+   }
+
+   @Override
+   public AMQPConnectionContext getConnection()
+   {
+      return returningConnection;
+   }
+
+   /**
+    * @return always {@code null}; only the server side ({@link ReturnSPI}) creates session callbacks
+    */
+   @Override
+   public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection)
+   {
+      return null;
+   }
+
+   /**
+    * Callback installed on {@link #serverConnection}: sends the server's output back to
+    * {@link #returningConnection}.
+    */
+   class ReturnSPI implements AMQPConnectionCallback
+   {
+      @Override
+      public void close()
+      {
+
+      }
+
+      @Override
+      public ServerSASL[] getSASLMechnisms()
+      {
+         return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
+      }
+
+
+      /**
+       * Forwards bytes produced by the server connection to the client connection,
+       * asynchronously on {@link #returningExecutor}.
+       */
+      @Override
+      public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection)
+      {
+
+         final int size = bytes.writerIndex();
+         if (DebugInfo.debug)
+         {
+            ByteUtil.debugFrame("InVM<-", bytes);
+         }
+
+
+         // Keep the buffer alive until the asynchronous task has consumed it.
+         bytes.retain();
+         returningExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+
+                  if (DebugInfo.debug)
+                  {
+                     ByteUtil.debugFrame("InVM done<-", bytes);
+                  }
+
+                  returningConnection.inputBuffer(bytes);
+                  try
+                  {
+                     connection.outputDone(size);
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+                  }
+
+               }
+               finally
+               {
+                  // Balances the retain() above.
+                  bytes.release();
+               }
+            }
+         });
+      }
+
+      @Override
+      public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection)
+      {
+         return new MinimalSessionSPI();
+      }
+
+      @Override
+      public void setConnection(AMQPConnectionContext connection)
+      {
+
+      }
+
+      @Override
+      public AMQPConnectionContext getConnection()
+      {
+         return null;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
new file mode 100644
index 0000000..c577fb0
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.minimalclient;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.proton.plug.AMQPConnectionContext;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.ServerSASL;
+import org.proton.plug.sasl.AnonymousServerSASL;
+import org.proton.plug.sasl.ServerSASLPlain;
+import org.proton.plug.util.ByteUtil;
+import org.proton.plug.util.DebugInfo;
+import org.proton.plug.util.ReusableLatch;
+
+/**
+ * @author Clebert Suconic
+ */
+
+/**
+ * Client-side connection callback that writes outgoing frames to a Netty {@link Channel}.
+ */
+public class AMQPClientSPI implements AMQPConnectionCallback
+{
+
+   final Channel channel;
+
+   // Counts writes that have been handed to the channel but not yet completed.
+   final ReusableLatch latch = new ReusableLatch(0);
+
+   protected AMQPConnectionContext connection;
+
+   public AMQPClientSPI(Channel channel)
+   {
+      this.channel = channel;
+   }
+
+   @Override
+   public void setConnection(AMQPConnectionContext connection)
+   {
+      this.connection = connection;
+   }
+
+   @Override
+   public AMQPConnectionContext getConnection()
+   {
+      return connection;
+   }
+
+   @Override
+   public void close()
+   {
+      // nothing to release here
+   }
+
+   /**
+    * @return a new array holding the anonymous and the plain SASL mechanisms
+    */
+   @Override
+   public ServerSASL[] getSASLMechnisms()
+   {
+      return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
+   }
+
+   /**
+    * Writes and flushes the given bytes on the channel, optionally waits for the write to
+    * complete, then reports the written size back to the connection.
+    *
+    * @param bytes      outgoing frame data
+    * @param connection connection that produced the bytes
+    */
+   @Override
+   public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection)
+   {
+      if (DebugInfo.debug)
+      {
+         ByteUtil.debugFrame("Bytes leaving client", bytes);
+      }
+
+      // Read before the buffer is handed over to the channel.
+      final int writtenBytes = bytes.writerIndex();
+
+      // Must be counted up before the write is issued so the listener cannot run first.
+      latch.countUp();
+
+      final ChannelFuture pendingWrite = channel.writeAndFlush(bytes);
+      pendingWrite.addListener(new ChannelFutureListener()
+      {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception
+         {
+            // outputDone is reported by the calling thread below, not from this listener.
+            latch.countDown();
+         }
+      });
+
+      if (connection.isSyncOnFlush())
+      {
+         awaitPendingWrites();
+      }
+
+      connection.outputDone(writtenBytes);
+   }
+
+   /**
+    * Waits up to five seconds for outstanding writes; a timeout or failure is only reported
+    * on stderr.
+    */
+   private void awaitPendingWrites()
+   {
+      try
+      {
+         final boolean completed = latch.await(5, TimeUnit.SECONDS);
+         if (!completed)
+         {
+            // TODO logs
+            System.err.println("Flush took longer than 5 seconds!!!");
+         }
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+   /**
+    * @return always {@code null}
+    */
+   @Override
+   public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection)
+   {
+      return null;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java
new file mode 100644
index 0000000..e653b37
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.minimalclient;
+
+import org.proton.plug.AMQPClientConnectionContext;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public interface Connector
+{
+   /**
+    * Prepares whatever the implementation needs before {@link #connect(String, int)} is used.
+    */
+   void start();
+
+   /**
+    * Opens a client connection.
+    *
+    * @param host target host; implementations may ignore it
+    * @param port target port; implementations may ignore it
+    * @return the client connection context
+    * @throws Exception if the connection cannot be created
+    */
+   AMQPClientConnectionContext connect(String host, int port) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
new file mode 100644
index 0000000..0d69f3d
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.minimalclient;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.proton.plug.AMQPClientConnectionContext;
+import org.proton.plug.context.client.ProtonClientConnectionContextFactory;
+
+/**
+ * @author Clebert Suconic
+ */
+
+/**
+ * Minimal Netty based {@link Connector}: opens a TCP channel and feeds everything read from it
+ * into an AMQP client connection context.
+ */
+public class SimpleAMQPConnector implements Connector
+{
+   private Bootstrap bootstrap;
+
+   /**
+    * Creates the Netty bootstrap (NIO socket channel, event loop group of 10 threads).
+    * Must be called before {@link #connect(String, int)}.
+    */
+   @Override
+   public void start()
+   {
+
+      bootstrap = new Bootstrap();
+      bootstrap.channel(NioSocketChannel.class);
+      bootstrap.group(new NioEventLoopGroup(10));
+
+      bootstrap.handler(
+         new ChannelInitializer<Channel>()
+         {
+            public void initChannel(Channel channel) throws Exception
+            {
+               // The protocol handler is added in connect(), once the connection context exists.
+            }
+         }
+      );
+   }
+
+   /**
+    * Connects to the given endpoint and returns a client connection context bound to the
+    * resulting channel.
+    *
+    * @param host remote host
+    * @param port remote port
+    * @return the client connection context
+    * @throws IllegalStateException if {@link #start()} has not been called
+    * @throws Exception             if the TCP connection cannot be established
+    */
+   @Override
+   public AMQPClientConnectionContext connect(String host, int port) throws Exception
+   {
+      if (bootstrap == null)
+      {
+         throw new IllegalStateException("start() must be called before connect()");
+      }
+
+      SocketAddress remoteDestination = new InetSocketAddress(host, port);
+
+      // syncUninterruptibly() waits like awaitUninterruptibly() but rethrows the failure cause,
+      // so a refused or failed connect surfaces here instead of yielding a dead channel.
+      ChannelFuture future = bootstrap.connect(remoteDestination).syncUninterruptibly();
+
+      AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel());
+
+      final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI);
+
+      future.channel().pipeline().addLast(
+         new ChannelDuplexHandler()
+         {
+
+            @Override
+            public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+            {
+               ByteBuf buffer = (ByteBuf) msg;
+               try
+               {
+                  connection.inputBuffer(buffer);
+               }
+               finally
+               {
+                  // The message is consumed here and not passed further down the pipeline,
+                  // so this handler is the one that has to release it.
+                  // NOTE(review): assumes inputBuffer() is done with the buffer when it returns.
+                  buffer.release();
+               }
+            }
+         }
+      );
+
+
+      return connection;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
new file mode 100644
index 0000000..d5c2d4f
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.minimalserver;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+
+
+/**
+ * @author Clebert Suconic
+ */
+
+/**
+ * Process-wide registry of named in-memory queues.
+ */
+public class DumbServer
+{
+   static ConcurrentHashMap<String, BlockingDeque<Object>> maps = new ConcurrentHashMap<>();
+
+   /**
+    * Returns the queue registered under {@code name}, creating and registering it on first use.
+    * If two callers race to create the same queue, both receive the instance that was stored.
+    *
+    * @param name queue name
+    * @return the queue for that name, never {@code null}
+    */
+   public static BlockingDeque getQueue(String name)
+   {
+      BlockingDeque<Object> known = maps.get(name);
+      if (known != null)
+      {
+         return known;
+      }
+
+      BlockingDeque<Object> fresh = new LinkedBlockingDeque<>();
+      BlockingDeque<Object> winner = maps.putIfAbsent(name, fresh);
+      return winner == null ? fresh : winner;
+   }
+
+   /**
+    * Empties every registered queue and then forgets all of them.
+    */
+   public static void clear()
+   {
+      for (BlockingDeque<Object> registered : maps.values())
+      {
+         // Emptied first in case some component still holds a reference to the queue
+         registered.clear();
+      }
+      maps.clear();
+   }
+
+   /**
+    * Appends {@code message} to the queue named {@code queue}, creating the queue if needed.
+    */
+   public static void put(String queue, Object message)
+   {
+      getQueue(queue).add(message);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
new file mode 100644
index 0000000..9a6563d
--- /dev/null
+++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.proton.plug.test.minimalserver;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.proton.plug.AMQPConnectionContext;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.ServerSASL;
+import org.proton.plug.sasl.AnonymousServerSASL;
+import org.proton.plug.sasl.ServerSASLPlain;
+import org.proton.plug.util.ByteUtil;
+import org.proton.plug.util.DebugInfo;
+import org.proton.plug.util.ReusableLatch;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class MinimalConnectionSPI implements AMQPConnectionCallback
+{
+   Channel channel;
+
+   private AMQPConnectionContext connection;
+
+   public MinimalConnectionSPI(Channel channel)
+   {
+      this.channel = channel;
+   }
+
+   ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+   @Override
+   public void close()
+   {
+      executorService.shutdown();
+   }
+
+   public void setConnection(AMQPConnectionContext connection)
+   {
+      this.connection = connection;
+   }
+
+   public AMQPConnectionContext getConnection()
+   {
+      return connection;
+   }
+
+   final ReusableLatch latch = new ReusableLatch(0);
+
+   @Override
+   public ServerSASL[] getSASLMechnisms()
+   {
+      return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()};
+   }
+
+   @Override
+   public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection)
+   {
+      final int bufferSize = bytes.writerIndex();
+
+      if (DebugInfo.debug)
+      {
+         // some debug
+         byte[] frame = new byte[bytes.writerIndex()];
+         int readerOriginalPos = bytes.readerIndex();
+
+         bytes.getBytes(0, frame);
+
+         try
+         {
+            System.err.println("Buffer Outgoing: " + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 4, 16));
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+
+         bytes.readerIndex(readerOriginalPos);
+      }
+
+
+      latch.countUp();
+      // ^^ debug
+
+      channel.writeAndFlush(bytes).addListener(new ChannelFutureListener()
+      {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception
+         {
+            latch.countDown();
+
+            //   https://issues.apache.org/jira/browse/PROTON-645
+//            connection.outputDone(bufferSize);
+//            if (connection.capacity() > 0)
+//            {
+//               channel.read();
+//            }
+         }
+      });
+
+      channel.flush();
+
+      if (connection.isSyncOnFlush())
+      {
+         try
+         {
+            if (!latch.await(5, TimeUnit.SECONDS))
+            {
+               // TODO logs
+               System.err.println("Flush took longer than 5 seconds!!!");
+            }
+         }
+         catch (Throwable e)
+         {
+            e.printStackTrace();
+         }
+      }
+      connection.outputDone(bufferSize);
+
+
+//      if (connection.capacity() > 0)
+//      {
+//         channel.read();
+//      }
+   }
+
+   @Override
+   public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection)
+   {
+      return new MinimalSessionSPI();
+   }
+}


Mime
View raw message