Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 834FA11DA4 for ; Tue, 23 Sep 2014 18:20:26 +0000 (UTC) Received: (qmail 75889 invoked by uid 500); 23 Sep 2014 18:20:26 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 75800 invoked by uid 500); 23 Sep 2014 18:20:26 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 75029 invoked by uid 99); 23 Sep 2014 18:20:26 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Sep 2014 18:20:26 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 90C3B937BA9; Tue, 23 Sep 2014 18:20:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@qpid.apache.org Date: Tue, 23 Sep 2014 18:20:39 -0000 Message-Id: <56cf0248a16a4c398f9032f5cb71daf7@git.apache.org> In-Reply-To: <7d0bb9de60ea4f74b14c83776f4b02de@git.apache.org> References: <7d0bb9de60ea4f74b14c83776f4b02de@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/27] Initial drop of donated AMQP Client Code. 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java new file mode 100644 index 0000000..c7ba887 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpBufferedInputStream.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A small buffered {@link InputStream} wrapper intended for TCP socket streams.
 * <p>
 * Data is pulled from the wrapped stream into a fixed-size internal buffer and served from
 * there. None of the methods are synchronized and mark/reset is not supported.
 */
public class TcpBufferedInputStream extends FilterInputStream {

    private static final int DEFAULT_BUFFER_SIZE = 8192;

    /** Backing buffer; unread data occupies the index range {@code [position, count)}. */
    protected byte internalBuffer[];

    /** Index one past the last valid byte currently held in {@link #internalBuffer}. */
    protected int count;

    /** Index of the next byte that will be handed out from {@link #internalBuffer}. */
    protected int position;

