Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 020D717AA2 for ; Tue, 11 Nov 2014 18:41:40 +0000 (UTC) Received: (qmail 33368 invoked by uid 500); 11 Nov 2014 18:41:34 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 33303 invoked by uid 500); 11 Nov 2014 18:41:34 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 31623 invoked by uid 99); 11 Nov 2014 18:41:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 18:41:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B4483A0D78D; Tue, 11 Nov 2014 18:41:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Tue, 11 Nov 2014 18:42:14 -0000 Message-Id: <4dacf5fe483f41f49b81efe19fa183f5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [44/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfiguration.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfiguration.java new file mode 100644 index 0000000..7805052 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfiguration.java @@ -0,0 +1,414 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq6.core.client.HornetQClientMessageBundle; +import org.apache.activemq6.core.remoting.impl.TransportConfigurationUtil; +import org.apache.activemq6.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq6.utils.UUIDGenerator; + +/** + * A TransportConfiguration is used by a client to specify connections to a server and its backup if + * one exists. + *

+ * Typically the constructors take the class name and parameters for needed to create the + * connection. These will be different dependent on which connector is being used, i.e. Netty or + * InVM etc. For example: + * + *

+ * HashMap<String, Object> map = new HashMap<String, Object>();
+ * map.put("host", "localhost");
+ * map.put("port", 5445);
+ * TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName(), map);
+ * ClientSessionFactory sf = new ClientSessionFactoryImpl(config);
+ * 
+ * @author Tim Fox + */ +public class TransportConfiguration implements Serializable +{ + private static final long serialVersionUID = -3994528421527392679L; + + private String name; + + private String factoryClassName; + + private Map params; + + private static final byte TYPE_BOOLEAN = 0; + + private static final byte TYPE_INT = 1; + + private static final byte TYPE_LONG = 2; + + private static final byte TYPE_STRING = 3; + + /** + * Utility method for splitting a comma separated list of hosts + * + * @param commaSeparatedHosts the comma separated host string + * @return the hosts + */ + public static String[] splitHosts(final String commaSeparatedHosts) + { + if (commaSeparatedHosts == null) + { + return new String[0]; + } + String[] hosts = commaSeparatedHosts.split(","); + + for (int i = 0; i < hosts.length; i++) + { + hosts[i] = hosts[i].trim(); + } + return hosts; + } + + /** + * Creates a default TransportConfiguration with no configured transport. + */ + public TransportConfiguration() + { + this.params = new HashMap<>(); + } + + /** + * Creates a TransportConfiguration with a specific name providing the class name of the {@link org.apache.activemq6.spi.core.remoting.ConnectorFactory} + * and any parameters needed. + * + * @param className The class name of the ConnectorFactory + * @param params The parameters needed by the ConnectorFactory + * @param name The name of this TransportConfiguration + */ + public TransportConfiguration(final String className, final Map params, final String name) + { + factoryClassName = className; + + if (params == null || params.isEmpty()) + { + this.params = TransportConfigurationUtil.getDefaults(className); + } + else + { + this.params = params; + } + + this.name = name; + } + + /** + * Creates a TransportConfiguration providing the class name of the {@link org.apache.activemq6.spi.core.remoting.ConnectorFactory} + * and any parameters needed. + * + * @param className The class name of the ConnectorFactory + * @param params The parameters needed by the ConnectorFactory + */ + public TransportConfiguration(final String className, final Map params) + { + this(className, params, UUIDGenerator.getInstance().generateStringUUID()); + } + + /** + * Creates a TransportConfiguration providing the class name of the {@link org.apache.activemq6.spi.core.remoting.ConnectorFactory} + * + * @param className The class name of the ConnectorFactory + */ + public TransportConfiguration(final String className) + { + this(className, new HashMap(), UUIDGenerator.getInstance().generateStringUUID()); + } + + /** + * Returns the name of this TransportConfiguration. + * + * @return the name + */ + public String getName() + { + return name; + } + + /** + * Returns the class name of ConnectorFactory being used by this TransportConfiguration + * + * @return The factory's class name + */ + public String getFactoryClassName() + { + return factoryClassName; + } + + /** + * Returns any parameters set for this TransportConfiguration + * + * @return the parameters + */ + public Map getParams() + { + return params; + } + + @Override + public int hashCode() + { + return factoryClassName.hashCode(); + } + + @Override + public boolean equals(final Object other) + { + if (other instanceof TransportConfiguration == false) + { + return false; + } + + TransportConfiguration kother = (TransportConfiguration) other; + + if (factoryClassName.equals(kother.factoryClassName)) + { + if (params == null || params.isEmpty()) + { + return kother.params == null || kother.params.isEmpty(); + } + else + { + if (kother.params == null || kother.params.isEmpty()) + { + return false; + } + else if (params.size() == kother.params.size()) + { + for (Map.Entry entry : params.entrySet()) + { + Object thisVal = entry.getValue(); + + Object otherVal = kother.params.get(entry.getKey()); + + if (otherVal == null || !otherVal.equals(thisVal)) + { + return false; + } + } + return true; + } + else + { + return false; + } + } + } + else + { + return false; + } + } + + /** + * There's a case on ClusterConnections that we need to find an equivalent Connector and we can't + * use a Netty Cluster Connection on an InVM ClusterConnection (inVM used on tests) for that + * reason I need to test if the two instances of the TransportConfiguration are equivalent while + * a test a connector against an acceptor + * @param otherConfig + * @return {@code true} if the factory class names are equivalents + */ + public boolean isEquivalent(TransportConfiguration otherConfig) + { + if (this.getFactoryClassName().equals(otherConfig.getFactoryClassName())) + { + return true; + } + else if (this.getFactoryClassName().contains("Netty") && otherConfig.getFactoryClassName().contains("Netty")) + { + return true; + } + else if (this.getFactoryClassName().contains("InVM") && otherConfig.getFactoryClassName().contains("InVM")) + { + return true; + } + else + { + return false; + } + } + + @Override + public String toString() + { + StringBuilder str = new StringBuilder(TransportConfiguration.class.getSimpleName()); + str.append("(name=" + name + ", "); + str.append("factory=" + replaceWildcardChars(factoryClassName)); + str.append(") "); + if (params != null) + { + if (!params.isEmpty()) + { + str.append("?"); + } + + boolean first = true; + for (Map.Entry entry : params.entrySet()) + { + if (!first) + { + str.append("&"); + } + + String key = entry.getKey(); + + // HORNETQ-1281 - don't log passwords + String val; + if (key.equals(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME) || key.equals(TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD)) + { + val = "****"; + } + else + { + val = entry.getValue() == null ? "null" : entry.getValue().toString(); + } + + str.append(replaceWildcardChars(key)).append('=').append(replaceWildcardChars(val)); + + first = false; + } + } + return str.toString(); + } + + /** + * Encodes this TransportConfiguration into a buffer. + *

+ * Note that this is only used internally HornetQ. + * + * @param buffer the buffer to encode into + */ + public void encode(final HornetQBuffer buffer) + { + buffer.writeString(name); + buffer.writeString(factoryClassName); + + buffer.writeInt(params == null ? 0 : params.size()); + + if (params != null) + { + for (Map.Entry entry : params.entrySet()) + { + buffer.writeString(entry.getKey()); + + Object val = entry.getValue(); + + if (val instanceof Boolean) + { + buffer.writeByte(TransportConfiguration.TYPE_BOOLEAN); + buffer.writeBoolean((Boolean) val); + } + else if (val instanceof Integer) + { + buffer.writeByte(TransportConfiguration.TYPE_INT); + buffer.writeInt((Integer) val); + } + else if (val instanceof Long) + { + buffer.writeByte(TransportConfiguration.TYPE_LONG); + buffer.writeLong((Long) val); + } + else if (val instanceof String) + { + buffer.writeByte(TransportConfiguration.TYPE_STRING); + buffer.writeString((String) val); + } + else + { + throw HornetQClientMessageBundle.BUNDLE.invalidEncodeType(val); + } + } + } + } + + /** + * Decodes this TransportConfiguration from a buffer. + *

+ * Note this is only used internally by HornetQ + * + * @param buffer the buffer to decode from + */ + public void decode(final HornetQBuffer buffer) + { + name = buffer.readString(); + factoryClassName = buffer.readString(); + + int num = buffer.readInt(); + + if (params == null) + { + if (num > 0) + { + params = new HashMap(); + } + } + else + { + params.clear(); + } + + for (int i = 0; i < num; i++) + { + String key = buffer.readString(); + + byte type = buffer.readByte(); + + Object val; + + switch (type) + { + case TYPE_BOOLEAN: + { + val = buffer.readBoolean(); + + break; + } + case TYPE_INT: + { + val = buffer.readInt(); + + break; + } + case TYPE_LONG: + { + val = buffer.readLong(); + + break; + } + case TYPE_STRING: + { + val = buffer.readString(); + + break; + } + default: + { + throw HornetQClientMessageBundle.BUNDLE.invalidType(type); + } + } + + params.put(key, val); + } + } + + private static String replaceWildcardChars(final String str) + { + return str.replace('.', '-'); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfigurationHelper.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfigurationHelper.java new file mode 100644 index 0000000..e06f3d0 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/TransportConfigurationHelper.java @@ -0,0 +1,26 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.api.core; + +import java.util.Map; + +/** + * Helper interface for specifying default parameters on Transport Configurations. + * + * @author Martyn Taylor + */ +public interface TransportConfigurationHelper +{ + Map getDefaults(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/UDPBroadcastGroupConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/UDPBroadcastGroupConfiguration.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/UDPBroadcastGroupConfiguration.java new file mode 100644 index 0000000..db1ed47 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/UDPBroadcastGroupConfiguration.java @@ -0,0 +1,335 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq6.core.client.HornetQClientLogger; + + +/** + * The configuration used to determine how the server will broadcast members. + *

+ * This is analogous to {@link org.apache.activemq6.api.core.DiscoveryGroupConfiguration} + * + * @author Tim Fox Created 18 Nov 2008 08:44:30 + */ +public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper +{ + private static final long serialVersionUID = 1052413739064253955L; + + private transient String localBindAddress = null; + + private transient int localBindPort = -1; + + private String groupAddress = null; + + private int groupPort = -1; + + public UDPBroadcastGroupConfiguration() + { + } + + public BroadcastEndpointFactory createBroadcastEndpointFactory() + { + return new BroadcastEndpointFactory() + { + @Override + public BroadcastEndpoint createBroadcastEndpoint() throws Exception + { + return new UDPBroadcastEndpoint() + .setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null) + .setGroupPort(groupPort) + .setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null) + .setLocalBindPort(localBindPort); + } + }; + } + + public String getGroupAddress() + { + return groupAddress; + } + + public UDPBroadcastGroupConfiguration setGroupAddress(String groupAddress) + { + this.groupAddress = groupAddress; + return this; + } + + public int getGroupPort() + { + return groupPort; + } + + public UDPBroadcastGroupConfiguration setGroupPort(int groupPort) + { + this.groupPort = groupPort; + return this; + } + + public int getLocalBindPort() + { + return localBindPort; + } + + public UDPBroadcastGroupConfiguration setLocalBindPort(int localBindPort) + { + this.localBindPort = localBindPort; + return this; + } + + public String getLocalBindAddress() + { + return localBindAddress; + } + + public UDPBroadcastGroupConfiguration setLocalBindAddress(String localBindAddress) + { + this.localBindAddress = localBindAddress; + return this; + } + + /** + *

This is the member discovery implementation using direct UDP. It was extracted as a refactoring from + * {@link org.apache.activemq6.core.cluster.DiscoveryGroup}

+ * + * @author Tomohisa + * @author Howard Gao + * @author Clebert Suconic + */ + private static class UDPBroadcastEndpoint implements BroadcastEndpoint + { + private static final int SOCKET_TIMEOUT = 500; + + private InetAddress localAddress; + + private int localBindPort; + + private InetAddress groupAddress; + + private int groupPort; + + private DatagramSocket broadcastingSocket; + + private MulticastSocket receivingSocket; + + private volatile boolean open; + + public UDPBroadcastEndpoint() + { + } + + public UDPBroadcastEndpoint setGroupAddress(InetAddress groupAddress) + { + this.groupAddress = groupAddress; + return this; + } + + public UDPBroadcastEndpoint setGroupPort(int groupPort) + { + this.groupPort = groupPort; + return this; + } + + public UDPBroadcastEndpoint setLocalBindAddress(InetAddress localAddress) + { + this.localAddress = localAddress; + return this; + } + + public UDPBroadcastEndpoint setLocalBindPort(int localBindPort) + { + this.localBindPort = localBindPort; + return this; + } + + + public void broadcast(byte[] data) throws Exception + { + DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort); + broadcastingSocket.send(packet); + } + + public byte[] receiveBroadcast() throws Exception + { + final byte[] data = new byte[65535]; + final DatagramPacket packet = new DatagramPacket(data, data.length); + + while (open) + { + try + { + receivingSocket.receive(packet); + } + // TODO: Do we need this? + catch (InterruptedIOException e) + { + continue; + } + catch (IOException e) + { + if (open) + { + HornetQClientLogger.LOGGER.warn(this + " getting exception when receiving broadcasting.", e); + } + } + break; + } + return data; + } + + public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception + { + // We just use the regular method on UDP, there's no timeout support + // and this is basically for tests only + return receiveBroadcast(); + } + + public void openBroadcaster() throws Exception + { + if (localBindPort != -1) + { + broadcastingSocket = new DatagramSocket(localBindPort, localAddress); + } + else + { + if (localAddress != null) + { + HornetQClientLogger.LOGGER.broadcastGroupBindError(); + } + broadcastingSocket = new DatagramSocket(); + } + + open = true; + } + + public void openClient() throws Exception + { + // HORNETQ-874 + if (checkForLinux() || checkForSolaris() || checkForHp()) + { + try + { + receivingSocket = new MulticastSocket(new InetSocketAddress(groupAddress, groupPort)); + } + catch (IOException e) + { + HornetQClientLogger.LOGGER.ioDiscoveryError(groupAddress.getHostAddress(), groupAddress instanceof Inet4Address ? "IPv4" : "IPv6"); + + receivingSocket = new MulticastSocket(groupPort); + } + } + else + { + receivingSocket = new MulticastSocket(groupPort); + } + + if (localAddress != null) + { + receivingSocket.setInterface(localAddress); + } + + receivingSocket.joinGroup(groupAddress); + + receivingSocket.setSoTimeout(SOCKET_TIMEOUT); + + open = true; + } + + //@Todo: using isBroadcast to share endpoint between broadcast and receiving + public void close(boolean isBroadcast) throws Exception + { + open = false; + + if (broadcastingSocket != null) + { + broadcastingSocket.close(); + } + + if (receivingSocket != null) + { + receivingSocket.close(); + } + } + + private static boolean checkForLinux() + { + return checkForPresence("os.name", "linux"); + } + + private static boolean checkForHp() + { + return checkForPresence("os.name", "hp"); + } + + private static boolean checkForSolaris() + { + return checkForPresence("os.name", "sun"); + } + + private static boolean checkForPresence(String key, String value) + { + try + { + String tmp = System.getProperty(key); + return tmp != null && tmp.trim().toLowerCase().startsWith(value); + } + catch (Throwable t) + { + return false; + } + } + + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((groupAddress == null) ? 0 : groupAddress.hashCode()); + result = prime * result + groupPort; + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + UDPBroadcastGroupConfiguration other = (UDPBroadcastGroupConfiguration) obj; + if (groupAddress == null) + { + if (other.groupAddress != null) + return false; + } + else if (!groupAddress.equals(other.groupAddress)) + return false; + if (groupPort != other.groupPort) + return false; + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientConsumer.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientConsumer.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientConsumer.java new file mode 100644 index 0000000..1d980d6 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientConsumer.java @@ -0,0 +1,123 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core.client; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.spi.core.remoting.ConsumerContext; + +/** + * A ClientConsumer receives messages from HornetQ queues. + *
+ * Messages can be consumed synchronously by using the receive() methods + * which will block until a message is received (or a timeout expires) or asynchronously + * by setting a {@link MessageHandler}. + *
+ * These 2 types of consumption are exclusive: a ClientConsumer with a MessageHandler set will + * throw HornetQException if its receive() methods are called. + * + * @author Tim Fox + * @author Clebert Suconic + * @author Andy Taylor + * + * @see ClientSession#createConsumer(String) + */ +public interface ClientConsumer extends AutoCloseable +{ + + /** + * The server's ID associated with this consumer. + * HornetQ implements this as a long but this could be protocol dependent. + * @return + */ + ConsumerContext getConsumerContext(); + + /** + * Receives a message from a queue. + *

+ * This call will block indefinitely until a message is received. + *

+ * Calling this method on a closed consumer will throw a HornetQException. + * @return a ClientMessage + * @throws HornetQException if an exception occurs while waiting to receive a message + */ + ClientMessage receive() throws HornetQException; + + /** + * Receives a message from a queue. + *

+ * This call will block until a message is received or the given timeout expires. + *

+ * Calling this method on a closed consumer will throw a HornetQException. + * @param timeout time (in milliseconds) to wait to receive a message + * @return a message or {@code null} if the time out expired + * @throws HornetQException if an exception occurs while waiting to receive a message + */ + ClientMessage receive(long timeout) throws HornetQException; + + /** + * Receives a message from a queue. This call will force a network trip to HornetQ server to + * ensure that there are no messages in the queue which can be delivered to this consumer. + *

+ * This call will never wait indefinitely for a message, it will return {@code null} if no + * messages are available for this consumer. + *

+ * Note however that there is a performance cost as an additional network trip to the server may + * required to check the queue status. + *

+ * Calling this method on a closed consumer will throw a HornetQException. + * @return a message or {@code null} if there are no messages in the queue for this consumer + * @throws HornetQException if an exception occurs while waiting to receive a message + */ + ClientMessage receiveImmediate() throws HornetQException; + + /** + * Returns the MessageHandler associated to this consumer. + *

+ * Calling this method on a closed consumer will throw a HornetQException. + * @return the MessageHandler associated to this consumer or {@code null} + * @throws HornetQException if an exception occurs while getting the MessageHandler + */ + MessageHandler getMessageHandler() throws HornetQException; + + /** + * Sets the MessageHandler for this consumer to consume messages asynchronously. + *

+ * Calling this method on a closed consumer will throw a HornetQException. + * @param handler a MessageHandler + * @throws HornetQException if an exception occurs while setting the MessageHandler + */ + ClientConsumer setMessageHandler(MessageHandler handler) throws HornetQException; + + /** + * Closes the consumer. + *

+ * Once this consumer is closed, it can not receive messages, whether synchronously or + * asynchronously. + * @throws HornetQException + */ + void close() throws HornetQException; + + /** + * Returns whether the consumer is closed or not. + * + * @return true if this consumer is closed, false else + */ + boolean isClosed(); + + /** + * Returns the last exception thrown by a call to this consumer's MessageHandler. + * + * @return the last exception thrown by a call to this consumer's MessageHandler or {@code null} + */ + Exception getLastException(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientMessage.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientMessage.java new file mode 100644 index 0000000..73a2ad9 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientMessage.java @@ -0,0 +1,232 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core.client; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; + +/** + * + * A ClientMessage represents a message sent and/or received by HornetQ. + * + * @author Tim Fox + * @author Clebert Suconic + * + */ +public interface ClientMessage extends Message +{ + /** + * Returns the number of times this message was delivered. + */ + int getDeliveryCount(); + + /** + * Sets the delivery count for this message. + *

+ * This method is not meant to be called by HornetQ clients. + * @param deliveryCount message delivery count + * @return this ClientMessage + */ + ClientMessage setDeliveryCount(int deliveryCount); + + /** + * Acknowledges reception of this message. + *

+ * If the session responsible to acknowledge this message has {@code autoCommitAcks} set to + * {@code true}, the transaction will automatically commit the current transaction. Otherwise, + * this acknowledgement will not be committed until the client commits the session transaction. + * @throws HornetQException if an error occurred while acknowledging the message. + * @see ClientSession#isAutoCommitAcks() + */ + ClientMessage acknowledge() throws HornetQException; + + /** + * Acknowledges reception of a single message. + *

+ * If the session responsible to acknowledge this message has {@code autoCommitAcks} set to + * {@code true}, the transaction will automatically commit the current transaction. Otherwise, + * this acknowledgement will not be committed until the client commits the session transaction. + * @throws HornetQException if an error occurred while acknowledging the message. + * @see ClientSession#isAutoCommitAcks() + */ + ClientMessage individualAcknowledge() throws HornetQException; + + /** + * This can be optionally used to verify if the entire message has been received. + * It won't have any effect on regular messages but it may be helpful on large messages. + * The use case for this is to make sure there won't be an exception while getting the buffer. + * Using getBodyBuffer directly would have the same effect but you could get a Runtime non checked Exception + * instead + * @throws HornetQException + */ + void checkCompletion() throws HornetQException; + + /** + * Returns the size (in bytes) of this message's body + */ + int getBodySize(); + + /** + * Sets the OutputStream that will receive the content of a message received in a non blocking way. + *
+ * This method is used when consuming large messages + * + * @throws HornetQException + * @return this ClientMessage + */ + ClientMessage setOutputStream(OutputStream out) throws HornetQException; + + /** + * Saves the content of the message to the OutputStream. + * It will block until the entire content is transferred to the OutputStream. + *
+ * + * @throws HornetQException + */ + void saveToOutputStream(OutputStream out) throws HornetQException; + + /** + * Wait the outputStream completion of the message. + * + * This method is used when consuming large messages + * + * @param timeMilliseconds - 0 means wait forever + * @return true if it reached the end + * @throws HornetQException + */ + boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException; + + /** + * Sets the body's IntputStream. + *
+ * This method is used when sending large messages + * @return this ClientMessage + */ + ClientMessage setBodyInputStream(InputStream bodyInputStream); + + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putBooleanProperty(SimpleString key, boolean value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putBooleanProperty(String key, boolean value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putByteProperty(SimpleString key, byte value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putByteProperty(String key, byte value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putBytesProperty(SimpleString key, byte[] value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putBytesProperty(String key, byte[] value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putShortProperty(SimpleString key, short value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putShortProperty(String key, short value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putCharProperty(SimpleString key, char value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putCharProperty(String key, char value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putIntProperty(SimpleString key, int value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putIntProperty(String key, int value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putLongProperty(SimpleString key, long value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putLongProperty(String key, long value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putFloatProperty(SimpleString key, float value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putFloatProperty(String key, float value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putDoubleProperty(SimpleString key, double value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putDoubleProperty(String key, double value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putStringProperty(SimpleString key, SimpleString value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage putStringProperty(String key, String value); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage writeBodyBufferBytes(byte[] bytes); + + /** + * Overridden from {@link org.apache.activemq6.api.core.Message} to enable fluent API + */ + ClientMessage writeBodyBufferString(String string); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientProducer.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientProducer.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientProducer.java new file mode 100644 index 0000000..b9abb24 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientProducer.java @@ -0,0 +1,149 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core.client; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; + +/** + * A ClientProducer is used to send messages to a specific address. Messages are then routed on the + * server to any queues that are bound to the address. A ClientProducer can either be created with a + * specific address in mind or with none. With the latter the address must be provided using the + * appropriate send() method.
+ *

+ * The sending semantics can change depending on what blocking semantics are set via + * {@link ServerLocator#setBlockOnDurableSend(boolean)} and + * {@link org.apache.activemq6.api.core.client.ServerLocator#setBlockOnNonDurableSend(boolean)} . If set to + * true then for each message type, durable and non durable respectively, any exceptions such as the + * address not existing or security exceptions will be thrown at the time of send. Alternatively if + * set to false then exceptions will only be logged on the server.
+ *

+ * The send rate can also be controlled via {@link ServerLocator#setProducerMaxRate(int)} and the + * {@link org.apache.activemq6.api.core.client.ServerLocator#setProducerWindowSize(int)}.
+ *
+ * @author Tim Fox + * @author Andy Taylor + */ +public interface ClientProducer extends AutoCloseable +{ + /** + * Returns the address where messages will be sent. + * + *

The address can be {@code null} if the ClientProducer + * + * was creating without specifying an address, that is by using {@link ClientSession#createProducer()}. + * + * @return the address where messages will be sent + */ + SimpleString getAddress(); + + /** + * Sends a message to an address. specified in {@link ClientSession#createProducer(String)} or + * similar methods.
+ *
+ * This will block until confirmation that the message has reached the server has been received + * if {@link ServerLocator#setBlockOnDurableSend(boolean)} or + * {@link ServerLocator#setBlockOnNonDurableSend(boolean)} are set to true for the + * specified message type. + * @param message the message to send + * @throws HornetQException if an exception occurs while sending the message + */ + void send(Message message) throws HornetQException; + + /** + * Sends a message to the specified address instead of the ClientProducer's address.
+ *
+ * This message will be sent asynchronously. + *

+ * The handler will only get called if {@link ServerLocator#setConfirmationWindowSize(int) -1}. + * @param message the message to send + * @param handler handler to call after receiving a SEND acknowledgement from the server + * @throws HornetQException if an exception occurs while sending the message + */ + void send(Message message, SendAcknowledgementHandler handler) throws HornetQException; + + /** + * Sends a message to the specified address instead of the ClientProducer's address.
+ *
+ * This will block until confirmation that the message has reached the server has been received + * if {@link ServerLocator#setBlockOnDurableSend(boolean)} or + * {@link ServerLocator#setBlockOnNonDurableSend(boolean)} are set to true for the specified + * message type. + * @param address the address where the message will be sent + * @param message the message to send + * @throws HornetQException if an exception occurs while sending the message + */ + void send(SimpleString address, Message message) throws HornetQException; + + /** + * Sends a message to the specified address instead of the ClientProducer's address.
+ *
+ * This message will be sent asynchronously. + *

+ * The handler will only get called if {@link ServerLocator#setConfirmationWindowSize(int) -1}. + * @param address the address where the message will be sent + * @param message the message to send + * @param handler handler to call after receiving a SEND acknowledgement from the server + * @throws HornetQException if an exception occurs while sending the message + */ + void send(SimpleString address, Message message, SendAcknowledgementHandler handler) throws HornetQException; + + /** + * Sends a message to the specified address instead of the ClientProducer's address.
+ *
+ * This will block until confirmation that the message has reached the server has been received + * if {@link ServerLocator#setBlockOnDurableSend(boolean)} or + * {@link ServerLocator#setBlockOnNonDurableSend(boolean)} are set to true for the specified + * message type. + * @param address the address where the message will be sent + * @param message the message to send + * @throws HornetQException if an exception occurs while sending the message + */ + void send(String address, Message message) throws HornetQException; + + /** + * Closes the ClientProducer. If already closed nothing is done. + * + * @throws HornetQException if an exception occurs while closing the producer + */ + void close() throws HornetQException; + + /** + * Returns whether the producer is closed or not. + * + * @return true if the producer is closed, false else + */ + boolean isClosed(); + + /** + * Returns whether the producer will block when sending durable messages. + * + * @return true if the producer blocks when sending durable, false else + */ + boolean isBlockOnDurableSend(); + + /** + * Returns whether the producer will block when sending non-durable messages. + * + * @return true if the producer blocks when sending non-durable, false else + */ + boolean isBlockOnNonDurableSend(); + + /** + * Returns the maximum rate at which a ClientProducer can send messages per second. + * + * @return the producers maximum rate + */ + int getMaxRate(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientRequestor.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientRequestor.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientRequestor.java new file mode 100644 index 0000000..aabd39c --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientRequestor.java @@ -0,0 +1,110 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core.client; + +import java.util.UUID; + +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.client.impl.ClientMessageImpl; + +/** + * The ClientRequestor class helps making requests. + *
+ * The ClientRequestor constructor is given a ClientSession and a request address. + * It creates a temporary queue for the responses and provides a request method that sends the request message and waits for its reply. + * + * @apiviz.uses org.apache.activemq6.api.core.client.ClientSession + * @apiviz.owns org.apache.activemq6.api.core.client.ClientProducer + * @apiviz.owns org.apache.activemq6.api.core.client.ClientConsumer + * + * @author Jeff Mesnil + */ +public final class ClientRequestor +{ + private final ClientSession queueSession; + + private final ClientProducer requestProducer; + + private final ClientConsumer replyConsumer; + + private final SimpleString replyQueue; + + /** + * Constructor for the ClientRequestor. + * + * The implementation expects a ClientSession with automatic commits of sends and acknowledgements + * + * @param session a ClientSession uses to handle requests and replies + * @param requestAddress the address to send request messages to + * @throws Exception + */ + public ClientRequestor(final ClientSession session, final SimpleString requestAddress) throws Exception + { + queueSession = session; + + requestProducer = queueSession.createProducer(requestAddress); + replyQueue = new SimpleString(requestAddress + "." + UUID.randomUUID().toString()); + queueSession.createTemporaryQueue(replyQueue, replyQueue); + replyConsumer = queueSession.createConsumer(replyQueue); + } + + /** + * @see ClientRequestor#ClientRequestor(ClientSession, SimpleString) + */ + public ClientRequestor(final ClientSession session, final String requestAddress) throws Exception + { + this(session, SimpleString.toSimpleString(requestAddress)); + } + + /** + * Sends a message to the request address and wait indefinitely for a reply. + * The temporary queue is used for the REPLYTO_HEADER_NAME, and only one reply per request is expected + * + * @param request the message to send + * @return the reply message + * @throws Exception + */ + public ClientMessage request(final ClientMessage request) throws Exception + { + return request(request, 0); + } + + /** + * Sends a message to the request address and wait for the given timeout for a reply. + * The temporary queue is used for the REPLYTO_HEADER_NAME, and only one reply per request is expected + * + * @param request the message to send + * @param timeout the timeout to wait for a reply (in milliseconds) + * @return the reply message or {@code null} if no message is replied before the timeout elapses + * @throws Exception + */ + public ClientMessage request(final ClientMessage request, final long timeout) throws Exception + { + request.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, replyQueue); + requestProducer.send(request); + return replyConsumer.receive(timeout); + } + + /** + * Closes the ClientRequestor and its session. + * + * @throws Exception if an exception occurs while closing the ClientRequestor + */ + public void close() throws Exception + { + replyConsumer.close(); + requestProducer.close(); + queueSession.deleteQueue(replyQueue); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSession.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSession.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSession.java new file mode 100644 index 0000000..c9baf64 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSession.java @@ -0,0 +1,671 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core.client; + +import javax.transaction.xa.XAResource; +import java.util.List; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.SimpleString; + +/** + * A ClientSession is a single-thread object required for producing and consuming messages. + * + * @author Tim Fox + * @author Clebert Suconic + * @author Andy Taylor + * @author Jeff Mesnil + */ +public interface ClientSession extends XAResource, AutoCloseable +{ + /** + * Information returned by a binding query + * + * @see ClientSession#addressQuery(SimpleString) + */ + public interface AddressQuery + { + /** + * Returns true if the binding exists, false else. + */ + boolean isExists(); + + /** + * Returns the names of the queues bound to the binding. + */ + List getQueueNames(); + } + + /** + * @deprecated Use {@link org.apache.activemq6.api.core.client.ClientSession.AddressQuery} instead + */ + @Deprecated + public interface BindingQuery extends AddressQuery + { + + } + + /** + * Information returned by a queue query + * + * @see ClientSession#queueQuery(SimpleString) + */ + public interface QueueQuery + { + /** + * Returns true if the queue exists, false else. + */ + boolean isExists(); + + /** + * Return true if the queue is temporary, false else. + */ + boolean isTemporary(); + + /** + * Returns true if the queue is durable, false else. + */ + boolean isDurable(); + + /** + * Returns the number of consumers attached to the queue. + */ + int getConsumerCount(); + + /** + * Returns the number of messages in the queue. + */ + long getMessageCount(); + + /** + * Returns the queue's filter string (or {@code null} if the queue has no filter). + */ + SimpleString getFilterString(); + + /** + * Returns the address that the queue is bound to. + */ + SimpleString getAddress(); + + /** + * Return the name of the queue + * + * @return + */ + SimpleString getName(); + } + + // Lifecycle operations ------------------------------------------ + + /** + * Starts the session. + * The session must be started before ClientConsumers created by the session can consume messages from the queue. + * + * @throws HornetQException if an exception occurs while starting the session + */ + ClientSession start() throws HornetQException; + + /** + * Stops the session. + * ClientConsumers created by the session can not consume messages when the session is stopped. + * + * @throws HornetQException if an exception occurs while stopping the session + */ + void stop() throws HornetQException; + + /** + * Closes the session. + * + * @throws HornetQException if an exception occurs while closing the session + */ + void close() throws HornetQException; + + /** + * Returns whether the session is closed or not. + * + * @return true if the session is closed, false else + */ + boolean isClosed(); + + /** + * Adds a FailureListener to the session which is notified if a failure occurs on the session. + * + * @param listener the listener to add + */ + void addFailureListener(SessionFailureListener listener); + + /** + * Removes a FailureListener to the session. + * + * @param listener the listener to remove + * @return true if the listener was removed, false else + */ + boolean removeFailureListener(SessionFailureListener listener); + + /** + * Adds a FailoverEventListener to the session which is notified if a failover event occurs on the session. + * + * @param listener the listener to add + */ + void addFailoverListener(FailoverEventListener listener); + + /** + * Removes a FailoverEventListener to the session. + * + * @param listener the listener to remove + * @return true if the listener was removed, false else + */ + boolean removeFailoverListener(FailoverEventListener listener); + + + /** + * Returns the server's incrementingVersion. + * + * @return the server's incrementingVersion + */ + int getVersion(); + + // Queue Operations ---------------------------------------------- + + /** + * Creates a non-temporary queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param durable whether the queue is durable or not + * @throws HornetQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, SimpleString queueName, boolean durable) throws HornetQException; + + /** + * Creates a transient queue. A queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted + *

+ * Notice: you will get an exception if the address or the filter doesn't match to an already existent queue + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param durable if the queue is durable + * @throws HornetQException in an exception occurs while creating the queue + */ + void createSharedQueue(SimpleString address, SimpleString queueName, boolean durable) throws HornetQException; + + /** + * Creates a transient queue. A queue that will exist as long as there are consumers. When the last consumer is closed the queue will be deleted + *

+ * Notice: you will get an exception if the address or the filter doesn't match to an already existent queue + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param filter whether the queue is durable or not + * @param durable if the queue is durable + * @throws HornetQException in an exception occurs while creating the queue + */ + void createSharedQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable) throws HornetQException; + + /** + * Creates a non-temporary queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param durable whether the queue is durable or not + * @throws HornetQException in an exception occurs while creating the queue + */ + void createQueue(String address, String queueName, boolean durable) throws HornetQException; + + /** + * Creates a non-temporary queue non-durable queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @throws HornetQException in an exception occurs while creating the queue + */ + void createQueue(String address, String queueName) throws HornetQException; + + /** + * Creates a non-temporary queue non-durable queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @throws HornetQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, SimpleString queueName) throws HornetQException; + + /** + * Creates a non-temporary queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @param durable whether the queue is durable or not + * @throws HornetQException in an exception occurs while creating the queue + */ + void createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable) throws HornetQException; + + /** + * Creates a non-temporaryqueue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param durable whether the queue is durable or not + * @param filter only messages which match this filter will be put in the queue + * @throws HornetQException in an exception occurs while creating the queue + */ + void createQueue(String address, String queueName, String filter, boolean durable) throws HornetQException; + + /** + * Creates a temporary queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @throws HornetQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(SimpleString address, SimpleString queueName) throws HornetQException; + + /** + * Creates a temporary queue. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @throws HornetQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(String address, String queueName) throws HornetQException; + + /** + * Creates a temporary queue with a filter. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @throws HornetQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(SimpleString address, SimpleString queueName, SimpleString filter) throws HornetQException; + + /** + * Creates a temporary queue with a filter. + * + * @param address the queue will be bound to this address + * @param queueName the name of the queue + * @param filter only messages which match this filter will be put in the queue + * @throws HornetQException in an exception occurs while creating the queue + */ + void createTemporaryQueue(String address, String queueName, String filter) throws HornetQException; + + /** + * Deletes the queue. + * + * @param queueName the name of the queue to delete + * @throws HornetQException if there is no queue for the given name or if the queue has consumers + */ + void deleteQueue(SimpleString queueName) throws HornetQException; + + /** + * Deletes the queue. + * + * @param queueName the name of the queue to delete + * @throws HornetQException if there is no queue for the given name or if the queue has consumers + */ + void deleteQueue(String queueName) throws HornetQException; + + // Consumer Operations ------------------------------------------- + + /** + * Creates a ClientConsumer to consume message from the queue with the given name. + * + * @param queueName name of the queue to consume messages from + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(SimpleString queueName) throws HornetQException; + + /** + * Creates a ClientConsumer to consume messages from the queue with the given name. + * + * @param queueName name of the queue to consume messages from + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(String queueName) throws HornetQException; + + /** + * Creates a ClientConsumer to consume messages matching the filter from the queue with the given name. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(SimpleString queueName, SimpleString filter) throws HornetQException; + + /** + * Creates a ClientConsumer to consume messages matching the filter from the queue with the given name. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(String queueName, String filter) throws HornetQException; + + /** + * Creates a ClientConsumer to consume or browse messages from the queue with the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(SimpleString queueName, boolean browseOnly) throws HornetQException; + + /** + * Creates a ClientConsumer to consume or browse messages from the queue with the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(String queueName, boolean browseOnly) throws HornetQException; + + /** + * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with + * the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(String queueName, String filter, boolean browseOnly) throws HornetQException; + + /** + * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with + * the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(SimpleString queueName, SimpleString filter, boolean browseOnly) throws HornetQException; + + /** + * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with + * the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @param windowSize the consumer window size + * @param maxRate the maximum rate to consume messages + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(SimpleString queueName, + SimpleString filter, + int windowSize, + int maxRate, + boolean browseOnly) throws HornetQException; + + /** + * Creates a ClientConsumer to consume or browse messages matching the filter from the queue with + * the given name. + *

+ * If browseOnly is true, the ClientConsumer will receive the messages + * from the queue but they will not be consumed (the messages will remain in the queue). Note + * that paged messages will not be in the queue, and will therefore not be visible if + * {@code browseOnly} is {@code true}. + *

+ * If browseOnly is false, the ClientConsumer will behave like consume + * the messages from the queue and the messages will effectively be removed from the queue. + * + * @param queueName name of the queue to consume messages from + * @param filter only messages which match this filter will be consumed + * @param windowSize the consumer window size + * @param maxRate the maximum rate to consume messages + * @param browseOnly whether the ClientConsumer will only browse the queue or consume messages. + * @return a ClientConsumer + * @throws HornetQException if an exception occurs while creating the ClientConsumer + */ + ClientConsumer createConsumer(String queueName, String filter, int windowSize, int maxRate, boolean browseOnly) throws HornetQException; + + // Producer Operations ------------------------------------------- + + /** + * Creates a producer with no default address. + * Address must be specified every time a message is sent + * + * @return a ClientProducer + * @see ClientProducer#send(SimpleString, org.apache.activemq6.api.core.Message) + */ + ClientProducer createProducer() throws HornetQException; + + /** + * Creates a producer which sends messages to the given address + * + * @param address the address to send messages to + * @return a ClientProducer + * @throws HornetQException if an exception occurs while creating the ClientProducer + */ + ClientProducer createProducer(SimpleString address) throws HornetQException; + + /** + * Creates a producer which sends messages to the given address + * + * @param address the address to send messages to + * @return a ClientProducer + * @throws HornetQException if an exception occurs while creating the ClientProducer + */ + ClientProducer createProducer(String address) throws HornetQException; + + /** + * Creates a producer which sends messages to the given address + * + * @param address the address to send messages to + * @param rate the producer rate + * @return a ClientProducer + * @throws HornetQException if an exception occurs while creating the ClientProducer + */ + ClientProducer createProducer(SimpleString address, int rate) throws HornetQException; + + // Message operations -------------------------------------------- + + /** + * Creates a ClientMessage. + * + * @param durable whether the created message is durable or not + * @return a ClientMessage + */ + ClientMessage createMessage(boolean durable); + + /** + * Creates a ClientMessage. + * + * @param type type of the message + * @param durable whether the created message is durable or not + * @return a ClientMessage + */ + ClientMessage createMessage(byte type, boolean durable); + + /** + * Creates a ClientMessage. + * + * @param type type of the message + * @param durable whether the created message is durable or not + * @param expiration the message expiration + * @param timestamp the message timestamp + * @param priority the message priority (between 0 and 9 inclusive) + * @return a ClientMessage + */ + ClientMessage createMessage(byte type, boolean durable, long expiration, long timestamp, byte priority); + + // Query operations ---------------------------------------------- + + /** + * Queries information on a queue. + * + * @param queueName the name of the queue to query + * @return a QueueQuery containing information on the given queue + * @throws HornetQException if an exception occurs while querying the queue + */ + QueueQuery queueQuery(SimpleString queueName) throws HornetQException; + + /** + * Queries information on a binding. + * + * @param address the address of the biding to query + * @return a AddressQuery containing information on the binding attached to the given address + * @throws HornetQException if an exception occurs while querying the binding + */ + AddressQuery addressQuery(SimpleString address) throws HornetQException; + + // Transaction operations ---------------------------------------- + + /** + * Returns the XAResource associated to the session. + * + * @return the XAResource associated to the session + */ + XAResource getXAResource(); + + /** + * Return true if the session supports XA, false else. + * + * @return true if the session supports XA, false else. + */ + boolean isXA(); + + /** + * Commits the current transaction. + * + * @throws HornetQException if an exception occurs while committing the transaction + */ + void commit() throws HornetQException; + + /** + * Rolls back the current transaction. + * + * @throws HornetQException if an exception occurs while rolling back the transaction + */ + void rollback() throws HornetQException; + + /** + * Rolls back the current transaction. + * + * @param considerLastMessageAsDelivered the first message on deliveringMessage Buffer is considered as delivered + * @throws HornetQException if an exception occurs while rolling back the transaction + */ + void rollback(boolean considerLastMessageAsDelivered) throws HornetQException; + + /** + * Returns true if the current transaction has been flagged to rollback, false else. + * + * @return true if the current transaction has been flagged to rollback, false else. + */ + boolean isRollbackOnly(); + + /** + * Returns whether the session will automatically commit its transaction every time a message is sent + * by a ClientProducer created by this session, false else + * + * @return true if the session automatically commit its transaction every time a message is sent, false else + */ + boolean isAutoCommitSends(); + + /** + * Returns whether the session will automatically commit its transaction every time a message is acknowledged + * by a ClientConsumer created by this session, false else + * + * @return true if the session automatically commit its transaction every time a message is acknowledged, false else + */ + boolean isAutoCommitAcks(); + + /** + * Returns whether the ClientConsumer created by the session will block when they acknowledge a message. + * + * @return true if the session's ClientConsumer block when they acknowledge a message, false else + */ + boolean isBlockOnAcknowledge(); + + /** + * Sets a SendAcknowledgementHandler for this session. + * + * @param handler a SendAcknowledgementHandler + * @return this ClientSession + */ + ClientSession setSendAcknowledgementHandler(SendAcknowledgementHandler handler); + + /** + * Attach any metadata to the session. + * + * @throws HornetQException + */ + void addMetaData(String key, String data) throws HornetQException; + + /** + * Attach any metadata to the session. Throws an exception if there's already a metadata available. + * You can use this metadata to ensure that there is no other session with the same meta-data you are passing as an argument. + * This is useful to simulate unique client-ids, where you may want to avoid multiple instances of your client application connected. + * + * @throws HornetQException + */ + void addUniqueMetaData(String key, String data) throws HornetQException; + + /** + * Return the sessionFactory used to created this Session. + * + * @return + */ + ClientSessionFactory getSessionFactory(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSessionFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSessionFactory.java b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSessionFactory.java new file mode 100644 index 0000000..313bf72 --- /dev/null +++ b/activemq6-core-client/src/main/java/org/apache/activemq6/api/core/client/ClientSessionFactory.java @@ -0,0 +1,183 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.api.core.client; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.spi.core.protocol.RemotingConnection; + + +/** + * A ClientSessionFactory is the entry point to create and configure HornetQ resources to produce and consume messages. + *
+ * It is possible to configure a factory using the setter methods only if no session has been created. + * Once a session is created, the configuration is fixed and any call to a setter method will throw a IllegalStateException. + * + * @author Tim Fox + */ +public interface ClientSessionFactory extends AutoCloseable +{ + /** + * Creates a session with XA transaction semantics. + * + * @return a ClientSession with XA transaction semantics + * @throws HornetQException if an exception occurs while creating the session + */ + ClientSession createXASession() throws HornetQException; + + /** + * Creates a transacted session. + *

+ * It is up to the client to commit when sending and acknowledging messages. + * + * @return a transacted ClientSession + * @throws HornetQException if an exception occurs while creating the session + * @see ClientSession#commit() + */ + ClientSession createTransactedSession() throws HornetQException; + + + /** + * Creates a non-transacted session. + * Message sends and acknowledgements are automatically committed by the session. This does not + * mean that messages are automatically acknowledged, only that when messages are acknowledged, + * the session will automatically commit the transaction containing the acknowledgements. + * + * @return a non-transacted ClientSession + * @throws HornetQException if an exception occurs while creating the session + */ + ClientSession createSession() throws HornetQException; + + /** + * Creates a session. + * + * @param autoCommitSends true to automatically commit message sends, false to commit manually + * @param autoCommitAcks true to automatically commit message acknowledgement, false to commit manually + * @return a ClientSession + * @throws HornetQException if an exception occurs while creating the session + */ + ClientSession createSession(boolean autoCommitSends, boolean autoCommitAcks) throws HornetQException; + + /** + * Creates a session. + * + * @param autoCommitSends true to automatically commit message sends, false to commit manually + * @param autoCommitAcks true to automatically commit message acknowledgement, false to commit manually + * @param ackBatchSize the batch size of the acknowledgements + * @return a ClientSession + * @throws HornetQException if an exception occurs while creating the session + */ + ClientSession createSession(boolean autoCommitSends, boolean autoCommitAcks, int ackBatchSize) throws HornetQException; + + /** + * Creates a session. + * + * @param xa whether the session support XA transaction semantic or not + * @param autoCommitSends true to automatically commit message sends, false to commit manually + * @param autoCommitAcks true to automatically commit message acknowledgement, false to commit manually + * @return a ClientSession + * @throws HornetQException if an exception occurs while creating the session + */ + ClientSession createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks) throws HornetQException; + + /** + * Creates a session. + *

+ * It is possible to pre-acknowledge messages on the server so that the client can avoid additional network trip + * to the server to acknowledge messages. While this increase performance, this does not guarantee delivery (as messages + * can be lost after being pre-acknowledged on the server). Use with caution if your application design permits it. + * + * @param xa whether the session support XA transaction semantic or not + * @param autoCommitSends true to automatically commit message sends, false to commit manually + * @param autoCommitAcks true to automatically commit message acknowledgement, false to commit manually + * @param preAcknowledge true to pre-acknowledge messages on the server, false to let the client acknowledge the messages + * @return a ClientSession + * @throws HornetQException if an exception occurs while creating the session + */ + ClientSession createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge) throws HornetQException; + + /** + * Creates an authenticated session. + *

+ * It is possible to pre-acknowledge messages on the server so that the client can avoid additional network trip + * to the server to acknowledge messages. While this increase performance, this does not guarantee delivery (as messages + * can be lost after being pre-acknowledged on the server). Use with caution if your application design permits it. + * + * @param username the user name + * @param password the user password + * @param xa whether the session support XA transaction semantic or not + * @param autoCommitSends true to automatically commit message sends, false to commit manually + * @param autoCommitAcks true to automatically commit message acknowledgement, false to commit manually + * @param preAcknowledge true to pre-acknowledge messages on the server, false to let the client acknowledge the messages + * @return a ClientSession + * @throws HornetQException if an exception occurs while creating the session + */ + ClientSession createSession(String username, + String password, + boolean xa, + boolean autoCommitSends, + boolean autoCommitAcks, + boolean preAcknowledge, + int ackBatchSize) throws HornetQException; + + /** + * Closes this factory and any session created by it. + */ + void close(); + + /** + * @return {@code true} if the factory is closed, {@code false} otherwise. + */ + boolean isClosed(); + + /** + * Adds a FailoverEventListener to the session which is notified if a failover event occurs on the session. + * + * @param listener the listener to add + * @return this ClientSessionFactory + */ + ClientSessionFactory addFailoverListener(FailoverEventListener listener); + + /** + * Removes a FailoverEventListener to the session. + * + * @param listener the listener to remove + * @return true if the listener was removed, false else + */ + boolean removeFailoverListener(FailoverEventListener listener); + + /** + * Opposed to close, will call cleanup only on every created session and children objects. + */ + void cleanup(); + + /** + * @return the server locator associated with this session factory + */ + ServerLocator getServerLocator(); + + /** + * Returns the code connection used by this session factory. + * + * @return the core connection + */ + RemotingConnection getConnection(); + + + /** + * Return the configuration used + * @return + */ + TransportConfiguration getConnectorConfiguration(); + +}