activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r781177 [4/11] - in /activemq/sandbox/activemq-flow: activemq-bio/ activemq-bio/src/main/java/org/ activemq-bio/src/main/java/org/apache/ activemq-bio/src/main/java/org/apache/activemq/ activemq-bio/src/main/java/org/apache/activemq/transpo...
Date Tue, 02 Jun 2009 21:29:35 GMT
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageDispatch;
+
+/**
+ * For application servers, <CODE>Connection</CODE> objects provide a special
+ * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
+ * messages it is to consume are specified by a <CODE>Destination</CODE> and a
+ * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
+ * given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
+ * <p/>
+ * <P>
+ * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
+ * <CODE>ServerSession</CODE> from its pool, loads it with a single message,
+ * and starts it. As traffic picks up, messages can back up. If this happens, a
+ * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
+ * with more than one message. This reduces the thread context switches and
+ * minimizes resource use at the expense of some serialization of message
+ * processing.
+ * 
+ * @see javax.jms.Connection#createConnectionConsumer
+ * @see javax.jms.Connection#createDurableConnectionConsumer
+ * @see javax.jms.QueueConnection#createConnectionConsumer
+ * @see javax.jms.TopicConnection#createConnectionConsumer
+ * @see javax.jms.TopicConnection#createDurableConnectionConsumer
+ */
+
+public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher {
+
+    private ActiveMQConnection connection;
+    private ServerSessionPool sessionPool;
+    private ConsumerInfo consumerInfo;
+    private boolean closed;
+
+    /**
+     * Create a ConnectionConsumer
+     * 
+     * @param theConnection
+     * @param theSessionPool
+     * @param theConsumerInfo
+     * @throws JMSException
+     */
+    protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException {
+        this.connection = theConnection;
+        this.sessionPool = theSessionPool;
+        this.consumerInfo = theConsumerInfo;
+
+        this.connection.addConnectionConsumer(this);
+        this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
+        this.connection.asyncSendPacket(this.consumerInfo);
+    }
+
+    /**
+     * Gets the server session pool associated with this connection consumer.
+     * 
+     * @return the server session pool used by this connection consumer
+     * @throws JMSException if the JMS provider fails to get the server session
+     *                 pool associated with this consumer due to some internal
+     *                 error.
+     */
+
+    public ServerSessionPool getServerSessionPool() throws JMSException {
+        if (closed) {
+            throw new IllegalStateException("The Connection Consumer is closed");
+        }
+        return this.sessionPool;
+    }
+
+    /**
+     * Closes the connection consumer. <p/>
+     * <P>
+     * Since a provider may allocate some resources on behalf of a connection
+     * consumer outside the Java virtual machine, clients should close these
+     * resources when they are not needed. Relying on garbage collection to
+     * eventually reclaim these resources may not be timely enough.
+     * 
+     * @throws JMSException
+     */
+
+    public void close() throws JMSException {
+        if (!closed) {
+            dispose();
+            this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand());
+        }
+
+    }
+
+    public void dispose() {
+        if (!closed) {
+            this.connection.removeDispatcher(consumerInfo.getConsumerId());
+            this.connection.removeConnectionConsumer(this);
+            closed = true;
+        }
+    }
+
+    public void dispatch(MessageDispatch messageDispatch) {
+        try {
+            messageDispatch.setConsumer(this);
+
+            ServerSession serverSession = sessionPool.getServerSession();
+            Session s = serverSession.getSession();
+            ActiveMQSession session = null;
+
+            if (s instanceof ActiveMQSession) {
+                session = (ActiveMQSession)s;
+            } else if (s instanceof ActiveMQTopicSession) {
+                ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s;
+                session = (ActiveMQSession)topicSession.getNext();
+            } else if (s instanceof ActiveMQQueueSession) {
+                ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s;
+                session = (ActiveMQSession)queueSession.getNext();
+            } else {
+                connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass()));
+                return;
+            }
+
+            session.dispatch(messageDispatch);
+            serverSession.start();
+        } catch (JMSException e) {
+            connection.onAsyncException(e);
+        }
+    }
+
+    public String toString() {
+        return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }";
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionConsumer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,907 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.naming.Context;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
+import org.apache.activemq.jndi.JNDIBaseStorable;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.management.StatsCapable;
+import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.URISupport.CompositeData;
+
+/**
+ * A ConnectionFactory is an an Administered object, and is used for creating
+ * Connections. <p/> This class also implements QueueConnectionFactory and
+ * TopicConnectionFactory. You can use this connection to create both
+ * QueueConnections and TopicConnections.
+ * 
+ * @org.apache.xbean.XBean element="connectionFactory"
+ * @version $Revision: 1.9 $
+ * @see javax.jms.ConnectionFactory
+ */
+public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
+
+    public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
+    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
+    public static final String DEFAULT_USER = null;
+    public static final String DEFAULT_PASSWORD = null;
+    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
+
+    protected static final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+        public Thread newThread(Runnable run) {
+            Thread thread = new Thread(run);
+            thread.setPriority(ThreadPriorities.INBOUND_CLIENT_CONNECTION);
+            return thread;
+        }
+    });
+
+    protected URI brokerURL;
+    protected String userName;
+    protected String password;
+    protected String clientID;
+    protected boolean dispatchAsync=true;
+    protected boolean alwaysSessionAsync=true;
+
+    JMSStatsImpl factoryStats = new JMSStatsImpl();
+
+    private IdGenerator clientIdGenerator;
+    private String clientIDPrefix;
+
+    // client policies
+    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+    private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
+    private MessageTransformer transformer;
+
+    private boolean disableTimeStampsByDefault;
+    private boolean optimizedMessageDispatch = true;
+    private boolean copyMessageOnSend = true;
+    private boolean useCompression;
+    private boolean objectMessageSerializationDefered;
+    private boolean useAsyncSend;
+    private boolean optimizeAcknowledge;
+    private int closeTimeout = 15000;
+    private boolean useRetroactiveConsumer;
+    private boolean exclusiveConsumer;
+    private boolean nestedMapAndListEnabled = true;
+    private boolean alwaysSyncSend;
+    private boolean watchTopicAdvisories = true;
+    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
+    private long warnAboutUnstartedConnectionTimeout = 500L;
+    private int sendTimeout =0;
+    private boolean sendAcksAsync=true;
+    private TransportListener transportListener;
+	private ExceptionListener exceptionListener;
+	private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+	private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+
+    // /////////////////////////////////////////////
+    //
+    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
+    //
+    // /////////////////////////////////////////////
+
+    public ActiveMQConnectionFactory() {
+        this(DEFAULT_BROKER_URL);
+    }
+
+    public ActiveMQConnectionFactory(String brokerURL) {
+        this(createURI(brokerURL));
+    }
+
+    public ActiveMQConnectionFactory(URI brokerURL) {
+        setBrokerURL(brokerURL.toString());
+    }
+
+    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
+        setUserName(userName);
+        setPassword(password);
+        setBrokerURL(brokerURL.toString());
+    }
+
+    public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
+        setUserName(userName);
+        setPassword(password);
+        setBrokerURL(brokerURL);
+    }
+
+    /**
+     * Returns a copy of the given connection factory
+     */
+    public ActiveMQConnectionFactory copy() {
+        try {
+            return (ActiveMQConnectionFactory)super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("This should never happen: " + e, e);
+        }
+    }
+
+    /**
+     * @param brokerURL
+     * @return
+     * @throws URISyntaxException
+     */
+    private static URI createURI(String brokerURL) {
+        try {
+            return new URI(brokerURL);
+        } catch (URISyntaxException e) {
+            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
+        }
+    }
+
+    /**
+     * @return Returns the Connection.
+     */
+    public Connection createConnection() throws JMSException {
+        return createActiveMQConnection();
+    }
+
+    /**
+     * @return Returns the Connection.
+     */
+    public Connection createConnection(String userName, String password) throws JMSException {
+        return createActiveMQConnection(userName, password);
+    }
+
+    /**
+     * @return Returns the QueueConnection.
+     * @throws JMSException
+     */
+    public QueueConnection createQueueConnection() throws JMSException {
+        return createActiveMQConnection();
+    }
+
+    /**
+     * @return Returns the QueueConnection.
+     */
+    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
+        return createActiveMQConnection(userName, password);
+    }
+
+    /**
+     * @return Returns the TopicConnection.
+     * @throws JMSException
+     */
+    public TopicConnection createTopicConnection() throws JMSException {
+        return createActiveMQConnection();
+    }
+
+    /**
+     * @return Returns the TopicConnection.
+     */
+    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
+        return createActiveMQConnection(userName, password);
+    }
+
+    public StatsImpl getStats() {
+        // TODO
+        return null;
+    }
+
+    // /////////////////////////////////////////////
+    //
+    // Implementation methods.
+    //
+    // /////////////////////////////////////////////
+
+    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
+        return createActiveMQConnection(userName, password);
+    }
+
+    /**
+     * Creates a Transport based on this object's connection settings. Separated
+     * from createActiveMQConnection to allow for subclasses to override.
+     * 
+     * @return The newly created Transport.
+     * @throws JMSException If unable to create trasnport.
+     * @author sepandm@gmail.com
+     */
+    protected Transport createTransport() throws JMSException {
+        try {
+            return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);
+        } catch (Exception e) {
+            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
+        }
+    }
+
+    /**
+     * @return Returns the Connection.
+     */
+    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
+        if (brokerURL == null) {
+            throw new ConfigurationException("brokerURL not set.");
+        }
+        ActiveMQConnection connection = null;
+        try {
+            Transport transport = createTransport();
+            connection = createActiveMQConnection(transport, factoryStats);
+
+            connection.setUserName(userName);
+            connection.setPassword(password);
+
+            configureConnection(connection);
+
+            transport.start();
+
+            if (clientID != null) {
+                connection.setDefaultClientID(clientID);
+            }
+
+            return connection;
+        } catch (JMSException e) {
+            // Clean up!
+            try {
+                connection.close();
+            } catch (Throwable ignore) {
+            }
+            throw e;
+        } catch (Exception e) {
+            // Clean up!
+            try {
+                connection.close();
+            } catch (Throwable ignore) {
+            }
+            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
+        }
+    }
+
+    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
+        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
+        return connection;
+    }
+
+    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
+        connection.setPrefetchPolicy(getPrefetchPolicy());
+        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
+        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
+        connection.setCopyMessageOnSend(isCopyMessageOnSend());
+        connection.setUseCompression(isUseCompression());
+        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
+        connection.setDispatchAsync(isDispatchAsync());
+        connection.setUseAsyncSend(isUseAsyncSend());
+        connection.setAlwaysSyncSend(isAlwaysSyncSend());
+        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
+        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
+        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
+        connection.setExclusiveConsumer(isExclusiveConsumer());
+        connection.setRedeliveryPolicy(getRedeliveryPolicy());
+        connection.setTransformer(getTransformer());
+        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
+        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
+        connection.setProducerWindowSize(getProducerWindowSize());
+        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
+        connection.setSendTimeout(getSendTimeout());
+        connection.setSendAcksAsync(isSendAcksAsync());
+        connection.setAuditDepth(getAuditDepth());
+        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
+        if (transportListener != null) {
+            connection.addTransportListener(transportListener);
+        }
+        if (exceptionListener != null) {
+        	connection.setExceptionListener(exceptionListener);
+        }
+    }
+
+    // /////////////////////////////////////////////
+    //
+    // Property Accessors
+    //
+    // /////////////////////////////////////////////
+
+    public String getBrokerURL() {
+        return brokerURL == null ? null : brokerURL.toString();
+    }
+
+    /**
+     * Sets the <a
+     * href="http://activemq.apache.org/configuring-transports.html">connection
+     * URL</a> used to connect to the ActiveMQ broker.
+     */
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = createURI(brokerURL);
+
+        // Use all the properties prefixed with 'jms.' to set the connection
+        // factory
+        // options.
+        if (this.brokerURL.getQuery() != null) {
+            // It might be a standard URI or...
+            try {
+
+                Map map = URISupport.parseQuery(this.brokerURL.getQuery());
+                if (buildFromMap(IntrospectionSupport.extractProperties(map, "jms."))) {
+                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
+                }
+
+            } catch (URISyntaxException e) {
+            }
+
+        } else {
+
+            // It might be a composite URI.
+            try {
+                CompositeData data = URISupport.parseComposite(this.brokerURL);
+                if (buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms."))) {
+                    this.brokerURL = data.toURI();
+                }
+            } catch (URISyntaxException e) {
+            }
+        }
+    }
+
+    public String getClientID() {
+        return clientID;
+    }
+
+    /**
+     * Sets the JMS clientID to use for the created connection. Note that this
+     * can only be used by one connection at once so generally its a better idea
+     * to set the clientID on a Connection
+     */
+    public void setClientID(String clientID) {
+        this.clientID = clientID;
+    }
+
+    public boolean isCopyMessageOnSend() {
+        return copyMessageOnSend;
+    }
+
+    /**
+     * Should a JMS message be copied to a new JMS Message object as part of the
+     * send() method in JMS. This is enabled by default to be compliant with the
+     * JMS specification. You can disable it if you do not mutate JMS messages
+     * after they are sent for a performance boost
+     */
+    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
+        this.copyMessageOnSend = copyMessageOnSend;
+    }
+
+    public boolean isDisableTimeStampsByDefault() {
+        return disableTimeStampsByDefault;
+    }
+
+    /**
+     * Sets whether or not timestamps on messages should be disabled or not. If
+     * you disable them it adds a small performance boost.
+     */
+    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
+        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
+    }
+
+    public boolean isOptimizedMessageDispatch() {
+        return optimizedMessageDispatch;
+    }
+
+    /**
+     * If this flag is set then an larger prefetch limit is used - only
+     * applicable for durable topic subscribers.
+     */
+    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
+        this.optimizedMessageDispatch = optimizedMessageDispatch;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * Sets the JMS password used for connections created from this factory
+     */
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
+        return prefetchPolicy;
+    }
+
+    /**
+     * Sets the <a
+     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
+     * policy</a> for consumers created by this connection.
+     */
+    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
+        this.prefetchPolicy = prefetchPolicy;
+    }
+
+    public boolean isUseAsyncSend() {
+        return useAsyncSend;
+    }
+
+    public BlobTransferPolicy getBlobTransferPolicy() {
+        return blobTransferPolicy;
+    }
+
+    /**
+     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
+     * OBjects) are transferred from producers to brokers to consumers
+     */
+    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
+        this.blobTransferPolicy = blobTransferPolicy;
+    }
+
+    /**
+     * Forces the use of <a
+     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
+     * adds a massive performance boost; but means that the send() method will
+     * return immediately whether the message has been sent or not which could
+     * lead to message loss.
+     */
+    public void setUseAsyncSend(boolean useAsyncSend) {
+        this.useAsyncSend = useAsyncSend;
+    }
+
+    public synchronized boolean isWatchTopicAdvisories() {
+        return watchTopicAdvisories;
+    }
+
+    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
+        this.watchTopicAdvisories = watchTopicAdvisories;
+    }
+
+    /**
+     * @return true if always sync send messages
+     */
+    public boolean isAlwaysSyncSend() {
+        return this.alwaysSyncSend;
+    }
+
+    /**
+     * Set true if always require messages to be sync sent
+     * 
+     * @param alwaysSyncSend
+     */
+    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+        this.alwaysSyncSend = alwaysSyncSend;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    /**
+     * Sets the JMS userName used by connections created by this factory
+     */
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public boolean isUseRetroactiveConsumer() {
+        return useRetroactiveConsumer;
+    }
+
+    /**
+     * Sets whether or not retroactive consumers are enabled. Retroactive
+     * consumers allow non-durable topic subscribers to receive old messages
+     * that were published before the non-durable subscriber started.
+     */
+    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
+        this.useRetroactiveConsumer = useRetroactiveConsumer;
+    }
+
+    public boolean isExclusiveConsumer() {
+        return exclusiveConsumer;
+    }
+
+    /**
+     * Enables or disables whether or not queue consumers should be exclusive or
+     * not for example to preserve ordering when not using <a
+     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
+     * 
+     * @param exclusiveConsumer
+     */
+    public void setExclusiveConsumer(boolean exclusiveConsumer) {
+        this.exclusiveConsumer = exclusiveConsumer;
+    }
+
+    public RedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    /**
+     * Sets the global redelivery policy to be used when a message is delivered
+     * but the session is rolled back
+     */
+    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    public MessageTransformer getTransformer() {
+        return transformer;
+    }
+    
+    /**
+     * @return the sendTimeout
+     */
+    public int getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
+     * @param sendTimeout the sendTimeout to set
+     */
+    public void setSendTimeout(int sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+    
+    /**
+     * @return the sendAcksAsync
+     */
+    public boolean isSendAcksAsync() {
+        return sendAcksAsync;
+    }
+
+    /**
+     * @param sendAcksAsync the sendAcksAsync to set
+     */
+    public void setSendAcksAsync(boolean sendAcksAsync) {
+        this.sendAcksAsync = sendAcksAsync;
+    }
+
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on
+     * to the JMS bus or when they are received from the bus but before they are
+     * delivered to the JMS client
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
+    }
+
+    public void buildFromProperties(Properties properties) {
+
+        if (properties == null) {
+            properties = new Properties();
+        }
+
+        String temp = properties.getProperty(Context.PROVIDER_URL);
+        if (temp == null || temp.length() == 0) {
+            temp = properties.getProperty("brokerURL");
+        }
+        if (temp != null && temp.length() > 0) {
+            setBrokerURL(temp);
+        }
+
+        Map<String, Object> p = new HashMap(properties);
+        buildFromMap(p);
+    }
+
+    public boolean buildFromMap(Map<String, Object> properties) {
+        boolean rc = false;
+
+        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
+        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
+            setPrefetchPolicy(p);
+            rc = true;
+        }
+
+        RedeliveryPolicy rp = new RedeliveryPolicy();
+        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
+            setRedeliveryPolicy(rp);
+            rc = true;
+        }
+
+        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
+        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
+            setBlobTransferPolicy(blobTransferPolicy);
+            rc = true;
+        }
+
+        rc |= IntrospectionSupport.setProperties(this, properties);
+
+        return rc;
+    }
+
+    public void populateProperties(Properties props) {
+        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
+
+        if (getBrokerURL() != null) {
+            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
+            props.setProperty("brokerURL", getBrokerURL());
+        }
+
+        if (getClientID() != null) {
+            props.setProperty("clientID", getClientID());
+        }
+
+        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
+        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
+        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
+
+        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
+        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
+        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
+        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
+
+        if (getPassword() != null) {
+            props.setProperty("password", getPassword());
+        }
+
+        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
+        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
+        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
+        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
+
+        if (getUserName() != null) {
+            props.setProperty("userName", getUserName());
+        }
+
+        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
+        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
+        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
+        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
+        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
+        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
+        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
+        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
+        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
+        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
+    }
+
+    public boolean isUseCompression() {
+        return useCompression;
+    }
+
+    /**
+     * Enables the use of compression of the message bodies
+     */
+    public void setUseCompression(boolean useCompression) {
+        this.useCompression = useCompression;
+    }
+
+    public boolean isObjectMessageSerializationDefered() {
+        return objectMessageSerializationDefered;
+    }
+
+    /**
+     * When an object is set on an ObjectMessage, the JMS spec requires the
+     * object to be serialized by that set method. Enabling this flag causes the
+     * object to not get serialized. The object may subsequently get serialized
+     * if the message needs to be sent over a socket or stored to disk.
+     */
+    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
+        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
+    }
+
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
+    }
+
+    /**
+     * Enables or disables the default setting of whether or not consumers have
+     * their messages <a
+     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
+     * synchronously or asynchronously by the broker</a>. For non-durable
+     * topics for example we typically dispatch synchronously by default to
+     * minimize context switches which boost performance. However sometimes its
+     * better to go slower to ensure that a single blocked consumer socket does
+     * not block delivery to other consumers.
+     * 
+     * @param asyncDispatch If true then consumers created on this connection
+     *                will default to having their messages dispatched
+     *                asynchronously. The default value is false.
+     */
+    public void setDispatchAsync(boolean asyncDispatch) {
+        this.dispatchAsync = asyncDispatch;
+    }
+
+    /**
+     * @return Returns the closeTimeout.
+     */
+    public int getCloseTimeout() {
+        return closeTimeout;
+    }
+
+    /**
+     * Sets the timeout before a close is considered complete. Normally a
+     * close() on a connection waits for confirmation from the broker; this
+     * allows that operation to timeout to save the client hanging if there is
+     * no broker
+     */
+    public void setCloseTimeout(int closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
+
+    /**
+     * @return Returns the alwaysSessionAsync.
+     */
+    public boolean isAlwaysSessionAsync() {
+        return alwaysSessionAsync;
+    }
+
+    /**
+     * If this flag is set then a separate thread is not used for dispatching
+     * messages for each Session in the Connection. However, a separate thread
+     * is always used if there is more than one session, or the session isn't in
+     * auto acknowledge or duplicates ok mode
+     */
+    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
+        this.alwaysSessionAsync = alwaysSessionAsync;
+    }
+
+    /**
+     * @return Returns the optimizeAcknowledge.
+     */
+    public boolean isOptimizeAcknowledge() {
+        return optimizeAcknowledge;
+    }
+
+    /**
+     * @param optimizeAcknowledge The optimizeAcknowledge to set.
+     */
+    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
+        this.optimizeAcknowledge = optimizeAcknowledge;
+    }
+
+    public boolean isNestedMapAndListEnabled() {
+        return nestedMapAndListEnabled;
+    }
+
+    /**
+     * Enables/disables whether or not Message properties and MapMessage entries
+     * support <a
+     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
+     * Structures</a> of Map and List objects
+     */
+    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
+        this.nestedMapAndListEnabled = structuredMapsEnabled;
+    }
+
+    public String getClientIDPrefix() {
+        return clientIDPrefix;
+    }
+
+    /**
+     * Sets the prefix used by autogenerated JMS Client ID values which are used
+     * if the JMS client does not explicitly specify on.
+     * 
+     * @param clientIDPrefix
+     */
+    public void setClientIDPrefix(String clientIDPrefix) {
+        this.clientIDPrefix = clientIDPrefix;
+    }
+
+    protected synchronized IdGenerator getClientIdGenerator() {
+        if (clientIdGenerator == null) {
+            if (clientIDPrefix != null) {
+                clientIdGenerator = new IdGenerator(clientIDPrefix);
+            } else {
+                clientIdGenerator = new IdGenerator();
+            }
+        }
+        return clientIdGenerator;
+    }
+
+    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
+        this.clientIdGenerator = clientIdGenerator;
+    }
+
+    /**
+     * @return the statsEnabled
+     */
+    public boolean isStatsEnabled() {
+        return this.factoryStats.isEnabled();
+    }
+
+    /**
+     * @param statsEnabled the statsEnabled to set
+     */
+    public void setStatsEnabled(boolean statsEnabled) {
+        this.factoryStats.setEnabled(statsEnabled);
+    }
+
+    public synchronized int getProducerWindowSize() {
+        return producerWindowSize;
+    }
+
+    public synchronized void setProducerWindowSize(int producerWindowSize) {
+        this.producerWindowSize = producerWindowSize;
+    }
+
+    public long getWarnAboutUnstartedConnectionTimeout() {
+        return warnAboutUnstartedConnectionTimeout;
+    }
+
+    /**
+     * Enables the timeout from a connection creation to when a warning is
+     * generated if the connection is not properly started via
+     * {@link Connection#start()} and a message is received by a consumer. It is
+     * a very common gotcha to forget to <a
+     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
+     * the connection</a> so this option makes the default case to create a
+     * warning if the user forgets. To disable the warning just set the value to <
+     * 0 (say -1).
+     */
+    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
+        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
+    }
+
+    public TransportListener getTransportListener() {
+        return transportListener;
+    }
+
+    /**
+     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
+     * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
+     * a transport listener.
+     *
+     * @param transportListener sets the listener to be registered on all connections
+     * created by this factory
+     */
+    public void setTransportListener(TransportListener transportListener) {
+        this.transportListener = transportListener;
+    }
+    
+    
+    public ExceptionListener getExceptionListener() {
+        return exceptionListener;
+    }
+    
+    /**
+     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
+     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
+     * an exception listener.
+     * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
+     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
+     * @param exceptionListener sets the exception listener to be registered on all connections
+     * created by this factory
+     */
+    public void setExceptionListener(ExceptionListener exceptionListener) {
+    	this.exceptionListener = exceptionListener;
+    }
+
+	public int getAuditDepth() {
+		return auditDepth;
+	}
+
+	public void setAuditDepth(int auditDepth) {
+		this.auditDepth = auditDepth;
+	}
+
+	public int getAuditMaximumProducerNumber() {
+		return auditMaximumProducerNumber;
+	}
+
+	public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+		this.auditMaximumProducerNumber = auditMaximumProducerNumber;
+	}
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Enumeration;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.ConnectionMetaData;
+
+/**
+ * A <CODE>ConnectionMetaData</CODE> object provides information describing
+ * the <CODE>Connection</CODE> object.
+ */
+
+public final class ActiveMQConnectionMetaData implements ConnectionMetaData {
+
+    public static final String PROVIDER_VERSION;
+    public static final int PROVIDER_MAJOR_VERSION;
+    public static final int PROVIDER_MINOR_VERSION;
+
+    public static final ActiveMQConnectionMetaData INSTANCE = new ActiveMQConnectionMetaData();
+
+    static {
+        String version = null;
+        int major = 0;
+        int minor = 0;
+        try {
+            Package p = Package.getPackage("org.apache.activemq");
+            if (p != null) {
+                version = p.getImplementationVersion();
+                Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+                Matcher m = pattern.matcher(version);
+                if (m.matches()) {
+                    major = Integer.parseInt(m.group(1));
+                    minor = Integer.parseInt(m.group(2));
+                }
+            }
+        } catch (Throwable e) {
+        }
+        PROVIDER_VERSION = version;
+        PROVIDER_MAJOR_VERSION = major;
+        PROVIDER_MINOR_VERSION = minor;
+    }
+
+    private ActiveMQConnectionMetaData() {
+    }
+
+    /**
+     * Gets the JMS API version.
+     * 
+     * @return the JMS API version
+     */
+
+    public String getJMSVersion() {
+        return "1.1";
+    }
+
+    /**
+     * Gets the JMS major version number.
+     * 
+     * @return the JMS API major version number
+     */
+
+    public int getJMSMajorVersion() {
+        return 1;
+    }
+
+    /**
+     * Gets the JMS minor version number.
+     * 
+     * @return the JMS API minor version number
+     */
+
+    public int getJMSMinorVersion() {
+        return 1;
+    }
+
+    /**
+     * Gets the JMS provider name.
+     * 
+     * @return the JMS provider name
+     */
+
+    public String getJMSProviderName() {
+        return "ActiveMQ";
+    }
+
+    /**
+     * Gets the JMS provider version.
+     * 
+     * @return the JMS provider version
+     */
+
+    public String getProviderVersion() {
+        return PROVIDER_VERSION;
+    }
+
+    /**
+     * Gets the JMS provider major version number.
+     * 
+     * @return the JMS provider major version number
+     */
+
+    public int getProviderMajorVersion() {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    /**
+     * Gets the JMS provider minor version number.
+     * 
+     * @return the JMS provider minor version number
+     */
+
+    public int getProviderMinorVersion() {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    /**
+     * Gets an enumeration of the JMSX property names.
+     * 
+     * @return an Enumeration of JMSX property names
+     */
+
+    public Enumeration<String> getJMSXPropertyNames() {
+        Vector<String> jmxProperties = new Vector<String>();
+        jmxProperties.add("JMSXGroupID");
+        jmxProperties.add("JMSXGroupSeq");
+        jmxProperties.add("JMSXDeliveryCount");
+        jmxProperties.add("JMSXProducerTXID");
+        return jmxProperties.elements();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionMetaData.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQDispatcher.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQDispatcher.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQDispatcher.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.command.MessageDispatch;
+
+public interface ActiveMQDispatcher {
+    void dispatch(MessageDispatch messageDispatch);
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQDispatcher.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQInputStream.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.filter.FilterException;
+import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMSExceptionSupport;
+
+/**
+ * @version $Revision$
+ */
+public class ActiveMQInputStream extends InputStream implements ActiveMQDispatcher {
+
+    private final ActiveMQConnection connection;
+    private final ConsumerInfo info;
+    // These are the messages waiting to be delivered to the client
+    private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+
+    private int deliveredCounter;
+    private MessageDispatch lastDelivered;
+    private boolean eosReached;
+    private byte buffer[];
+    private int pos;
+
+    private ProducerId producerId;
+    private long nextSequenceId;
+
+    public ActiveMQInputStream(ActiveMQConnection connection, ConsumerId consumerId, ActiveMQDestination dest, String selector, boolean noLocal, String name, int prefetch)
+        throws JMSException {
+        this.connection = connection;
+
+        if (dest == null) {
+            throw new InvalidDestinationException("Don't understand null destinations");
+        } else if (dest.isTemporary()) {
+            String physicalName = dest.getPhysicalName();
+
+            if (physicalName == null) {
+                throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
+            }
+
+            String connectionID = connection.getConnectionInfo().getConnectionId().getValue();
+
+            if (physicalName.indexOf(connectionID) < 0) {
+                throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
+            }
+
+            if (connection.isDeleted(dest)) {
+                throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
+            }
+        }
+
+        this.info = new ConsumerInfo(consumerId);
+        this.info.setSubscriptionName(name);
+
+        if (selector != null && selector.trim().length() != 0) {
+            selector = "JMSType='org.apache.activemq.Stream' AND ( " + selector + " ) ";
+        } else {
+            selector = "JMSType='org.apache.activemq.Stream'";
+        }
+
+        try {
+            SelectorParser.parse(selector);
+        } catch (FilterException e) {
+            throw JMSExceptionSupport.createInvalidSelectorException(e);
+        }
+        this.info.setSelector(selector);
+
+        this.info.setPrefetchSize(prefetch);
+        this.info.setNoLocal(noLocal);
+        this.info.setBrowser(false);
+        this.info.setDispatchAsync(false);
+
+        // Allows the options on the destination to configure the consumerInfo
+        if (dest.getOptions() != null) {
+            Map<String, String> options = new HashMap<String, String>(dest.getOptions());
+            IntrospectionSupport.setProperties(this.info, options, "consumer.");
+        }
+
+        this.info.setDestination(dest);
+
+        this.connection.addInputStream(this);
+        this.connection.addDispatcher(info.getConsumerId(), this);
+        this.connection.syncSendPacket(info);
+        unconsumedMessages.start();
+    }
+
+    public void close() throws IOException {
+        if (!unconsumedMessages.isClosed()) {
+            try {
+                if (lastDelivered != null) {
+                    MessageAck ack = new MessageAck(lastDelivered, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
+                    connection.asyncSendPacket(ack);
+                }
+                dispose();
+                this.connection.syncSendPacket(info.createRemoveCommand());
+            } catch (JMSException e) {
+                throw IOExceptionSupport.create(e);
+            }
+        }
+    }
+
+    public void dispose() {
+        if (!unconsumedMessages.isClosed()) {
+            unconsumedMessages.close();
+            this.connection.removeDispatcher(info.getConsumerId());
+            this.connection.removeInputStream(this);
+        }
+    }
+
+    public ActiveMQMessage receive() throws JMSException {
+        checkClosed();
+        MessageDispatch md;
+        try {
+            md = unconsumedMessages.dequeue(-1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw JMSExceptionSupport.create(e);
+        }
+
+        if (md == null || unconsumedMessages.isClosed() || md.getMessage().isExpired()) {
+            return null;
+        }
+
+        deliveredCounter++;
+        if ((0.75 * info.getPrefetchSize()) <= deliveredCounter) {
+            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
+            connection.asyncSendPacket(ack);
+            deliveredCounter = 0;
+            lastDelivered = null;
+        } else {
+            lastDelivered = md;
+        }
+
+        return (ActiveMQMessage)md.getMessage();
+    }
+
+    /**
+     * @throws IllegalStateException
+     */
+    protected void checkClosed() throws IllegalStateException {
+        if (unconsumedMessages.isClosed()) {
+            throw new IllegalStateException("The Consumer is closed");
+        }
+    }
+
+    public int read() throws IOException {
+        fillBuffer();
+        if (eosReached || buffer.length == 0) {
+            return -1;
+        }
+
+        return buffer[pos++] & 0xff;
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        fillBuffer();
+        if (eosReached || buffer.length == 0) {
+            return -1;
+        }
+
+        int max = Math.min(len, buffer.length - pos);
+        System.arraycopy(buffer, pos, b, off, max);
+
+        pos += max;
+        return max;
+    }
+
+    private void fillBuffer() throws IOException {
+        if (eosReached || (buffer != null && buffer.length > pos)) {
+            return;
+        }
+        try {
+            while (true) {
+                ActiveMQMessage m = receive();
+                if (m != null && m.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
+                    // First message.
+                    long producerSequenceId = m.getMessageId().getProducerSequenceId();
+                    if (producerId == null) {
+                        // We have to start a stream at sequence id = 0
+                        if (producerSequenceId != 0) {
+                            continue;
+                        }
+                        nextSequenceId++;
+                        producerId = m.getMessageId().getProducerId();
+                    } else {
+                        // Verify it's the next message of the sequence.
+                        if (!m.getMessageId().getProducerId().equals(producerId)) {
+                            throw new IOException("Received an unexpected message: invalid producer: " + m);
+                        }
+                        if (producerSequenceId != nextSequenceId++) {
+                            throw new IOException("Received an unexpected message: expected ID: " + (nextSequenceId - 1) + " but was: " + producerSequenceId + " for message: " + m);
+                        }
+                    }
+
+                    // Read the buffer in.
+                    ActiveMQBytesMessage bm = (ActiveMQBytesMessage)m;
+                    buffer = new byte[(int)bm.getBodyLength()];
+                    bm.readBytes(buffer);
+                    pos = 0;
+                } else {
+                    eosReached = true;
+                }
+                return;
+            }
+        } catch (JMSException e) {
+            eosReached = true;
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    public void dispatch(MessageDispatch md) {
+        unconsumedMessages.enqueue(md);
+    }
+
+    public String toString() {
+        return "ActiveMQInputStream { value=" + info.getConsumerId() + ", producerId=" + producerId + " }";
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=781177&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java Tue Jun  2 21:29:30 2009
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.util.BitArrayBin;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.util.LRUCache;
+
+/**
+ * Provides basic audit functions for Messages
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class ActiveMQMessageAudit {
+
+    public static final int DEFAULT_WINDOW_SIZE = 2048;
+    public static final int MAXIMUM_PRODUCER_COUNT = 64;
+    private int auditDepth;
+    private int maximumNumberOfProducersToTrack;
+    private LRUCache<Object, BitArrayBin> map;
+
+    /**
+     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
+     * 64
+     */
+    public ActiveMQMessageAudit() {
+        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
+    }
+
+    /**
+     * Construct a MessageAudit
+     * 
+     * @param auditDepth range of ids to track
+     * @param maximumNumberOfProducersToTrack number of producers expected in
+     *                the system
+     */
+    public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) {
+        this.auditDepth = auditDepth;
+        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
+        this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
+    }
+    
+    /**
+     * @return the auditDepth
+     */
+    public int getAuditDepth() {
+        return auditDepth;
+    }
+
+    /**
+     * @param auditDepth the auditDepth to set
+     */
+    public void setAuditDepth(int auditDepth) {
+        this.auditDepth = auditDepth;
+    }
+
+    /**
+     * @return the maximumNumberOfProducersToTrack
+     */
+    public int getMaximumNumberOfProducersToTrack() {
+        return maximumNumberOfProducersToTrack;
+    }
+
+    /**
+     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
+     */
+    public void setMaximumNumberOfProducersToTrack(
+            int maximumNumberOfProducersToTrack) {
+        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
+        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
+    }
+
+    /**
+     * Checks if this message has been seen before
+     * 
+     * @param message
+     * @return true if the message is a duplicate
+     * @throws JMSException
+     */
+    public boolean isDuplicate(Message message) throws JMSException {
+        return isDuplicate(message.getJMSMessageID());
+    }
+
+    /**
+     * checks whether this messageId has been seen before and adds this
+     * messageId to the list
+     * 
+     * @param id
+     * @return true if the message is a duplicate
+     */
+    public synchronized boolean isDuplicate(String id) {
+        boolean answer = false;
+        String seed = IdGenerator.getSeedFromId(id);
+        if (seed != null) {
+            BitArrayBin bab = map.get(seed);
+            if (bab == null) {
+                bab = new BitArrayBin(auditDepth);
+                map.put(seed, bab);
+            }
+            long index = IdGenerator.getSequenceFromId(id);
+            if (index >= 0) {
+                answer = bab.setBit(index, true);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Checks if this message has been seen before
+     * 
+     * @param message
+     * @return true if the message is a duplicate
+     */
+    public boolean isDuplicate(final MessageReference message) {
+        MessageId id = message.getMessageId();
+        return isDuplicate(id);
+    }
+    
+    /**
+     * Checks if this messageId has been seen before
+     * 
+     * @param id
+     * @return true if the message is a duplicate
+     */
+    public synchronized boolean isDuplicate(final MessageId id) {
+        boolean answer = false;
+        
+        if (id != null) {
+            ProducerId pid = id.getProducerId();
+            if (pid != null) {
+                BitArrayBin bab = map.get(pid);
+                if (bab == null) {
+                    bab = new BitArrayBin(auditDepth);
+                    map.put(pid, bab);
+                }
+                answer = bab.setBit(id.getProducerSequenceId(), true);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * mark this message as being received
+     * 
+     * @param message
+     */
+    public void rollback(final MessageReference message) {
+        MessageId id = message.getMessageId();
+        rollback(id);
+    }
+    
+    /**
+     * mark this message as being received
+     * 
+     * @param id
+     */
+    public synchronized void rollback(final  MessageId id) {
+        if (id != null) {
+            ProducerId pid = id.getProducerId();
+            if (pid != null) {
+                BitArrayBin bab = map.get(pid);
+                if (bab != null) {
+                    bab.setBit(id.getProducerSequenceId(), false);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Check the message is in order
+     * @param msg
+     * @return
+     * @throws JMSException
+     */
+    public boolean isInOrder(Message msg) throws JMSException {
+        return isInOrder(msg.getJMSMessageID());
+    }
+    
+    /**
+     * Check the message id is in order
+     * @param id
+     * @return
+     */
+    public synchronized boolean isInOrder(final String id) {
+        boolean answer = true;
+        
+        if (id != null) {
+            String seed = IdGenerator.getSeedFromId(id);
+            if (seed != null) {
+                BitArrayBin bab = map.get(seed);
+                if (bab != null) {
+                    long index = IdGenerator.getSequenceFromId(id);
+                    answer = bab.isInOrder(index);
+                }
+               
+            }
+        }
+        return answer;
+    }
+    
+    /**
+     * Check the MessageId is in order
+     * @param message 
+     * @return
+     */
+    public synchronized boolean isInOrder(final MessageReference message) {
+        return isInOrder(message.getMessageId());
+    }
+    
+    /**
+     * Check the MessageId is in order
+     * @param id
+     * @return
+     */
+    public synchronized boolean isInOrder(final MessageId id) {
+        boolean answer = false;
+
+        if (id != null) {
+            ProducerId pid = id.getProducerId();
+            if (pid != null) {
+                BitArrayBin bab = map.get(pid);
+                if (bab == null) {
+                    bab = new BitArrayBin(auditDepth);
+                    map.put(pid, bab);
+                }
+                answer = bab.isInOrder(id.getProducerSequenceId());
+
+            }
+        }
+        return answer;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message