Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 201121754B for ; Tue, 11 Nov 2014 11:00:35 +0000 (UTC) Received: (qmail 67422 invoked by uid 500); 11 Nov 2014 11:00:34 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 67338 invoked by uid 500); 11 Nov 2014 11:00:34 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 66176 invoked by uid 99); 11 Nov 2014 11:00:33 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 11:00:33 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 175E39ABD01; Tue, 11 Nov 2014 11:00:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Tue, 11 Nov 2014 11:00:59 -0000 Message-Id: In-Reply-To: <8993358cfa3a48d182aa897a718a8a96@git.apache.org> References: <8993358cfa3a48d182aa897a718a8a96@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java new file mode 100644 index 0000000..781a11c --- 
/dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ClientSASLPlain.java @@ -0,0 +1,62 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.sasl; + +import org.proton.plug.ClientSASL; + +/** + * This is a simple implementation provided with just user/password + * TODO: this interface will probably change as we are challenged with more SASL cases where there is a communication between client and server to determine the authentication + * + * @author Clebert Suconic + */ +public class ClientSASLPlain implements ClientSASL +{ + private String username; + private String password; + + public ClientSASLPlain(String user, String password) + { + this.username = user; + this.password = password; + } + + public String getName() + { + return "PLAIN"; + } + + public byte[] getBytes() + { + + if (username == null) + { + username = ""; + } + + if (password == null) + { + password = ""; + } + + byte[] usernameBytes = username.getBytes(); + byte[] passwordBytes = password.getBytes(); + byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2]; + System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length); + System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length); + return data; + } + + +} 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java new file mode 100644 index 0000000..85e9aac --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/PlainSASLResult.java @@ -0,0 +1,51 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.sasl; + +import org.proton.plug.SASLResult; + +/** + * @author Clebert Suconic + */ + +public class PlainSASLResult implements SASLResult +{ + private boolean success; + private String user; + private String password; + + public PlainSASLResult(boolean success, String user, String password) + { + this.success = success; + this.user = user; + this.password = password; + } + + @Override + public String getUser() + { + return user; + } + + public String getPassword() + { + return password; + } + + @Override + public boolean isSuccess() + { + return success; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java new file mode 100644 index 0000000..490c89c --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/ServerSASLPlain.java @@ -0,0 +1,75 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.sasl; + +import org.proton.plug.SASLResult; +import org.proton.plug.ServerSASL; + +/** + * @author Clebert Suconic + */ + +public class ServerSASLPlain implements ServerSASL +{ + public static final String NAME = "PLAIN"; + + @Override + public String getName() + { + return NAME; + } + + @Override + public SASLResult processSASL(byte[] data) + { + + String username = null; + String password = null; + String bytes = new String(data); + String[] credentials = bytes.split(Character.toString((char) 0)); + int offSet = 0; + if (credentials.length > 0) + { + if (credentials[0].length() == 0) + { + offSet = 1; + } + + if (credentials.length >= offSet) + { + username = credentials[offSet]; + } + if (credentials.length >= (offSet + 1)) + { + password = credentials[offSet + 1]; + } + } + + boolean success = authenticate(username, password); + + return new PlainSASLResult(success, username, password); + } + + + /** + * Hook for subclasses to perform the authentication here + * + * @param user + * @param password + */ + protected boolean authenticate(String user, String password) + { + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java new file mode 100644 index 0000000..7d12f31 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ByteUtil.java @@ -0,0 +1,152 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; + +/** + * @author Clebert Suconic + */ + +public class ByteUtil +{ + public static void debugFrame(String message, ByteBuf byteIn) + { + int location = byteIn.readerIndex(); + // debugging + byte[] frame = new byte[byteIn.writerIndex()]; + byteIn.readBytes(frame); + + try + { + System.out.println(message + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 8, 16)); + } + catch (Exception e) + { + e.printStackTrace(); + } + + byteIn.readerIndex(location); + } + + + public static String formatGroup(String str, int groupSize, int lineBreak) + { + StringBuffer buffer = new StringBuffer(); + + int line = 1; + buffer.append("/* 1 */ \""); + for (int i = 0; i < str.length(); i += groupSize) + { + buffer.append(str.substring(i, i + Math.min(str.length() - i, groupSize))); + + if ((i + groupSize) % lineBreak == 0) + { + buffer.append("\" +\n/* "); + line++; + if (line < 10) + { + buffer.append(" "); + } + buffer.append(Integer.toString(line) + " */ \""); + } + else if ((i + groupSize) % groupSize == 0 && str.length() - i > groupSize) + { + buffer.append("\" + \""); + } + } + + buffer.append("\";"); + + return buffer.toString(); + + } + + protected static final char[] hexArray = "0123456789ABCDEF".toCharArray(); + + public static String bytesToHex(byte[] bytes) + { + char[] hexChars = new char[bytes.length * 2]; + for (int j = 0; j < bytes.length; j++) + { + int v = bytes[j] & 0xFF; + hexChars[j * 2] = hexArray[v >>> 4]; + hexChars[j * 2 + 1] 
= hexArray[v & 0x0F]; + } + return new String(hexChars); + } + + public static byte[] hexStringToByteArray(String s) + { + int len = s.length(); + byte[] data = new byte[len / 2]; + for (int i = 0; i < len; i += 2) + { + data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + + Character.digit(s.charAt(i + 1), 16)); + } + return data; + } + + public static byte[] longToBytes(long x) + { + ByteBuf buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8); + buffer.writeLong(x); + return buffer.array(); + } + + public static String maxString(String value, int size) + { + if (value.length() < size) + { + return value; + } + else + { + return value.substring(0, size / 2) + " ... " + value.substring(value.length() - size / 2); + } + } + + public static String bytesToHex(byte[] bytes, int groupSize) + { + char[] hexChars = new char[bytes.length * 2 + numberOfGroups(bytes, groupSize)]; + int outPos = 0; + for (int j = 0; j < bytes.length; j++) + { + if (j > 0 && j % groupSize == 0) + { + hexChars[outPos++] = ' '; + } + int v = bytes[j] & 0xFF; + hexChars[outPos++] = hexArray[v >>> 4]; + hexChars[outPos++] = hexArray[v & 0x0F]; + } + return new String(hexChars); + } + + private static int numberOfGroups(byte[] bytes, int groupSize) + { + int groups = bytes.length / groupSize; + + if (bytes.length % groupSize == 0) + { + groups--; + } + + return groups; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java new file mode 100644 index 0000000..bb10c5a --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CodecCache.java @@ -0,0 +1,56 @@ +/* + * Copyright 
2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.util; + +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; + +/** + * @author Clebert Suconic + */ + +public class CodecCache +{ + + private static class EncoderDecoderPair + { + DecoderImpl decoder = new DecoderImpl(); + EncoderImpl encoder = new EncoderImpl(decoder); + + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); + } + } + + private static final ThreadLocal tlsCodec = new ThreadLocal() + { + @Override + protected EncoderDecoderPair initialValue() + { + return new EncoderDecoderPair(); + } + }; + + public static DecoderImpl getDecoder() + { + return tlsCodec.get().decoder; + } + + public static EncoderImpl getEncoder() + { + return tlsCodec.get().encoder; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java new file mode 100644 index 0000000..36deee2 --- /dev/null +++ 
b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/CreditsSemaphore.java @@ -0,0 +1,134 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.util; + +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + * @author Clebert Suconic + */ + +public class CreditsSemaphore +{ + + @SuppressWarnings("serial") + private static class Sync extends AbstractQueuedSynchronizer + { + public Sync(int initial) + { + setState(initial); + } + + public int getCredits() + { + return getState(); + } + + @Override + public int tryAcquireShared(final int numberOfAqcquires) + { + for (;;) + { + int actualSize = getState(); + int newValue = actualSize - numberOfAqcquires; + + if (newValue < 0) + { + if (actualSize == getState()) + { + return -1; + } + } + else if (compareAndSetState(actualSize, newValue)) + { + return newValue; + } + } + } + + @Override + public boolean tryReleaseShared(final int numberOfReleases) + { + for (;;) + { + int actualSize = getState(); + int newValue = actualSize + numberOfReleases; + + if (compareAndSetState(actualSize, newValue)) + { + return true; + } + + } + } + + public void setCredits(final int credits) + { + for (;;) + { + int actualState = getState(); + if (compareAndSetState(actualState, credits)) + { + // This is to wake up any pending threads that could be waiting on queued + releaseShared(0); + return; + } + } + } + } + 
+ private final Sync sync; + + + public CreditsSemaphore(int initialCredits) + { + sync = new Sync(initialCredits); + } + + public void acquire() throws InterruptedException + { + sync.acquireSharedInterruptibly(1); + } + + public boolean tryAcquire() + { + return sync.tryAcquireShared(1) >= 0; + } + + public void release() throws InterruptedException + { + sync.releaseShared(1); + } + + public void release(int credits) throws InterruptedException + { + sync.releaseShared(credits); + } + + public void setCredits(int credits) + { + sync.setCredits(credits); + } + + public int getCredits() + { + return sync.getCredits(); + } + + public boolean hasQueuedThreads() + { + return sync.hasQueuedThreads(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java new file mode 100644 index 0000000..6481413 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DebugInfo.java @@ -0,0 +1,23 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.util; + +/** + * @author Clebert Suconic + */ + +public class DebugInfo +{ + public static final boolean debug = false; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java new file mode 100644 index 0000000..7430c44 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/DeliveryUtil.java @@ -0,0 +1,49 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.util; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.impl.MessageImpl; + +/** + * @author Clebert Suconic + */ + +public class DeliveryUtil +{ + + public static int readDelivery(Receiver receiver, ByteBuf buffer) + { + int initial = buffer.writerIndex(); + // optimization by norman + int count; + while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) + { + // Increment the writer index by the number of bytes written into it while calling recv. + buffer.writerIndex(buffer.writerIndex() + count); + } + return buffer.writerIndex() - initial; + } + + + public static MessageImpl decodeMessageImpl(ByteBuf buffer) + { + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); + return message; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java new file mode 100644 index 0000000..3a36f7d --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/FutureRunnable.java @@ -0,0 +1,70 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.util; + +import java.util.concurrent.TimeUnit; + +/** + * @author Clebert Suconic + */ + +public class FutureRunnable implements Runnable +{ + private final ReusableLatch latch; + + public FutureRunnable(final int initialIterations) + { + latch = new ReusableLatch(initialIterations); + } + + public FutureRunnable() + { + this(0); + } + + public void run() + { + latch.countDown(); + } + + public void countUp() + { + latch.countUp(); + } + + public void countDown() + { + latch.countDown(); + } + + public int getCount() + { + return latch.getCount(); + } + + public void await() throws InterruptedException + { + latch.await(); + } + + public boolean await(long timeWait, TimeUnit timeUnit) throws InterruptedException + { + return latch.await(timeWait, timeUnit); + } + + public boolean await(long milliseconds) throws InterruptedException + { + return latch.await(milliseconds); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java new file mode 100644 index 0000000..464bc2c --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/NettyWritable.java @@ -0,0 +1,120 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.util; + +/** + * @author Clebert Suconic + */ + + +import java.nio.ByteBuffer; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.codec.WritableBuffer; + +/** + * This is to use NettyBuffer within Proton + * + * @author Clebert Suconic + */ + +public class NettyWritable implements WritableBuffer +{ + + final ByteBuf nettyBuffer; + + public NettyWritable(ByteBuf nettyBuffer) + { + this.nettyBuffer = nettyBuffer; + } + + + @Override + public void put(byte b) + { + nettyBuffer.writeByte(b); + } + + @Override + public void putFloat(float f) + { + nettyBuffer.writeFloat(f); + } + + @Override + public void putDouble(double d) + { + nettyBuffer.writeDouble(d); + } + + @Override + public void put(byte[] src, int offset, int length) + { + nettyBuffer.writeBytes(src, offset, length); + } + + @Override + public void putShort(short s) + { + nettyBuffer.writeShort(s); + } + + @Override + public void putInt(int i) + { + nettyBuffer.writeInt(i); + } + + @Override + public void putLong(long l) + { + nettyBuffer.writeLong(l); + } + + @Override + public boolean hasRemaining() + { + return nettyBuffer.writerIndex() < nettyBuffer.capacity(); + } + + @Override + public int remaining() + { + return nettyBuffer.capacity() - nettyBuffer.writerIndex(); + } + + @Override + public int position() + { + return nettyBuffer.writerIndex(); + } + + @Override + public void 
position(int position) + { + nettyBuffer.writerIndex(position); + } + + @Override + public void put(ByteBuffer payload) + { + nettyBuffer.writeBytes(payload); + } + + @Override + public int limit() + { + return nettyBuffer.capacity(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java new file mode 100644 index 0000000..ccebe3f --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java @@ -0,0 +1,596 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.util; + +import java.nio.ByteBuffer; + +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.message.MessageError; +import org.apache.qpid.proton.message.MessageFormat; +import org.apache.qpid.proton.message.ProtonJMessage; + +/** + * This is a serverMessage that won't deal with the body + * + * @author Clebert Suconic + */ +public class ProtonServerMessage implements ProtonJMessage +{ + private Header header; + private DeliveryAnnotations deliveryAnnotations; + private MessageAnnotations messageAnnotations; + private Properties properties; + private ApplicationProperties applicationProperties; + + // This should include a raw body of both footer and body + private byte[] rawBody; + + private Section parsedBody; + private Footer parsedFooter; + + private final int EOF = 0; + + // TODO: Enumerations maybe? + private static final int HEADER_TYPE = 0x070; + private static final int DELIVERY_ANNOTATIONS = 0x071; + private static final int MESSAGE_ANNOTATIONS = 0x072; + private static final int PROPERTIES = 0x073; + private static final int APPLICATION_PROPERTIES = 0x074; + + + /** + * This will decode a ByteBuffer that represents the entire message. + * Set the limits around the parameter. 
+ * + * @param buffer a limited buffer for the message + */ + public void decode(ByteBuffer buffer) + { + + DecoderImpl decoder = CodecCache.getDecoder(); + + + header = null; + deliveryAnnotations = null; + messageAnnotations = null; + properties = null; + applicationProperties = null; + rawBody = null; + + decoder.setByteBuffer(buffer); + try + { + int type = readType(buffer, decoder); + if (type == HEADER_TYPE) + { + header = (Header) readSection(buffer, decoder); + type = readType(buffer, decoder); + + } + + if (type == DELIVERY_ANNOTATIONS) + { + deliveryAnnotations = (DeliveryAnnotations) readSection(buffer, decoder); + type = readType(buffer, decoder); + + } + + if (type == MESSAGE_ANNOTATIONS) + { + messageAnnotations = (MessageAnnotations) readSection(buffer, decoder); + type = readType(buffer, decoder); + } + + if (type == PROPERTIES) + { + properties = (Properties) readSection(buffer, decoder); + type = readType(buffer, decoder); + + } + + if (type == APPLICATION_PROPERTIES) + { + applicationProperties = (ApplicationProperties) readSection(buffer, decoder); + type = readType(buffer, decoder); + } + + if (type != EOF) + { + rawBody = new byte[buffer.limit() - buffer.position()]; + buffer.get(rawBody); + } + } + finally + { + decoder.setByteBuffer(null); + } + + } + + + public void encode(ByteBuffer buffer) + { + WritableBuffer writableBuffer = new WritableBuffer.ByteBufferWrapper(buffer); + encode(writableBuffer); + } + + public int encode(WritableBuffer writableBuffer) + { + final int firstPosition = writableBuffer.position(); + + EncoderImpl encoder = CodecCache.getEncoder(); + encoder.setByteBuffer(writableBuffer); + + try + { + if (header != null) + { + encoder.writeObject(header); + } + if (deliveryAnnotations != null) + { + encoder.writeObject(deliveryAnnotations); + } + if (messageAnnotations != null) + { + encoder.writeObject(messageAnnotations); + } + if (properties != null) + { + encoder.writeObject(properties); + } + if (applicationProperties 
!= null) + { + encoder.writeObject(applicationProperties); + } + + // It should write either the parsed one or the rawBody + if (parsedBody != null) + { + encoder.writeObject(parsedBody); + if (parsedFooter != null) + { + encoder.writeObject(parsedFooter); + } + } + else if (rawBody != null) + { + writableBuffer.put(rawBody, 0, rawBody.length); + } + + return writableBuffer.position() - firstPosition; + } + finally + { + encoder.setByteBuffer((WritableBuffer) null); + } + } + + + private int readType(ByteBuffer buffer, DecoderImpl decoder) + { + + int pos = buffer.position(); + + if (!buffer.hasRemaining()) + { + return EOF; + } + try + { + if (buffer.get() != 0) + { + return EOF; + } + else + { + return ((Number) decoder.readObject()).intValue(); + } + } + finally + { + buffer.position(pos); + } + } + + + private Section readSection(ByteBuffer buffer, DecoderImpl decoder) + { + if (buffer.hasRemaining()) + { + return (Section) decoder.readObject(); + } + else + { + return null; + } + } + + + // At the moment we only need encode implemented!!! 
+ @Override + public boolean isDurable() + { + return false; + } + + @Override + public long getDeliveryCount() + { + return 0; + } + + @Override + public short getPriority() + { + return 0; + } + + @Override + public boolean isFirstAcquirer() + { + return false; + } + + @Override + public long getTtl() + { + return 0; + } + + @Override + public void setDurable(boolean durable) + { + + } + + @Override + public void setTtl(long ttl) + { + + } + + @Override + public void setDeliveryCount(long deliveryCount) + { + + } + + @Override + public void setFirstAcquirer(boolean firstAcquirer) + { + + } + + @Override + public void setPriority(short priority) + { + + } + + @Override + public Object getMessageId() + { + return null; + } + + @Override + public long getGroupSequence() + { + return 0; + } + + @Override + public String getReplyToGroupId() + { + return null; + } + + @Override + public long getCreationTime() + { + return 0; + } + + @Override + public String getAddress() + { + return null; + } + + @Override + public byte[] getUserId() + { + return new byte[0]; + } + + @Override + public String getReplyTo() + { + return null; + } + + @Override + public String getGroupId() + { + return null; + } + + @Override + public String getContentType() + { + return null; + } + + @Override + public long getExpiryTime() + { + return 0; + } + + @Override + public Object getCorrelationId() + { + return null; + } + + @Override + public String getContentEncoding() + { + return null; + } + + @Override + public String getSubject() + { + return null; + } + + @Override + public void setGroupSequence(long groupSequence) + { + + } + + @Override + public void setUserId(byte[] userId) + { + + } + + @Override + public void setCreationTime(long creationTime) + { + + } + + @Override + public void setSubject(String subject) + { + + } + + @Override + public void setGroupId(String groupId) + { + + } + + @Override + public void setAddress(String to) + { + + } + + @Override + public void 
setExpiryTime(long absoluteExpiryTime) + { + + } + + @Override + public void setReplyToGroupId(String replyToGroupId) + { + + } + + @Override + public void setContentEncoding(String contentEncoding) + { + + } + + @Override + public void setContentType(String contentType) + { + + } + + @Override + public void setReplyTo(String replyTo) + { + + } + + @Override + public void setCorrelationId(Object correlationId) + { + + } + + @Override + public void setMessageId(Object messageId) + { + + } + + @Override + public Header getHeader() + { + return null; + } + + @Override + public DeliveryAnnotations getDeliveryAnnotations() + { + return null; + } + + @Override + public MessageAnnotations getMessageAnnotations() + { + return null; + } + + @Override + public Properties getProperties() + { + return null; + } + + @Override + public ApplicationProperties getApplicationProperties() + { + return null; + } + + @Override + public Section getBody() + { + return null; + } + + @Override + public Footer getFooter() + { + return null; + } + + @Override + public void setHeader(Header header) + { + + } + + @Override + public void setDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations) + { + + } + + @Override + public void setMessageAnnotations(MessageAnnotations messageAnnotations) + { + + } + + @Override + public void setProperties(Properties properties) + { + + } + + @Override + public void setApplicationProperties(ApplicationProperties applicationProperties) + { + + } + + @Override + public void setBody(Section body) + { + + } + + @Override + public void setFooter(Footer footer) + { + + } + + @Override + public int decode(byte[] data, int offset, int length) + { + return 0; + } + + @Override + public int encode(byte[] data, int offset, int length) + { + return 0; + } + + @Override + public void load(Object data) + { + + } + + @Override + public Object save() + { + return null; + } + + @Override + public String toAMQPFormat(Object value) + { + return null; + } + + @Override + 
public Object parseAMQPFormat(String value) + { + return null; + } + + @Override + public void setMessageFormat(MessageFormat format) + { + + } + + @Override + public MessageFormat getMessageFormat() + { + return null; + } + + @Override + public void clear() + { + + } + + @Override + public MessageError getError() + { + return null; + } + + @Override + public int encode2(byte[] data, int offset, int length) + { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java new file mode 100644 index 0000000..4f1f01e --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/util/ReusableLatch.java @@ -0,0 +1,156 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.proton.plug.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +/** + *

This class uses the framework provided by AbstractQueuedSynchronizer.

+ *

AbstractQueuedSynchronizer is the framework for any sort of concurrent synchronization, such as Semaphores, events, etc, based on AtomicIntegers.

+ *

+ *

This class works just like CountDownLatch, with the difference that you can also increase the counter.

+ *

+ *

It can be used for sync points where one process feeds the latch while another waits until everything is done (e.g. waiting for IO completions to finish).

+ *

+ *

In HornetQ we need to increment and decrement a counter until the user fires a ready handler (commit). At that point we just act as a regular countDown.

+ *

+ *

Note: This latch is reusable. Once it reaches zero, you can call countUp again and reuse it on further waits.

+ *

+ *

For example: prepareTransaction will wait for the current completions, and further adds will be called on the latch. Later on when commit is called you can reuse the same latch.

+ * + * @author Clebert Suconic + */ +public class ReusableLatch +{ + /** + * Look at the doc and examples provided by AbstractQueuedSynchronizer for more information + * + * @see AbstractQueuedSynchronizer + */ + @SuppressWarnings("serial") + private static class CountSync extends AbstractQueuedSynchronizer + { + public CountSync(int count) + { + setState(count); + } + + public int getCount() + { + return getState(); + } + + public void setCount(final int count) + { + setState(count); + } + + @Override + public int tryAcquireShared(final int numberOfAqcquires) + { + return getState() == 0 ? 1 : -1; + } + + public void add() + { + for (;;) + { + int actualState = getState(); + int newState = actualState + 1; + if (compareAndSetState(actualState, newState)) + { + return; + } + } + } + + @Override + public boolean tryReleaseShared(final int numberOfReleases) + { + for (;;) + { + int actualState = getState(); + if (actualState == 0) + { + return true; + } + + int newState = actualState - numberOfReleases; + + if (newState < 0) + { + newState = 0; + } + + if (compareAndSetState(actualState, newState)) + { + return newState == 0; + } + } + } + } + + private final CountSync control; + + public ReusableLatch() + { + this(0); + } + + public ReusableLatch(final int count) + { + control = new CountSync(count); + } + + public int getCount() + { + return control.getCount(); + } + + public void setCount(final int count) + { + control.setCount(count); + } + + public void countUp() + { + control.add(); + } + + public void countDown() + { + control.releaseShared(1); + } + + + public void countDown(final int count) + { + control.releaseShared(count); + } + + public void await() throws InterruptedException + { + control.acquireSharedInterruptibly(1); + } + + public boolean await(final long milliseconds) throws InterruptedException + { + return control.tryAcquireSharedNanos(1, TimeUnit.MILLISECONDS.toNanos(milliseconds)); + } + + public boolean await(final long timeWait, TimeUnit 
timeUnit) throws InterruptedException + { + return control.tryAcquireSharedNanos(1, timeUnit.toNanos(timeWait)); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java new file mode 100644 index 0000000..23c158f --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/AbstractJMSTest.java @@ -0,0 +1,135 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Queue; + +import java.lang.ref.WeakReference; + +//import io.hawtjms.jms.JmsConnectionFactory; +//import io.hawtjms.jms.JmsQueue; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.proton.plug.test.minimalserver.DumbServer; +import org.proton.plug.test.minimalserver.MinimalServer; + +/** + * @author Clebert Suconic + */ + +public class AbstractJMSTest +{ + protected final boolean useHawtJMS; + protected final boolean useSASL; + + protected String address = "exampleQueue"; + protected MinimalServer server = new MinimalServer(); + + public AbstractJMSTest(boolean useHawtJMS, boolean useSASL) + { + this.useHawtJMS = useHawtJMS; + this.useSASL = useSASL; + } + + public void tearDown() throws Exception + { + server.stop(); + DumbServer.clear(); + } + + public static void forceGC() + { + System.out.println("#test forceGC"); + WeakReference dumbReference = new WeakReference(new Object()); + // A loop that will wait GC, using the minimalserver time as possible + while (dumbReference.get() != null) + { + System.gc(); + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + } + } + System.out.println("#test forceGC Done"); + } + + + protected Connection createConnection() throws JMSException + { + final ConnectionFactory factory = createConnectionFactory(); + final Connection connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() + { + @Override + public void onException(JMSException exception) + { + exception.printStackTrace(); + } + }); + connection.start(); + return connection; + } + + + protected ConnectionFactory createConnectionFactory() + { + if (useSASL) + { + if (useHawtJMS) + { +// return new JmsConnectionFactory("aaaaaaaa", "aaaaaaa", 
"amqp://localhost:" + Constants.PORT); + return null; + } + else + { + return new ConnectionFactoryImpl("localhost", Constants.PORT, "aaaaaaaa", "aaaaaaa"); + } + } + else + { + if (useHawtJMS) + { +// return new JmsConnectionFactory("amqp://localhost:" + Constants.PORT); + return null; + } + else + { + return new ConnectionFactoryImpl("localhost", Constants.PORT, null, null); + } + + } + } + + protected Queue createQueue() + { + if (useHawtJMS) + { +// return new JmsQueue(address); + return null; + } + else + { + return new QueueImpl(address); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java new file mode 100644 index 0000000..27ffea4 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/Constants.java @@ -0,0 +1,23 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test; + +/** + * @author Clebert Suconic + */ + +public class Constants +{ + public static final int PORT = 5672; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java new file mode 100644 index 0000000..cd2638f --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/ProtonTest.java @@ -0,0 +1,372 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.proton.plug.test; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.proton.plug.AMQPClientConnectionContext; +import org.proton.plug.AMQPClientSenderContext; +import org.proton.plug.AMQPClientSessionContext; +import org.proton.plug.sasl.ClientSASLPlain; +import org.proton.plug.test.minimalclient.SimpleAMQPConnector; +import org.proton.plug.test.minimalserver.DumbServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.proton.plug.util.ByteUtil; + +/** + * This is simulating a JMS client against a simple server + * @author Andy Taylor + * @author Clebert Suconic + */ +@RunWith(Parameterized.class) +public class ProtonTest extends AbstractJMSTest +{ + + protected Connection connection; + + @Parameterized.Parameters(name = "useHawt={0} sasl={1}") + public static Collection data() + { + List list = Arrays.asList(new Object[][]{ + {Boolean.FALSE, Boolean.TRUE}, + {Boolean.FALSE, Boolean.FALSE}}); + + System.out.println("Size = " + list.size()); + return list; + } + + public ProtonTest(boolean useHawtJMS, boolean useSASL) + { + super(useHawtJMS, useSASL); + } + + + @Before + public void setUp() throws Exception + { + DumbServer.clear(); + 
AbstractJMSTest.forceGC(); + server.start("127.0.0.1", Constants.PORT, true); + connection = createConnection(); + + } + + @After + public void tearDown() throws Exception + { + if (connection != null) + { + connection.close(); + } + + super.tearDown(); + } + + @Test + public void testMessagesReceivedInParallel() throws Throwable + { + final int numMessages = getNumberOfMessages(); + long time = System.currentTimeMillis(); + final Queue queue = createQueue(); + + final ArrayList exceptions = new ArrayList<>(); + + Thread t = new Thread(new Runnable() + { + @Override + public void run() + { + Connection connectionConsumer = null; + try + { + connectionConsumer = createConnection(); +// connectionConsumer = connection; + connectionConsumer.start(); + Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + + int count = numMessages; + while (count > 0) + { + try + { + BytesMessage m = (BytesMessage) consumer.receive(1000); + if (count % 1000 == 0) + { + System.out.println("Count = " + count + ", property=" + m.getStringProperty("XX")); + } + Assert.assertNotNull("Could not receive message count=" + count + " on consumer", m); + count--; + } + catch (JMSException e) + { + break; + } + } + } + catch (Throwable e) + { + exceptions.add(e); + e.printStackTrace(); + } + finally + { + try + { + // if the createconnecion wasn't commented out + if (connectionConsumer != connection) + { + connectionConsumer.close(); + } + } + catch (Throwable ignored) + { + // NO OP + } + } + } + }); + + Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + + t.start(); + + MessageProducer p = session.createProducer(queue); + p.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < numMessages; i++) + { + BytesMessage message = session.createBytesMessage(); + // TODO: this will break stuff if I use a large number + message.writeBytes(new byte[5]); 
+ message.setIntProperty("count", i); + message.setStringProperty("XX", "count" + i); + p.send(message); + } + + long taken = (System.currentTimeMillis() - time); + System.out.println("taken on send = " + taken + " usehawt = " + useHawtJMS + " sasl = " + useSASL); + t.join(); + + for (Throwable e : exceptions) + { + throw e; + } + taken = (System.currentTimeMillis() - time); + System.out.println("taken = " + taken + " usehawt = " + useHawtJMS + " sasl = " + useSASL); + + connection.close(); +// assertEquals(0, q.getMessageCount()); + } + + + @Test + public void testSimpleCreateSessionAndClose() throws Throwable + { + final QueueImpl queue = new QueueImpl(address); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Thread.sleep(1000); + session.close(); + connection.close(); + } + + @Test + public void testSimpleBinary() throws Throwable + { + final int numMessages = 5; + long time = System.currentTimeMillis(); + final QueueImpl queue = new QueueImpl(address); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + byte[] bytes = new byte[0xf + 1]; + for (int i = 0; i <= 0xf; i++) + { + bytes[i] = (byte) i; + } + + + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < numMessages; i++) + { + BytesMessage message = session.createBytesMessage(); + + message.writeBytes(bytes); + message.setIntProperty("count", i); + p.send(message); + } + + + session.close(); + + + Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = sessionConsumer.createConsumer(queue); + + for (int i = 0; i < numMessages; i++) + { + BytesMessage m = (BytesMessage) consumer.receive(5000); + + System.out.println("length " + m.getBodyLength()); + Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m); + + m.reset(); + + long size = m.getBodyLength(); + byte[] bytesReceived = new byte[(int) size]; + m.readBytes(bytesReceived); + + 
+ System.out.println("Received " + ByteUtil.bytesToHex(bytesReceived, 1)); + + Assert.assertArrayEquals(bytes, bytesReceived); + } + +// assertEquals(0, q.getMessageCount()); + long taken = (System.currentTimeMillis() - time) / 1000; + System.out.println("taken = " + taken); + } + + @Test + public void testMapMessage() throws Exception + { + Queue queue = createQueue(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = session.createProducer(queue); + for (int i = 0; i < 10; i++) + { + MapMessage message = session.createMapMessage(); + message.setInt("x", i); + message.setString("str", "str" + i); + p.send(message); + } + MessageConsumer messageConsumer = session.createConsumer(queue); + for (int i = 0; i < 10; i++) + { + MapMessage m = (MapMessage) messageConsumer.receive(5000); + Assert.assertNotNull(m); + Assert.assertEquals(i, m.getInt("x")); + Assert.assertEquals("str" + i, m.getString("str")); + } + + Assert.assertNull(messageConsumer.receiveNoWait()); + } + + @Test + public void testProperties() throws Exception + { + Queue queue = createQueue(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = session.createProducer(queue); + TextMessage message = session.createTextMessage(); + message.setText("msg:0"); + message.setBooleanProperty("true", true); + message.setBooleanProperty("false", false); + message.setStringProperty("foo", "bar"); + message.setDoubleProperty("double", 66.6); + message.setFloatProperty("float", 56.789f); + message.setIntProperty("int", 8); + message.setByteProperty("byte", (byte) 10); + p.send(message); + p.send(message); + connection.start(); + MessageConsumer messageConsumer = session.createConsumer(queue); + TextMessage m = (TextMessage) messageConsumer.receive(5000); + Assert.assertNotNull(m); + Assert.assertEquals("msg:0", m.getText()); + Assert.assertEquals(m.getBooleanProperty("true"), true); + 
Assert.assertEquals(m.getBooleanProperty("false"), false); + Assert.assertEquals(m.getStringProperty("foo"), "bar"); + Assert.assertEquals(m.getDoubleProperty("double"), 66.6, 0.0001); + Assert.assertEquals(m.getFloatProperty("float"), 56.789f, 0.0001); + Assert.assertEquals(m.getIntProperty("int"), 8); + Assert.assertEquals(m.getByteProperty("byte"), (byte) 10); + m = (TextMessage) messageConsumer.receive(5000); + Assert.assertNotNull(m); + connection.close(); + } + + // @Test + public void testSendWithSimpleClient() throws Exception + { + SimpleAMQPConnector connector = new SimpleAMQPConnector(); + connector.start(); + AMQPClientConnectionContext clientConnection = connector.connect("127.0.0.1", Constants.PORT); + + clientConnection.clientOpen(new ClientSASLPlain("aa", "aa")); + + AMQPClientSessionContext session = clientConnection.createClientSession(); + AMQPClientSenderContext clientSender = session.createSender(address, true); + + + Properties props = new Properties(); + for (int i = 0; i < 1; i++) + { + MessageImpl message = (MessageImpl) Message.Factory.create(); + + HashMap map = new HashMap(); + + map.put("i", i); + AmqpValue value = new AmqpValue(map); + message.setBody(value); + message.setProperties(props); + clientSender.send(message); + } + + Session clientSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + MessageConsumer consumer = clientSession.createConsumer(createQueue()); + for (int i = 0; i < 1; i++) + { + MapMessage msg = (MapMessage) consumer.receive(5000); + System.out.println("Msg " + msg); + Assert.assertNotNull(msg); + + System.out.println("Receive message " + i); + + Assert.assertEquals(0, msg.getInt("i")); + } + } + + + protected int getNumberOfMessages() + { + return 10000; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java 
---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java new file mode 100644 index 0000000..90ecee3 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/InVMTestConnector.java @@ -0,0 +1,38 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test.invm; + +import org.proton.plug.AMQPClientConnectionContext; +import org.proton.plug.context.client.ProtonClientConnectionContext; +import org.proton.plug.test.minimalclient.Connector; + +/** + * This is used for testing, where we bypass Netty or any networking for test conditions only + * + * @author Clebert Suconic + */ +public class InVMTestConnector implements Connector +{ + @Override + public void start() + { + + } + + @Override + public AMQPClientConnectionContext connect(String host, int port) throws Exception + { + return new ProtonClientConnectionContext(new ProtonINVMSPI()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java new file mode 100644 index 0000000..45c0703 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/invm/ProtonINVMSPI.java @@ -0,0 +1,209 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test.invm; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import io.netty.buffer.ByteBuf; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.ServerSASL; +import org.proton.plug.context.server.ProtonServerConnectionContext; +import org.proton.plug.sasl.AnonymousServerSASL; +import org.proton.plug.sasl.ServerSASLPlain; +import org.proton.plug.test.minimalserver.MinimalSessionSPI; +import org.proton.plug.util.ByteUtil; +import org.proton.plug.util.DebugInfo; + +/** + * @author Clebert Suconic + */ + +public class ProtonINVMSPI implements AMQPConnectionCallback +{ + + AMQPConnectionContext returningConnection; + + ProtonServerConnectionContext serverConnection = new ProtonServerConnectionContext(new ReturnSPI()); + + final ExecutorService mainExecutor = Executors.newSingleThreadExecutor(); + + final ExecutorService returningExecutor = Executors.newSingleThreadExecutor(); + + public ProtonINVMSPI() + { + mainExecutor.execute(new Runnable() + { + public void run() + { + Thread.currentThread().setName("MainExecutor-INVM"); + } + }); + returningExecutor.execute(new Runnable() + { + public void run() + { + Thread.currentThread().setName("ReturningExecutor-INVM"); + } + }); + } + + @Override + public void close() + { + mainExecutor.shutdown(); + } + + @Override + public ServerSASL[] getSASLMechnisms() + { + return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()}; + } + + + @Override + public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) + { + if (DebugInfo.debug) + { + ByteUtil.debugFrame("InVM->", bytes); + } + final int size = bytes.writerIndex(); + + bytes.retain(); + mainExecutor.execute(new Runnable() + { + public void run() + { + try + { + if (DebugInfo.debug) + { + ByteUtil.debugFrame("InVMDone->", bytes); + } + 
serverConnection.inputBuffer(bytes); + try + { + connection.outputDone(size); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + finally + { + bytes.release(); + } + } + }); + } + + @Override + public void setConnection(AMQPConnectionContext connection) + { + returningConnection = connection; + } + + @Override + public AMQPConnectionContext getConnection() + { + return returningConnection; + } + + @Override + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) + { + return null; + } + + class ReturnSPI implements AMQPConnectionCallback + { + @Override + public void close() + { + + } + + @Override + public ServerSASL[] getSASLMechnisms() + { + return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()}; + } + + + @Override + public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) + { + + final int size = bytes.writerIndex(); + if (DebugInfo.debug) + { + ByteUtil.debugFrame("InVM<-", bytes); + } + + + bytes.retain(); + returningExecutor.execute(new Runnable() + { + public void run() + { + try + { + + if (DebugInfo.debug) + { + ByteUtil.debugFrame("InVM done<-", bytes); + } + + returningConnection.inputBuffer(bytes); + try + { + connection.outputDone(size); + } + catch (Exception e) + { + e.printStackTrace(); + } + + } + finally + { + bytes.release(); + } + } + }); + } + + @Override + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) + { + return new MinimalSessionSPI(); + } + + @Override + public void setConnection(AMQPConnectionContext connection) + { + + } + + @Override + public AMQPConnectionContext getConnection() + { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java ---------------------------------------------------------------------- diff --git 
a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java new file mode 100644 index 0000000..c577fb0 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/AMQPClientSPI.java @@ -0,0 +1,121 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test.minimalclient; + +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.ServerSASL; +import org.proton.plug.sasl.AnonymousServerSASL; +import org.proton.plug.sasl.ServerSASLPlain; +import org.proton.plug.util.ByteUtil; +import org.proton.plug.util.DebugInfo; +import org.proton.plug.util.ReusableLatch; + +/** + * @author Clebert Suconic + */ + +public class AMQPClientSPI implements AMQPConnectionCallback +{ + + final Channel channel; + protected AMQPConnectionContext connection; + + public AMQPClientSPI(Channel channel) + { + this.channel = channel; + } + + public void setConnection(AMQPConnectionContext connection) + { + this.connection = connection; + } + + public AMQPConnectionContext getConnection() + { + return connection; + } + + @Override + public void close() + { + + } + + @Override + public ServerSASL[] getSASLMechnisms() + { + return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()}; + } + + + final ReusableLatch latch = new ReusableLatch(0); + + @Override + public void onTransport(final ByteBuf bytes, final AMQPConnectionContext connection) + { + if (DebugInfo.debug) + { + ByteUtil.debugFrame("Bytes leaving client", bytes); + } + + final int bufferSize = bytes.writerIndex(); + + + latch.countUp(); + + channel.writeAndFlush(bytes).addListener(new ChannelFutureListener() + { + @Override + public void operationComplete(ChannelFuture future) throws Exception + { + // +// connection.outputDone(bufferSize); + latch.countDown(); + } + }); + + if (connection.isSyncOnFlush()) + { + try + { + if (!latch.await(5, TimeUnit.SECONDS)) + { + // TODO logs + System.err.println("Flush took longer than 5 seconds!!!"); + } 
+ } + catch (Throwable e) + { + e.printStackTrace(); + } + } + + connection.outputDone(bufferSize); + + } + + @Override + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java new file mode 100644 index 0000000..e653b37 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/Connector.java @@ -0,0 +1,27 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test.minimalclient; + +import org.proton.plug.AMQPClientConnectionContext; + +/** + * @author Clebert Suconic + */ + +public interface Connector +{ + void start(); + + AMQPClientConnectionContext connect(String host, int port) throws Exception; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java new file mode 100644 index 0000000..0d69f3d --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalclient/SimpleAMQPConnector.java @@ -0,0 +1,84 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test.minimalclient; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.proton.plug.AMQPClientConnectionContext; +import org.proton.plug.context.client.ProtonClientConnectionContextFactory; + +/** + * @author Clebert Suconic + */ + +public class SimpleAMQPConnector implements Connector +{ + private Bootstrap bootstrap; + + public void start() + { + + bootstrap = new Bootstrap(); + bootstrap.channel(NioSocketChannel.class); + bootstrap.group(new NioEventLoopGroup(10)); + + bootstrap.handler( + new ChannelInitializer() + { + public void initChannel(Channel channel) throws Exception + { + } + } + ); + } + + public AMQPClientConnectionContext connect(String host, int port) throws Exception + { + SocketAddress remoteDestination = new InetSocketAddress(host, port); + + ChannelFuture future = bootstrap.connect(remoteDestination); + + future.awaitUninterruptibly(); + + AMQPClientSPI clientConnectionSPI = new AMQPClientSPI(future.channel()); + + final AMQPClientConnectionContext connection = (AMQPClientConnectionContext) ProtonClientConnectionContextFactory.getFactory().createConnection(clientConnectionSPI); + + future.channel().pipeline().addLast( + new ChannelDuplexHandler() + { + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception + { + ByteBuf buffer = (ByteBuf) msg; + connection.inputBuffer(buffer); + } + } + ); + + + return connection; + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java new file mode 100644 index 0000000..d5c2d4f --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java @@ -0,0 +1,59 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.proton.plug.test.minimalserver; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; + + +/** + * @author Clebert Suconic + */ + +public class DumbServer +{ + static ConcurrentHashMap> maps = new ConcurrentHashMap<>(); + + public static BlockingDeque getQueue(String name) + { + BlockingDeque q = maps.get(name); + if (q == null) + { + q = new LinkedBlockingDeque(); + BlockingDeque oldValue = maps.putIfAbsent(name, q); + if (oldValue != null) + { + q = oldValue; + } + } + return q; + } + + public static void clear() + { + for (BlockingDeque queue : maps.values()) + { + // We clear the queues just in case there is a component holding it + queue.clear(); + } + maps.clear(); + } + + public static void put(String queue, Object message) + { + getQueue(queue).add(message); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java new file mode 100644 index 0000000..9a6563d --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/test/java/org/proton/plug/test/minimalserver/MinimalConnectionSPI.java @@ -0,0 +1,151 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.test.minimalserver; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.ServerSASL; +import org.proton.plug.sasl.AnonymousServerSASL; +import org.proton.plug.sasl.ServerSASLPlain; +import org.proton.plug.util.ByteUtil; +import org.proton.plug.util.DebugInfo; +import org.proton.plug.util.ReusableLatch; + +/** + * @author Clebert Suconic + */ + +public class MinimalConnectionSPI implements AMQPConnectionCallback +{ + Channel channel; + + private AMQPConnectionContext connection; + + public MinimalConnectionSPI(Channel channel) + { + this.channel = channel; + } + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + + @Override + public void close() + { + executorService.shutdown(); + } + + public void setConnection(AMQPConnectionContext connection) + { + this.connection = connection; + } + + public AMQPConnectionContext getConnection() + { + return connection; + } + + final ReusableLatch latch = new ReusableLatch(0); + + @Override + public ServerSASL[] getSASLMechnisms() + { + return new ServerSASL[]{new AnonymousServerSASL(), new ServerSASLPlain()}; + } + + @Override + public void onTransport(final ByteBuf bytes, final 
AMQPConnectionContext connection) + { + final int bufferSize = bytes.writerIndex(); + + if (DebugInfo.debug) + { + // some debug + byte[] frame = new byte[bytes.writerIndex()]; + int readerOriginalPos = bytes.readerIndex(); + + bytes.getBytes(0, frame); + + try + { + System.err.println("Buffer Outgoing: " + "\n" + ByteUtil.formatGroup(ByteUtil.bytesToHex(frame), 4, 16)); + } + catch (Exception e) + { + e.printStackTrace(); + } + + bytes.readerIndex(readerOriginalPos); + } + + + latch.countUp(); + // ^^ debug + + channel.writeAndFlush(bytes).addListener(new ChannelFutureListener() + { + @Override + public void operationComplete(ChannelFuture future) throws Exception + { + latch.countDown(); + + // https://issues.apache.org/jira/browse/PROTON-645 +// connection.outputDone(bufferSize); +// if (connection.capacity() > 0) +// { +// channel.read(); +// } + } + }); + + channel.flush(); + + if (connection.isSyncOnFlush()) + { + try + { + if (!latch.await(5, TimeUnit.SECONDS)) + { + // TODO logs + System.err.println("Flush took longer than 5 seconds!!!"); + } + } + catch (Throwable e) + { + e.printStackTrace(); + } + } + connection.outputDone(bufferSize); + + +// if (connection.capacity() > 0) +// { +// channel.read(); +// } + } + + @Override + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) + { + return new MinimalSessionSPI(); + } +}