    /**
     * Wraps the given stream using the default buffer size (8192 bytes).
     *
     * @param in
     *        the stream to read from.
     */
    public TcpBufferedInputStream(InputStream in) {
        this(in, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Wraps the given stream using a buffer of the requested size.
     *
     * @param in
     *        the stream to read from.
     * @param size
     *        the size of the internal buffer in bytes.
     *
     * @throws IllegalArgumentException if size is less than or equal to zero.
     */
    public TcpBufferedInputStream(InputStream in, int size) {
        super(in);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        internalBuffer = new byte[size];
    }

    /**
     * Discards whatever the buffer holds and refills it with a single read from the wrapped
     * stream. When that read yields nothing the buffer is left empty.
     *
     * @throws IOException if the wrapped stream fails while reading.
     */
    protected void fill() throws IOException {
        byte[] buffer = internalBuffer;
        count = 0;
        position = 0;
        int n = in.read(buffer, 0, buffer.length);
        if (n > 0) {
            count = n;
        }
    }

    /**
     * Reads a single byte, refilling the buffer first when it is exhausted.
     *
     * @return the next byte as a value in the range 0-255, or -1 when nothing could be read.
     *
     * @throws IOException if the wrapped stream fails while reading.
     */
    @Override
    public int read() throws IOException {
        if (position >= count) {
            fill();
            if (position >= count) {
                return -1;
            }
        }
        return internalBuffer[position++] & 0xff;
    }

    /**
     * Performs one transfer step for the bulk read: serves buffered bytes when there are any,
     * otherwise either reads straight into the caller's array (requests at least as large as
     * the buffer) or refills the buffer and copies from it.
     */
    private int readStream(byte[] b, int off, int len) throws IOException {
        int avail = count - position;
        if (avail <= 0) {
            // Large requests bypass the buffer entirely to avoid a redundant copy.
            if (len >= internalBuffer.length) {
                return in.read(b, off, len);
            }
            fill();
            avail = count - position;
            if (avail <= 0) {
                return -1;
            }
        }
        int cnt = Math.min(avail, len);
        System.arraycopy(internalBuffer, position, b, off, cnt);
        position += cnt;
        return cnt;
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}. Reading continues
     * until the request is satisfied, a read step yields nothing, or the wrapped stream reports
     * that no further bytes are available.
     *
     * @param b
     *        the destination array.
     * @param off
     *        the offset in the destination at which to start storing bytes.
     * @param len
     *        the maximum number of bytes to read.
     *
     * @return the number of bytes read, 0 when {@code len} is 0, or the result of the first
     *         read step (-1 at end of stream) when no bytes were read at all.
     *
     * @throws IndexOutOfBoundsException if off/len do not describe a valid range of b.
     * @throws IOException if the wrapped stream fails while reading.
     */
    @Override
    public int read(byte b[], int off, int len) throws IOException {
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        int n = 0;
        for (;;) {
            int nread = readStream(b, off + n, len - n);
            if (nread <= 0) {
                return (n == 0) ? nread : n;
            }
            n += nread;
            if (n >= len) {
                return n;
            }
            // if not closed but no bytes available, return
            InputStream input = in;
            if (input != null && input.available() <= 0) {
                return n;
            }
        }
    }

    /**
     * Skips bytes. Buffered bytes are skipped first; only when the buffer is empty is the
     * request forwarded to the wrapped stream, so fewer than {@code n} bytes may be skipped.
     *
     * @param n
     *        the number of bytes to skip.
     *
     * @return the number of bytes actually skipped.
     *
     * @throws IOException if the wrapped stream fails while skipping.
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        long avail = count - position;
        if (avail <= 0) {
            return in.skip(n);
        }
        long skipped = Math.min(avail, n);
        position += (int) skipped;
        return skipped;
    }

    /**
     * Reports the buffered byte count plus whatever the wrapped stream says is available.
     *
     * @return the estimated number of readable bytes, capped at {@link Integer#MAX_VALUE}.
     *
     * @throws IOException if the wrapped stream fails while reporting availability.
     */
    @Override
    public int available() throws IOException {
        int buffered = count - position;
        int upstream = in.available();
        // Saturate instead of wrapping negative: a source may report Integer.MAX_VALUE.
        return upstream > (Integer.MAX_VALUE - buffered) ? Integer.MAX_VALUE : upstream + buffered;
    }

    /**
     * @return false, mark and reset are not supported by this stream.
     */
    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * Closes the wrapped stream if there is one.
     *
     * @throws IOException if the wrapped stream fails while closing.
     */
    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
/**
 * A buffered output stream tuned for TCP: bytes are collected in a fixed-size array and pushed
 * to the wrapped stream when the array cannot take the next write.
 */
public class TcpBufferedOutputStream extends FilterOutputStream {

    private static final int BUFFER_SIZE = 8192;

    private final byte[] pending;
    private final int capacity;
    private int used;

    /**
     * Creates a buffered stream over the given stream using the default buffer size.
     *
     * @param out
     *        the underlying output stream.
     */
    public TcpBufferedOutputStream(OutputStream out) {
        this(out, BUFFER_SIZE);
    }

    /**
     * Creates a buffered stream over the given stream using the requested buffer size.
     *
     * @param out
     *        the underlying output stream.
     * @param size
     *        the buffer size.
     *
     * @throws IllegalArgumentException if size &lt;= 0.
     */
    public TcpBufferedOutputStream(OutputStream out, int size) {
        super(out);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        pending = new byte[size];
        capacity = size;
    }

    /**
     * Buffers a single byte, draining the buffer first when it is already full.
     *
     * @param b
     *        the byte to write (only the low eight bits are kept).
     *
     * @throws IOException if draining the buffer to the underlying stream fails.
     */
    @Override
    public void write(int b) throws IOException {
        if (used >= capacity) {
            flush();
        }
        pending[used++] = (byte) b;
    }

    /**
     * Writes a range of a byte array. A null array is ignored. The buffer is drained first when
     * the range does not fit in the remaining space; ranges larger than the whole buffer are
     * then handed directly to the underlying stream instead of being copied.
     *
     * @param b
     *        the byte array to take data from.
     * @param off
     *        the offset of the first byte to write.
     * @param len
     *        the number of bytes to write.
     *
     * @throws IOException if writing to the underlying stream fails.
     */
    @Override
    public void write(byte b[], int off, int len) throws IOException {
        if (b == null) {
            return;
        }
        if (len > capacity - used) {
            flush();
        }
        if (len > pending.length) {
            out.write(b, off, len);
            return;
        }
        System.arraycopy(b, off, pending, used, len);
        used += len;
    }

    /**
     * Pushes any buffered bytes to the underlying stream. The underlying stream's own flush is
     * intentionally not invoked; the original author left that to the TCP layer.
     *
     * @throws IOException if writing to the underlying stream fails.
     */
    @Override
    public void flush() throws IOException {
        if (used <= 0 || out == null) {
            return;
        }
        out.write(pending, 0, used);
        used = 0;
    }

    /**
     * Closes this stream by delegating to {@link FilterOutputStream#close()}.
     *
     * @throws IOException if the underlying stream fails while closing.
     */
    @Override
    public void close() throws IOException {
        super.close();
    }
}
+ */ +package org.apache.qpid.jms.transports; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.qpid.jms.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.vertx.java.core.AsyncResult; +import org.vertx.java.core.AsyncResultHandler; +import org.vertx.java.core.Handler; +import org.vertx.java.core.Vertx; +import org.vertx.java.core.VertxFactory; +import org.vertx.java.core.buffer.Buffer; +import org.vertx.java.core.net.NetClient; +import org.vertx.java.core.net.NetSocket; + +/** + * Vertex based TCP transport for raw data packets. + */ +public class TcpTransport implements Transport { + + private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); + + private final Vertx vertx = VertxFactory.newVertx(); + private final NetClient client = vertx.createNetClient(); + private final URI remoteLocation; + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicReference connectionError = new AtomicReference(); + + private NetSocket socket; + + private TransportListener listener; + private int socketBufferSize = 64 * 1024; + private int soTimeout = -1; + private int connectTimeout = -1; + private int soLinger = Integer.MIN_VALUE; + private boolean keepAlive; + private boolean tcpNoDelay = true; + + /** + * Create a new instance of the transport. + * + * @param listener + * The TransportListener that will receive data from this Transport instance. + * @param remoteLocation + * The remote location where this transport should connection to. 
+ */ + public TcpTransport(TransportListener listener, URI remoteLocation) { + this.listener = listener; + this.remoteLocation = remoteLocation; + } + + @Override + public void connect() throws IOException { + final CountDownLatch connectLatch = new CountDownLatch(1); + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + configureNetClient(client); + + try { + client.connect(remoteLocation.getPort(), remoteLocation.getHost(), new AsyncResultHandler() { + @Override + public void handle(AsyncResult asyncResult) { + if (asyncResult.succeeded()) { + socket = asyncResult.result(); + LOG.info("We have connected! Socket is {}", socket); + + connected.set(true); + connectLatch.countDown(); + + socket.dataHandler(new Handler() { + @Override + public void handle(Buffer event) { + listener.onData(event); + } + }); + + socket.closeHandler(new Handler() { + @Override + public void handle(Void event) { + connected.set(false); + listener.onTransportClosed(); + } + }); + + socket.exceptionHandler(new Handler() { + @Override + public void handle(Throwable event) { + connected.set(false); + listener.onTransportError(event); + } + }); + + } else { + connected.set(false); + connectionError.set(asyncResult.cause()); + connectLatch.countDown(); + } + } + }); + } catch (Throwable reason) { + LOG.info("Failed to connect to target Broker: {}", reason); + throw IOExceptionSupport.create(reason); + } + + try { + connectLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (connectionError.get() != null) { + throw IOExceptionSupport.create(connectionError.get()); + } + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + if (connected.get()) { + socket.close(); + connected.set(false); + } + + vertx.stop(); + } + } + + @Override + public void send(ByteBuffer output) throws IOException { + checkConnected(); + int 
length = output.remaining(); + if (length == 0) { + return; + } + + byte[] copy = new byte[length]; + output.get(copy); + Buffer sendBuffer = new Buffer(copy); + + vertx.eventBus().send(socket.writeHandlerID(), sendBuffer); + } + + @Override + public void send(org.fusesource.hawtbuf.Buffer output) throws IOException { + checkConnected(); + int length = output.length(); + if (length == 0) { + return; + } + + org.fusesource.hawtbuf.Buffer clone = output.deepCopy(); + Buffer sendBuffer = new Buffer(clone.data); + vertx.eventBus().send(socket.writeHandlerID(), sendBuffer); + } + + /** + * Allows a subclass to configure the NetClient beyond what this transport might do. + * + * @throws IOException if an error occurs. + */ + protected void configureNetClient(NetClient client) throws IOException { + client.setSendBufferSize(getSocketBufferSize()); + client.setReceiveBufferSize(getSocketBufferSize()); + client.setSoLinger(soLinger); + client.setTCPKeepAlive(keepAlive); + client.setTCPNoDelay(tcpNoDelay); + if (connectTimeout >= 0) { + client.setConnectTimeout(connectTimeout); + } + } + + @Override + public boolean isConnected() { + return this.connected.get(); + } + + private void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + @Override + public TransportListener getTransportListener() { + return this.listener; + } + + @Override + public void setTransportListener(TransportListener listener) { + if (listener == null) { + throw new IllegalArgumentException("Listener cannot be set to null"); + } + + this.listener = listener; + } + + public int getSocketBufferSize() { + return socketBufferSize; + } + + public void setSocketBufferSize(int socketBufferSize) { + this.socketBufferSize = socketBufferSize; + } + + public int getSoTimeout() { + return soTimeout; + } + + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public boolean isTcpNoDelay() { + 
return tcpNoDelay; + } + + public void setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + } + + public int getSoLinger() { + return soLinger; + } + + public void setSoLinger(int soLinger) { + this.soLinger = soLinger; + } + + public boolean isKeepAlive() { + return keepAlive; + } + + public void setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java new file mode 100644 index 0000000..4cced80 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.fusesource.hawtbuf.Buffer; + +/** + * Base interface for all QpidJMS Transport instances. + */ +public interface Transport { + + /** + * Performs the protocol connect operation for the implemented Transport type + * such as a TCP socket connection etc. + * + * @throws IOException if an error occurs while attempting the connect. + */ + void connect() throws IOException; + + /** + * @return true if transport is connected or false if the connection is down. + */ + boolean isConnected(); + + /** + * Close the Transport, no additional send operations are accepted. + * + * @throws IOException if an error occurs while closing the connection. + */ + void close() throws IOException; + + /** + * Sends a chunk of data over the Transport connection. + * + * @param output + * The buffer of data that is to be transmitted. + * + * @throws IOException if an error occurs during the send operation. + */ + void send(ByteBuffer output) throws IOException; + + /** + * Sends a chunk of data over the Transport connection. + * + * @param output + * The buffer of data that is to be transmitted. + * + * @throws IOException if an error occurs during the send operation. + */ + void send(Buffer output) throws IOException; + + /** + * Gets the currently set TransportListener instance. + * + * @return the current TransportListener or null if none set. + */ + TransportListener getTransportListener(); + + /** + * Sets the Transport Listener instance that will be notified of incoming data or + * error events. + * + * @param listener + * The new TransportListener instance to use (cannot be null). + * + * @throws IllegalArgumentException if the given listener is null. 
+ */ + void setTransportListener(TransportListener listener); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java new file mode 100644 index 0000000..f244347 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportListener.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports; + +import org.vertx.java.core.buffer.Buffer; + +/** + * Listener interface that should be implemented by users of the various + * QpidJMS Transport classes. + */ +public interface TransportListener { + + /** + * Called when new incoming data has become available. + * + * @param incoming + * the next incoming packet of data. + */ + void onData(Buffer incoming); + + /** + * Called if the connection state becomes closed. 
+ */ + void onTransportClosed(); + + /** + * Called when an error occurs during normal Transport operations. + * + * @param cause + * the error that triggered this event. + */ + void onTransportError(Throwable cause); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java new file mode 100644 index 0000000..f20c21f --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.util; + +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; + +/** + * Abstract Message Queue class used to implement the common functions of a Message Queue + * instance. 
+ */ +public abstract class AbstractMessageQueue implements MessageQueue { + + protected boolean closed; + protected boolean running; + protected Object lock = new Object(); + + @Override + public JmsInboundMessageDispatch peek() { + synchronized (lock) { + return peekFirst(); + } + } + + @Override + public JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException { + synchronized (lock) { + // Wait until the consumer is ready to deliver messages. + while (timeout != 0 && !closed && (isEmpty() || !running)) { + if (timeout == -1) { + lock.wait(); + } else { + lock.wait(timeout); + break; + } + } + + if (closed || !running || isEmpty()) { + return null; + } + + return removeFirst(); + } + } + + @Override + public JmsInboundMessageDispatch dequeueNoWait() { + synchronized (lock) { + if (closed || !running || isEmpty()) { + return null; + } + return removeFirst(); + } + } + + @Override + public void start() { + synchronized (lock) { + running = true; + lock.notifyAll(); + } + } + + @Override + public void stop() { + synchronized (lock) { + running = false; + lock.notifyAll(); + } + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public void close() { + synchronized (lock) { + if (!closed) { + running = false; + closed = true; + } + lock.notifyAll(); + } + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public Object getLock() { + return lock; + } + + /** + * Removes and returns the first entry in the implementation queue. This method + * is always called under lock and does not need to protect itself or check running + * state etc. + * + * @return the first message queued in the implemented queue. + */ + protected abstract JmsInboundMessageDispatch removeFirst(); + + /** + * Returns but does not remove the first entry in the implementation queue. This method + * is always called under lock and does not need to protect itself or check running + * state etc. 
+ * + * @return the first message queued in the implemented queue. + */ + protected abstract JmsInboundMessageDispatch peekFirst(); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java new file mode 100644 index 0000000..8632ee2 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/ClassLoadingAwareObjectInputStream.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; +import java.lang.reflect.Proxy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClassLoadingAwareObjectInputStream extends ObjectInputStream { + + private static final Logger LOG = LoggerFactory.getLogger(ClassLoadingAwareObjectInputStream.class); + private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader(); + + private final ClassLoader inLoader; + + public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException { + super(in); + inLoader = in.getClass().getClassLoader(); + } + + @Override + protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + return load(classDesc.getName(), cl, inLoader); + } + + @Override + protected Class resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + Class[] cinterfaces = new Class[interfaces.length]; + for (int i = 0; i < interfaces.length; i++) { + cinterfaces[i] = load(interfaces[i], cl); + } + + try { + return Proxy.getProxyClass(cl, cinterfaces); + } catch (IllegalArgumentException e) { + try { + return Proxy.getProxyClass(inLoader, cinterfaces); + } catch (IllegalArgumentException e1) { + // ignore + } + try { + return Proxy.getProxyClass(FALLBACK_CLASS_LOADER, cinterfaces); + } catch (IllegalArgumentException e2) { + // ignore + } + + throw new ClassNotFoundException(null, e); + } + } + + private Class load(String className, ClassLoader... 
cl) throws ClassNotFoundException { + // check for simple types first + final Class clazz = loadSimpleType(className); + if (clazz != null) { + LOG.trace("Loaded class: {} as simple type -> ", className, clazz); + return clazz; + } + + // try the different class loaders + for (ClassLoader loader : cl) { + LOG.trace("Attempting to load class: {} using classloader: {}", className, cl); + try { + Class answer = Class.forName(className, false, loader); + if (LOG.isTraceEnabled()) { + LOG.trace("Loaded class: {} using classloader: {} -> ", new Object[] { className, cl, answer }); + } + return answer; + } catch (ClassNotFoundException e) { + LOG.trace("Class not found: {} using classloader: {}", className, cl); + // ignore + } + } + + // and then the fallback class loader + return Class.forName(className, false, FALLBACK_CLASS_LOADER); + } + + /** + * Load a simple type + * + * @param name + * the name of the class to load + * @return the class or null if it could not be loaded + */ + public static Class loadSimpleType(String name) { + if ("java.lang.byte[]".equals(name) || "byte[]".equals(name)) { + return byte[].class; + } else if ("java.lang.Byte[]".equals(name) || "Byte[]".equals(name)) { + return Byte[].class; + } else if ("java.lang.Object[]".equals(name) || "Object[]".equals(name)) { + return Object[].class; + } else if ("java.lang.String[]".equals(name) || "String[]".equals(name)) { + return String[].class; + // and these is common as well + } else if ("java.lang.String".equals(name) || "String".equals(name)) { + return String.class; + } else if ("java.lang.Boolean".equals(name) || "Boolean".equals(name)) { + return Boolean.class; + } else if ("boolean".equals(name)) { + return boolean.class; + } else if ("java.lang.Integer".equals(name) || "Integer".equals(name)) { + return Integer.class; + } else if ("int".equals(name)) { + return int.class; + } else if ("java.lang.Long".equals(name) || "Long".equals(name)) { + return Long.class; + } else if 
("long".equals(name)) { + return long.class; + } else if ("java.lang.Short".equals(name) || "Short".equals(name)) { + return Short.class; + } else if ("short".equals(name)) { + return short.class; + } else if ("java.lang.Byte".equals(name) || "Byte".equals(name)) { + return Byte.class; + } else if ("byte".equals(name)) { + return byte.class; + } else if ("java.lang.Float".equals(name) || "Float".equals(name)) { + return Float.class; + } else if ("float".equals(name)) { + return float.class; + } else if ("java.lang.Double".equals(name) || "Double".equals(name)) { + return Double.class; + } else if ("double".equals(name)) { + return double.class; + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java new file mode 100644 index 0000000..b8db705 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FactoryFinder.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A Factory finding helper class used to locate objects that serve as Factories for
+ * other Object types. The search and instantiate mechanism is configurable so that
+ * in a non-stand-alone environment such as OSGi the finder can be configured to work.
+ *
+ * @param <T>
+ *        the type of factory object this finder locates and caches.
+ */
+public class FactoryFinder<T> {
+
+    /**
+     * The strategy that the FactoryFinder uses to find, load and instantiate Objects can be
+     * changed out by calling the
+     * {@link org.apache.qpid.jms.util.FactoryFinder#setObjectFactory(org.apache.qpid.jms.util.FactoryFinder.ObjectFactory)}
+     * method with a custom implementation of ObjectFactory.
+     *
+     * The default ObjectFactory is typically changed out when running in a specialized
+     * container environment where service discovery needs to be done via the container system.
+     * For example, in an OSGi scenario.
+     */
+    public interface ObjectFactory {
+
+        /**
+         * Creates the requested factory instance.
+         *
+         * @param path
+         *        the full service path
+         *
+         * @return instance of the factory object being searched for.
+         *
+         * @throws IllegalAccessException if an error occurs while accessing the search path.
+         * @throws InstantiationException if the factory object fails on create.
+         * @throws IOException if the search encounters an IO error.
+         * @throws ClassNotFoundException if the class that is to be loaded cannot be found.
+         */
+        public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException;
+
+    }
+
+    private static ObjectFactory objectFactory = new StandaloneObjectFactory();
+
+    private final ConcurrentHashMap<String, T> cachedFactories = new ConcurrentHashMap<String, T>();
+    private final String path;
+    private final Class<T> factoryType;
+
+    /**
+     * Creates a new instance of the FactoryFinder using the given search path.
+     *
+     * @param factoryType
+     *        The type that every factory returned from this finder must be an instance of.
+     * @param path
+     *        The path to use when searching for the factory definitions.
+     */
+    public FactoryFinder(Class<T> factoryType, String path) {
+        this.path = path;
+        this.factoryType = factoryType;
+    }
+
+    /**
+     * @return the currently configured ObjectFactory instance used to locate the Factory objects.
+     */
+    public static ObjectFactory getObjectFactory() {
+        return objectFactory;
+    }
+
+    /**
+     * Sets the ObjectFactory instance to use when searching for the Factory class. This allows
+     * the default instance to be overridden in an environment where the basic version will not
+     * work.
+     *
+     * @param objectFactory
+     *        the new object factory to use when searching for a Factory instance.
+     */
+    public static void setObjectFactory(ObjectFactory objectFactory) {
+        FactoryFinder.objectFactory = objectFactory;
+    }
+
+    /**
+     * Returns the factory registered under the given key. The method first checks the cache of
+     * previously found factory instances for one that matches the key. If no cached version
+     * exists then the factory is searched for using the configured ObjectFactory instance and,
+     * once found, cached under the key.
+     *
+     * @param key
+     *        is the key to add to the path to find a text file containing the factory name
+     *
+     * @return the cached instance for the key, or a newly created one.
+     *
+     * @throws IllegalAccessException if an error occurs while accessing the search path.
+     * @throws InstantiationException if the factory object fails on create.
+     * @throws IOException if the search encounters an IO error.
+     * @throws ClassNotFoundException if the class that is to be loaded cannot be found.
+     * @throws ClassCastException if the ObjectFactory produced nothing, or produced an object
+     *         that is not assignable to the requested factory type.
+     */
+    public T newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException, ClassCastException {
+        T factory = cachedFactories.get(key);
+        if (factory == null) {
+
+            Object found = objectFactory.create(path + key);
+            if (found == null) {
+                // A custom ObjectFactory may hand back nothing; report that through the
+                // documented exception instead of failing on found.getClass() below.
+                throw new ClassCastException("Cannot cast null to " + factoryType.getName());
+            }
+            if (!factoryType.isInstance(found)) {
+                throw new ClassCastException("Cannot cast " + found.getClass().getName() +
+                    " to " + factoryType.getName());
+            }
+
+            factory = factoryType.cast(found);
+            cachedFactories.put(key, factory);
+        }
+
+        return factory;
+    }
+
+    /**
+     * Allow registration of a Provider factory without wiring via META-INF classes
+     *
+     * @param scheme
+     *        The URI scheme value that names the target Provider instance.
+     * @param factory
+     *        The factory to register in this finder.
+     */
+    public void registerProviderFactory(String scheme, T factory) {
+        cachedFactories.put(scheme, factory);
+    }
+
+    /**
+     * The default implementation of Object factory which works well in stand-alone applications.
+     * It reads a properties resource from the class path, takes the class name from its
+     * "class" entry, and instantiates that class through its no-argument constructor.
+     */
+    @SuppressWarnings("rawtypes")
+    protected static class StandaloneObjectFactory implements ObjectFactory {
+        final ConcurrentHashMap<String, Class> classMap = new ConcurrentHashMap<String, Class>();
+
+        @Override
+        public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
+            Class clazz = classMap.get(path);
+            if (clazz == null) {
+                clazz = loadClass(loadProperties(path));
+                classMap.put(path, clazz);
+            }
+            return clazz.newInstance();
+        }
+
+        /**
+         * Loads the class named by the "class" entry of the given properties, trying the
+         * thread context class loader first and this class's own loader second.
+         *
+         * @throws IOException if the properties carry no "class" entry.
+         * @throws ClassNotFoundException if neither class loader can find the class.
+         */
+        static public Class loadClass(Properties properties) throws ClassNotFoundException, IOException {
+
+            String className = properties.getProperty("class");
+            if (className == null) {
+                throw new IOException("Expected property is missing: class");
+            }
+            Class clazz = null;
+            ClassLoader loader = Thread.currentThread().getContextClassLoader();
+            if (loader != null) {
+                try {
+                    clazz = loader.loadClass(className);
+                } catch (ClassNotFoundException e) {
+                    // ignore, the FactoryFinder class loader is tried next
+                }
+            }
+            if (clazz == null) {
+                clazz = FactoryFinder.class.getClassLoader().loadClass(className);
+            }
+
+            return clazz;
+        }
+
+        /**
+         * Reads the properties resource found at the given class path location.
+         *
+         * @throws IOException if the resource cannot be found or cannot be read.
+         */
+        static public Properties loadProperties(String uri) throws IOException {
+            // lets try the thread context class loader first
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            if (classLoader == null) {
+                classLoader = StandaloneObjectFactory.class.getClassLoader();
+            }
+            InputStream in = classLoader.getResourceAsStream(uri);
+            if (in == null) {
+                in = FactoryFinder.class.getClassLoader().getResourceAsStream(uri);
+                if (in == null) {
+                    throw new IOException("Could not find factory class for resource: " + uri);
+                }
+            }
+
+            // lets load the file
+            BufferedInputStream reader = new BufferedInputStream(in);
+            try {
+                Properties properties = new Properties();
+                properties.load(reader);
+                return properties;
+            } finally {
+                try {
+                    reader.close();
+                } catch (IOException ignored) {
+                    // Best effort: a failed close must not mask the result or the
+                    // exception produced by the load above.
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
new file mode 100644
index 0000000..b50e480
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Simple first in / first out Message Queue.
+ *
+ * Every public operation runs while holding the lock object inherited from
+ * AbstractMessageQueue, and each enqueue notifies one thread waiting on that lock.
+ */
+public final class FifoMessageQueue extends AbstractMessageQueue {
+
+    protected final LinkedList<JmsInboundMessageDispatch> list = new LinkedList<JmsInboundMessageDispatch>();
+
+    @Override
+    public void enqueueFirst(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            list.addFirst(envelope);
+            lock.notify();
+        }
+    }
+
+    @Override
+    public void enqueue(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            list.addLast(envelope);
+            lock.notify();
+        }
+    }
+
+    @Override
+    public boolean isEmpty() {
+        synchronized (lock) {
+            return list.isEmpty();
+        }
+    }
+
+    @Override
+    public int size() {
+        synchronized (lock) {
+            return list.size();
+        }
+    }
+
+    @Override
+    public void clear() {
+        synchronized (lock) {
+            list.clear();
+        }
+    }
+
+    @Override
+    public List<JmsInboundMessageDispatch> removeAll() {
+        synchronized (lock) {
+            // Snapshot in queue order, then empty the queue.
+            List<JmsInboundMessageDispatch> drained = new ArrayList<JmsInboundMessageDispatch>(list);
+            list.clear();
+            return drained;
+        }
+    }
+
+    @Override
+    public String toString() {
+        synchronized (lock) {
+            return list.toString();
+        }
+    }
+
+    /**
+     * Removes the head of the queue.
+     *
+     * NOTE(review): takes no lock itself; presumably AbstractMessageQueue calls this
+     * while already holding the lock - confirm against the base class.
+     *
+     * @return the first envelope, or null when the queue is empty (the same answer
+     *         PriorityMessageQueue gives) rather than a NoSuchElementException.
+     */
+    @Override
+    protected JmsInboundMessageDispatch removeFirst() {
+        return list.pollFirst();
+    }
+
+    /**
+     * @return the first envelope without removing it, or null when the queue is empty.
+     */
+    @Override
+    protected JmsInboundMessageDispatch peekFirst() {
+        return list.peekFirst();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
new file mode 100644
index 0000000..6caf15c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IOExceptionSupport.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+
+/**
+ * Helper that turns an arbitrary Throwable into an IOException.
+ */
+public class IOExceptionSupport {
+
+    /**
+     * Returns the given cause unchanged when it already is an IOException; otherwise
+     * wraps it in a new IOException. The wrapper carries the cause's message, or the
+     * cause's toString() value when that message is null or empty.
+     *
+     * @param cause
+     *        The initiating exception that should be cast or wrapped.
+     *
+     * @return an IOException instance.
+     */
+    public static IOException create(Throwable cause) {
+        if (!(cause instanceof IOException)) {
+            String text = cause.getMessage();
+            boolean blank = (text == null) || text.isEmpty();
+            return new IOException(blank ? cause.toString() : text, cause);
+        }
+
+        return (IOException) cause;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
new file mode 100644
index 0000000..f07b5b4
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/IdGenerator.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generator for Globally unique Strings.
+ *
+ * Each id has the form {@code <prefix><stub><instance>:<sequence>}, where the stub is
+ * computed once per class load from a locally bound port and the current time.
+ */
+public class IdGenerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IdGenerator.class);
+    private static final String UNIQUE_STUB;
+    private static int instanceCount;
+    private static String hostName;
+    private final String seed;
+    private final AtomicLong sequence = new AtomicLong(1);
+    private final int length;
+    public static final String PROPERTY_IDGENERATOR_PORT = "activemq.idgenerator.port";
+
+    static {
+        String stub = "";
+        boolean canAccessSystemProps = true;
+        try {
+            SecurityManager sm = System.getSecurityManager();
+            if (sm != null) {
+                sm.checkPropertiesAccess();
+            }
+        } catch (SecurityException se) {
+            canAccessSystemProps = false;
+        }
+
+        if (canAccessSystemProps) {
+            int idGeneratorPort = 0;
+            ServerSocket ss = null;
+            try {
+                idGeneratorPort = Integer.parseInt(System.getProperty(PROPERTY_IDGENERATOR_PORT, "0"));
+                LOG.trace("Using port {}", idGeneratorPort);
+                hostName = InetAddressUtil.getLocalHostName();
+                ss = new ServerSocket(idGeneratorPort);
+                stub = "-" + ss.getLocalPort() + "-" + System.currentTimeMillis() + "-";
+                // NOTE(review): presumably keeps the port bound long enough that another
+                // process cannot produce the same port/timestamp pair - confirm.
+                Thread.sleep(100);
+            } catch (Exception e) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("could not generate unique stub by using DNS and binding to local port", e);
+                } else {
+                    LOG.warn("could not generate unique stub by using DNS and binding to local port: {} {}", e.getClass().getCanonicalName(), e.getMessage());
+                }
+
+                // Restore interrupted state so higher level code can deal with it.
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                }
+            } finally {
+                if (ss != null) {
+                    try {
+                        ss.close();
+                    } catch (IOException ioe) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Closing the server socket failed", ioe);
+                        } else {
+                            LOG.warn("Closing the server socket failed due {}", ioe.getMessage());
+                        }
+                    }
+                }
+            }
+        }
+
+        if (hostName == null) {
+            hostName = "localhost";
+        }
+        hostName = sanitizeHostName(hostName);
+
+        // Fall back to a time based stub when no port could be bound.
+        if (stub.length() == 0) {
+            stub = "-1-" + System.currentTimeMillis() + "-";
+        }
+        UNIQUE_STUB = stub;
+    }
+
+    /**
+     * Construct an IdGenerator whose ids start with the given prefix.
+     *
+     * @param prefix
+     *        the text placed in front of the unique stub in every generated id.
+     */
+    public IdGenerator(String prefix) {
+        // Guard the shared instance counter with the class lock rather than with a
+        // String instance.
+        synchronized (IdGenerator.class) {
+            this.seed = prefix + UNIQUE_STUB + (instanceCount++) + ":";
+            this.length = this.seed.length() + ("" + Long.MAX_VALUE).length();
+        }
+    }
+
+    /**
+     * Construct an IdGenerator using the prefix "ID:" followed by the local host name.
+     */
+    public IdGenerator() {
+        this("ID:" + hostName);
+    }
+
+    /**
+     * As we have to find the host name as a side effect of generating a unique stub, we allow
+     * its easy retrieval here
+     *
+     * @return the local host name
+     */
+    public static String getHostName() {
+        return hostName;
+    }
+
+    /**
+     * Generate a unique id
+     *
+     * @return a unique id
+     */
+    public synchronized String generateId() {
+        StringBuilder sb = new StringBuilder(length);
+        sb.append(seed);
+        sb.append(sequence.getAndIncrement());
+        return sb.toString();
+    }
+
+    /**
+     * Removes every character with a code of 127 or above from the given host name.
+     *
+     * @param hostName
+     *        the host name to clean up
+     * @return the same String when nothing had to be removed, otherwise the reduced copy
+     */
+    public static String sanitizeHostName(String hostName) {
+        boolean changed = false;
+
+        StringBuilder sb = new StringBuilder();
+        for (char ch : hostName.toCharArray()) {
+            // only include ASCII chars
+            if (ch < 127) {
+                sb.append(ch);
+            } else {
+                changed = true;
+            }
+        }
+
+        if (changed) {
+            String newHost = sb.toString();
+            LOG.info("Sanitized hostname from: {} to: {}", hostName, newHost);
+            return newHost;
+        } else {
+            return hostName;
+        }
+    }
+
+    /**
+     * Generate a unique ID - that is friendly for a URL or file system
+     *
+     * @return a unique id
+     */
+    public String generateSanitizedId() {
+        String result = generateId();
+        result = result.replace(':', '-');
+        result = result.replace('_', '-');
+        result = result.replace('.', '-');
+        return result;
+    }
+
+    /**
+     * From a generated id - return the seed (i.e. minus the count)
+     *
+     * @param id
+     *        the generated identifier
+     * @return the seed, or the id itself when it holds no ':' followed by a count
+     */
+    public static String getSeedFromId(String id) {
+        String result = id;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+            if (index > 0 && (index + 1) < id.length()) {
+                result = id.substring(0, index);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * From a generated id - return the generator count
+     *
+     * @param id
+     *        the generated identifier
+     * @return the count, or -1 when the id is null or holds no ':' followed by a count
+     * @throws NumberFormatException if the text after the last ':' is not a number.
+     */
+    public static long getSequenceFromId(String id) {
+        long result = -1;
+        if (id != null) {
+            int index = id.lastIndexOf(':');
+
+            if (index > 0 && (index + 1) < id.length()) {
+                String numStr = id.substring(index + 1, id.length());
+                result = Long.parseLong(numStr);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Does a proper compare on the Id's: seeds are compared first and, when they are
+     * equal, the sequence numbers decide.
+     *
+     * @param id1
+     *        the first generated identifier
+     * @param id2
+     *        the second generated identifier
+     * @return 0 if equal, a positive value if id1 is greater than id2, a negative value if
+     *         it is smaller; -1 when either id is null.
+     */
+    public static int compare(String id1, String id2) {
+        int result = -1;
+        String seed1 = IdGenerator.getSeedFromId(id1);
+        String seed2 = IdGenerator.getSeedFromId(id2);
+        if (seed1 != null && seed2 != null) {
+            result = seed1.compareTo(seed2);
+            if (result == 0) {
+                long count1 = IdGenerator.getSequenceFromId(id1);
+                long count2 = IdGenerator.getSequenceFromId(id2);
+                // Compare instead of subtracting: narrowing a long difference to int
+                // can flip the sign or collapse to zero.
+                result = count1 < count2 ? -1 : (count1 > count2 ? 1 : 0);
+            }
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
new file mode 100644
index 0000000..9513892
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/InetAddressUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class InetAddressUtil {
+
+    /**
+     * Looks up the name of the local host, recovering from a failed lookup where possible.
+     *
+     * When {@link java.net.InetAddress#getLocalHost()} runs in an environment where
+     * neither a proper DNS lookup nor an /etc/hosts entry exists for the host, it fails
+     * with an exception of the form:
+     *
+     * java.net.UnknownHostException: &lt;hostname&gt;: &lt;hostname&gt;
+     *     at java.net.InetAddress.getLocalHost(InetAddress.java:1425)
+     *      ...
+     *
+     * Rather than giving up, this method takes the text in front of the first ':' of the
+     * exception message as the host name. The UnknownHostException is rethrown only when
+     * the message is missing or offers no such text.
+     *
+     * @return The hostname
+     * @throws UnknownHostException if the lookup fails and no name can be taken from the failure.
+     * @see java.net.InetAddress#getLocalHost()
+     * @see java.net.InetAddress#getHostName()
+     */
+    public static String getLocalHostName() throws UnknownHostException {
+        InetAddress local;
+        try {
+            local = InetAddress.getLocalHost();
+        } catch (UnknownHostException uhe) {
+            // message has the shape "hostname: hostname"
+            String detail = uhe.getMessage();
+            int split = (detail == null) ? -1 : detail.indexOf(':');
+            if (split <= 0) {
+                throw uhe;
+            }
+            return detail.substring(0, split);
+        }
+        return local.getHostName();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
new file mode 100644
index 0000000..a44faf6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.List;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Queue based storage interface for inbound Messages.
+ */
+public interface MessageQueue {
+
+    /**
+     * Adds the given message envelope to the end of the Message queue.
+     *
+     * @param envelope
+     *        The in-bound Message envelope to enqueue.
+     */
+    void enqueue(JmsInboundMessageDispatch envelope);
+
+    /**
+     * Adds the given message envelope to the front of the Message queue.
+     *
+     * @param envelope
+     *        The in-bound Message envelope to enqueue.
+     */
+    void enqueueFirst(JmsInboundMessageDispatch envelope);
+
+    /**
+     * @return true if there are no messages in the queue.
+     */
+    boolean isEmpty();
+
+    /**
+     * Return but do not remove the first element in the Message queue.
+     *
+     * @return the first element in the Queue or null if empty.
+     */
+    JmsInboundMessageDispatch peek();
+
+    /**
+     * Used to get an enqueued message. The amount of time this method blocks is
+     * based on the timeout value:
+     * <ul>
+     * <li>if timeout==-1 then it blocks until a message is received.</li>
+     * <li>if timeout==0 then it tries to not block at all, it returns a message
+     * if it is available.</li>
+     * <li>if timeout&gt;0 then it blocks up to timeout amount of time.</li>
+     * </ul>
+     * Expired messages will be consumed by this method.
+     *
+     * @param timeout
+     *        how long to wait for a message, interpreted as described above.
+     *
+     * @return null if we timeout or if the consumer is closed.
+     *
+     * @throws InterruptedException if the wait is interrupted.
+     */
+    JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException;
+
+    /**
+     * Used to get an enqueued Message if one exists, otherwise returns null.
+     *
+     * @return the next Message in the Queue if one exists, otherwise null.
+     */
+    JmsInboundMessageDispatch dequeueNoWait();
+
+    /**
+     * Starts the Message Queue. A non-started Queue will always return null for
+     * any of the Queue accessor methods.
+     */
+    void start();
+
+    /**
+     * Stops the Message Queue. Messages cannot be read from the Queue when it is in
+     * the stopped state.
+     */
+    void stop();
+
+    /**
+     * @return true if the Queue is not in the stopped or closed state.
+     */
+    boolean isRunning();
+
+    /**
+     * Closes the Message Queue. No messages can be added or removed from the Queue
+     * once it has entered the closed state.
+     */
+    void close();
+
+    /**
+     * @return true if the Queue has been closed.
+     */
+    boolean isClosed();
+
+    /**
+     * Returns the number of Messages currently in the Queue. This value is only
+     * meaningful at the time of the call as the size of the Queue changes rapidly
+     * as Messages arrive and are consumed.
+     *
+     * @return the current number of Messages in the Queue.
+     */
+    int size();
+
+    /**
+     * Clears the Queue of any Messages.
+     */
+    void clear();
+
+    /**
+     * Removes and returns all Messages in the Queue.
+     *
+     * @return a List containing all Messages removed from the Queue.
+     */
+    List<JmsInboundMessageDispatch> removeAll();
+
+    /**
+     * @return the lock object used to protect against concurrent access.
+     */
+    Object getLock();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
new file mode 100644
index 0000000..e0c7357
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Simple Message Priority ordered Queue. Message envelopes are stored in the
+ * Queue based on their priority value.
+ */
+public final class PriorityMessageQueue extends AbstractMessageQueue {
+
+    /** Number of priority levels; valid priorities run from 0 to MAX_PRIORITY - 1. */
+    private static final int MAX_PRIORITY = 10;
+
+    /** One FIFO list per priority level, indexed by priority. */
+    private final LinkedList<JmsInboundMessageDispatch>[] lists;
+    /** Total number of envelopes held across all priority lists. */
+    private int size = 0;
+
+    @SuppressWarnings("unchecked")
+    public PriorityMessageQueue() {
+        // Generic arrays cannot be created directly, hence the unchecked assignment.
+        this.lists = new LinkedList[MAX_PRIORITY];
+        for (int i = 0; i < MAX_PRIORITY; i++) {
+            lists[i] = new LinkedList<JmsInboundMessageDispatch>();
+        }
+    }
+
+    @Override
+    public void enqueue(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            getList(envelope).addLast(envelope);
+            this.size++;
+            lock.notify();
+        }
+    }
+
+    @Override
+    public void enqueueFirst(JmsInboundMessageDispatch envelope) {
+        synchronized (lock) {
+            getList(envelope).addFirst(envelope);
+            this.size++;
+            lock.notify();
+        }
+    }
+
+    @Override
+    public boolean isEmpty() {
+        synchronized (lock) {
+            return size == 0;
+        }
+    }
+
+    @Override
+    public int size() {
+        synchronized (lock) {
+            return size;
+        }
+    }
+
+    @Override
+    public void clear() {
+        synchronized (lock) {
+            for (int i = 0; i < MAX_PRIORITY; i++) {
+                lists[i].clear();
+            }
+            this.size = 0;
+        }
+    }
+
+    /**
+     * Removes and returns every queued envelope, highest priority first and in
+     * queue order within each priority.
+     */
+    @Override
+    public List<JmsInboundMessageDispatch> removeAll() {
+        synchronized (lock) {
+            List<JmsInboundMessageDispatch> result = new ArrayList<JmsInboundMessageDispatch>(size);
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                List<JmsInboundMessageDispatch> list = lists[i];
+                result.addAll(list);
+                list.clear();
+            }
+            this.size = 0;
+            return result;
+        }
+    }
+
+    /**
+     * Removes the first envelope of the highest non-empty priority.
+     *
+     * NOTE(review): takes no lock itself; presumably AbstractMessageQueue calls this
+     * while already holding the lock - confirm against the base class.
+     *
+     * @return the removed envelope, or null when the queue is empty.
+     */
+    @Override
+    protected JmsInboundMessageDispatch removeFirst() {
+        if (this.size > 0) {
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                LinkedList<JmsInboundMessageDispatch> list = lists[i];
+                if (!list.isEmpty()) {
+                    this.size--;
+                    return list.removeFirst();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the first envelope of the highest non-empty priority without removing
+     *         it, or null when the queue is empty.
+     */
+    @Override
+    protected JmsInboundMessageDispatch peekFirst() {
+        if (this.size > 0) {
+            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
+                LinkedList<JmsInboundMessageDispatch> list = lists[i];
+                if (!list.isEmpty()) {
+                    return list.peekFirst();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Maps an envelope to the index of its priority list: the message's JMS priority
+     * clamped into [0, MAX_PRIORITY - 1], or the JMS default priority when the envelope
+     * carries no message or the priority cannot be read.
+     */
+    private int getPriority(JmsInboundMessageDispatch envelope) {
+        int priority = javax.jms.Message.DEFAULT_PRIORITY;
+        if (envelope.getMessage() != null) {
+            try {
+                priority = Math.max(envelope.getMessage().getJMSPriority(), 0);
+            } catch (JMSException ignored) {
+                // The priority could not be read; keep the default.
+            }
+            priority = Math.min(priority, MAX_PRIORITY - 1);
+        }
+        return priority;
+    }
+
+    private LinkedList<JmsInboundMessageDispatch> getList(JmsInboundMessageDispatch envelope) {
+        return lists[getPriority(envelope)];
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
new file mode 100644
index 0000000..8eb61d2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.util;
+
+import java.beans.BeanInfo;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Utilities for properties
+ */
+public class PropertyUtil {
+
+    /**
+     * Creates a URI from the original URI and the given parameters.
+     *
+     * @param originalURI
+     *        The URI whose current parameters are removed and replaced with the given
+     *        ones.
+     * @param params
+     *        The URI params that should be used to replace the current ones in the target.
+     *
+     * @return a new URI that matches the original one but has its query options replaced with
+     *         the given ones.
+     *
+     * @throws URISyntaxException if the resulting URI is not valid.
+     */
+    public static URI replaceQuery(URI originalURI, Map<String, ? extends Object> params) throws URISyntaxException {
+        String s = createQueryString(params);
+        if (s.length() == 0) {
+            s = null;
+        }
+        return replaceQuery(originalURI, s);
+    }
+
+    /**
+     * Creates a URI with the given query, removing any previous query value from the given URI.
+     *
+     * @param uri
+     *        The source URI whose existing query is replaced with the newly supplied one.
+     * @param query
+     *        The new URI query string that should be appended to the given URI; null or
+     *        empty leaves the result without a query.
+     *
+     * @return a new URI that is a combination of the original URI and the given query string.
+     * @throws URISyntaxException if the resulting URI is not valid.
+     */
+    public static URI replaceQuery(URI uri, String query) throws URISyntaxException {
+        String schemeSpecificPart = uri.getRawSchemeSpecificPart();
+        // strip existing query if any
+        int questionMark = schemeSpecificPart.lastIndexOf("?");
+        // make sure question mark is not within parentheses
+        if (questionMark < schemeSpecificPart.lastIndexOf(")")) {
+            questionMark = -1;
+        }
+        if (questionMark > 0) {
+            schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
+        }
+        if (query != null && query.length() > 0) {
+            schemeSpecificPart += "?" + query;
+        }
+        return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
+    }
+
+    /**
+     * Creates a URI equal to the given one but without its query.
+     *
+     * @param uri
+     *        The source URI whose existing query is removed.
+     *
+     * @return a new URI built from the scheme, the query-less scheme specific part and the
+     *         fragment of the original URI.
+     * @throws URISyntaxException if the resulting URI is not valid.
+     */
+    public static URI eraseQuery(URI uri) throws URISyntaxException {
+        return replaceQuery(uri, (String) null);
+    }
+
+    /**
+     * Given a key / value mapping, create and return a URI formatted query string that is valid
+     * and can be appended to a URI. Keys and values are URL encoded as UTF-8 and pairs are
+     * joined with '&amp;'.
+     *
+     * @param options
+     *        The Mapping that will create the new Query string. Values are expected to be
+     *        Strings; a null value is written as an empty value.
+     *
+     * @return a URI formatted query string, empty when the mapping is empty.
+     *
+     * @throws URISyntaxException if the UTF-8 encoding is not available.
+     */
+    public static String createQueryString(Map<String, ? extends Object> options) throws URISyntaxException {
+        try {
+            if (options.size() > 0) {
+                StringBuilder rc = new StringBuilder();
+                boolean first = true;
+                for (Map.Entry<String, ? extends Object> entry : options.entrySet()) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        rc.append("&");
+                    }
+                    // URLEncoder rejects null, so an absent value is written as "key=".
+                    Object rawValue = entry.getValue();
+                    String value = rawValue == null ? "" : (String) rawValue;
+                    rc.append(URLEncoder.encode(entry.getKey(), "UTF-8"));
+                    rc.append("=");
+                    rc.append(URLEncoder.encode(value, "UTF-8"));
+                }
+                return rc.toString();
+            } else {
+                return "";
+            }
+        } catch (UnsupportedEncodingException e) {
+            throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
+        }
+    }
+
+    /**
+     * Get properties from a URI
+     *
+     * @param uri
+     *        the URI whose query is parsed
+     * @return Map of properties, empty when the URI has no query
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
+    public static Map parseParameters(URI uri) throws Exception {
+        return uri.getQuery() == null ? Collections.EMPTY_MAP : parseQuery(stripPrefix(uri.getQuery(), "?"));
+    }
+
+    /**
+     * Parse properties from a named resource -eg. a URI or a simple name e.g.
+     * foo?name="fred"&size=2
+     *
+     * @param uri
+     * @return Map of properties
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
+    public static Map parseParameters(String uri) throws Exception {
+        return uri == null ?
Collections.EMPTY_MAP : parseQuery(stripUpto(uri, '?')); + } + + /** + * Get properties from a uri + * + * @param uri + * @return Map of properties + * + * @throws Exception + */ + public static Map parseQuery(String uri) throws Exception { + if (uri != null) { + Map rc = new HashMap(); + if (uri != null) { + String[] parameters = uri.split("&"); + for (int i = 0; i < parameters.length; i++) { + int p = parameters[i].indexOf("="); + if (p >= 0) { + String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8"); + String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8"); + rc.put(name, value); + } else { + rc.put(parameters[i], null); + } + } + } + return rc; + } + return Collections.emptyMap(); + } + + /** + * Given a map of properties, filter out only those prefixed with the given value, the + * values filtered are returned in a new Map instance. + * + * @param properties + * The map of properties to filter. + * @param optionPrefix + * The prefix value to use when filtering. + * + * @return a filter map with only values that match the given prefix. 
+ */ + public static Map filterProperties(Map props, String optionPrefix) { + if (props == null) { + throw new IllegalArgumentException("props was null."); + } + + HashMap rc = new HashMap(props.size()); + + for (Iterator iter = props.keySet().iterator(); iter.hasNext();) { + String name = iter.next(); + if (name.startsWith(optionPrefix)) { + String value = props.get(name); + name = name.substring(optionPrefix.length()); + rc.put(name, value); + iter.remove(); + } + } + + return rc; + } + + /** + * Add bean properties to a URI + * + * @param uri + * @param bean + * @return Map of properties + * @throws Exception + */ + public static String addPropertiesToURIFromBean(String uri, Object bean) throws Exception { + Map props = PropertyUtil.getProperties(bean); + return PropertyUtil.addPropertiesToURI(uri, props); + } + + /** + * Add properties to a URI + * + * @param uri + * @param props + * @return uri with properties on + * @throws Exception + */ + public static String addPropertiesToURI(URI uri, Map props) throws Exception { + return addPropertiesToURI(uri.toString(), props); + } + + /** + * Add properties to a URI + * + * @param uri + * @param props + * @return uri with properties on + * @throws Exception + */ + public static String addPropertiesToURI(String uri, Map props) throws Exception { + String result = uri; + if (uri != null && props != null) { + StringBuilder base = new StringBuilder(stripBefore(uri, '?')); + Map map = parseParameters(uri); + if (!map.isEmpty()) { + map.putAll(props); + } + if (!map.isEmpty()) { + base.append('?'); + boolean first = true; + for (Map.Entry entry : map.entrySet()) { + if (!first) { + base.append('&'); + } + first = false; + base.append(entry.getKey()).append("=").append(entry.getValue()); + } + result = base.toString(); + } + } + return result; + } + + /** + * Set properties on an object using the provided map. The return value indicates if all + * properties from the given map were set on the target object. 
+ * + * @param target + * the object whose properties are to be set from the map options. + * @param props + * the properties that should be applied to the given object. + * + * @return true if all values in the props map were applied to the target object. + */ + public static boolean setProperties(Object target, Map props) { + if (target == null) { + throw new IllegalArgumentException("target was null."); + } + if (props == null) { + throw new IllegalArgumentException("props was null."); + } + + int setCounter = 0; + + for (Map.Entry entry : props.entrySet()) { + if (setProperty(target, entry.getKey(), entry.getValue())) { + setCounter++; + } + } + + return setCounter == props.size(); + } + + /** + * Get properties from an object + * + * @param object + * @return Map of properties + * @throws Exception + */ + public static Map getProperties(Object object) throws Exception { + Map props = new LinkedHashMap(); + BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass()); + Object[] NULL_ARG = {}; + PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors(); + if (propertyDescriptors != null) { + for (int i = 0; i < propertyDescriptors.length; i++) { + PropertyDescriptor pd = propertyDescriptors[i]; + if (pd.getReadMethod() != null && !pd.getName().equals("class") && !pd.getName().equals("properties") && !pd.getName().equals("reference")) { + Object value = pd.getReadMethod().invoke(object, NULL_ARG); + if (value != null) { + if (value instanceof Boolean || value instanceof Number || value instanceof String || value instanceof URI || value instanceof URL) { + props.put(pd.getName(), ("" + value)); + } else if (value instanceof SSLContext) { + // ignore this one.. + } else { + Map inner = getProperties(value); + for (Map.Entry entry : inner.entrySet()) { + props.put(pd.getName() + "." 
+ entry.getKey(), entry.getValue()); + } + } + } + } + } + } + return props; + } + + /** + * Find a specific property getter in a given object based on a property name. + * + * @param object + * the object to search. + * @param name + * the property name to search for. + * + * @return the result of invoking the specific property get method. + * + * @throws Exception if an error occurs while searching the object's bean info. + */ + public static Object getProperty(Object object, String name) throws Exception { + BeanInfo beanInfo = Introspector.getBeanInfo(object.getClass()); + PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors(); + if (propertyDescriptors != null) { + for (int i = 0; i < propertyDescriptors.length; i++) { + PropertyDescriptor pd = propertyDescriptors[i]; + if (pd.getReadMethod() != null && pd.getName().equals(name)) { + return pd.getReadMethod().invoke(object); + } + } + } + return null; + } + + /** + * Set a property + * + * @param target + * @param name + * @param value + * @return true if set + */ + public static boolean setProperty(Object target, String name, Object value) { + try { + int dotPos = name.indexOf("."); + while (dotPos >= 0) { + String getterName = name.substring(0, dotPos); + target = getProperty(target, getterName); + name = name.substring(dotPos + 1); + dotPos = name.indexOf("."); + } + + Class clazz = target.getClass(); + Method setter = findSetterMethod(clazz, name); + if (setter == null) { + return false; + } + // If the type is null or it matches the needed type, just use the + // value directly + if (value == null || value.getClass() == setter.getParameterTypes()[0]) { + setter.invoke(target, new Object[] { value }); + } else { + // We need to convert it + setter.invoke(target, new Object[] { convert(value, setter.getParameterTypes()[0]) }); + } + return true; + } catch (Throwable ignore) { + return false; + } + } + + /** + * Return a String past a prefix + * + * @param value + * @param prefix + * 
@return stripped + */ + public static String stripPrefix(String value, String prefix) { + if (value.startsWith(prefix)) { + return value.substring(prefix.length()); + } + return value; + } + + /** + * Return a String from to a character + * + * @param value + * @param c + * @return stripped + */ + public static String stripUpto(String value, char c) { + String result = null; + int index = value.indexOf(c); + if (index > 0) { + result = value.substring(index + 1); + } + return result; + } + + /** + * Return a String up to and including character + * + * @param value + * @param c + * @return stripped + */ + public static String stripBefore(String value, char c) { + String result = value; + int index = value.indexOf(c); + if (index > 0) { + result = value.substring(0, index); + } + return result; + } + + private static Method findSetterMethod(Class clazz, String name) { + // Build the method name. + name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1); + Method[] methods = clazz.getMethods(); + for (int i = 0; i < methods.length; i++) { + Method method = methods[i]; + Class params[] = method.getParameterTypes(); + if (method.getName().equals(name) && params.length == 1) { + return method; + } + } + return null; + } + + private static Object convert(Object value, Class type) throws Exception { + PropertyEditor editor = PropertyEditorManager.findEditor(type); + if (editor != null) { + editor.setAsText(value.toString()); + return editor.getValue(); + } + if (type == URI.class) { + return new URI(value.toString()); + } + return null; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org