activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r780773 [6/31] - in /activemq/sandbox/activemq-flow: activemq-client/ activemq-client/src/main/java/org/ activemq-client/src/main/java/org/apache/ activemq-client/src/main/java/org/apache/activemq/ activemq-client/src/main/java/org/apache/a...
Date Mon, 01 Jun 2009 18:37:54 GMT
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LastPartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LastPartialCommand.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LastPartialCommand.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LastPartialCommand.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * End-of-stream marker for a sequence of {@link PartialCommand} instances.
+ * <p>
+ * This command is not meant to be dispatched to a {@link CommandVisitor}:
+ * {@link #visit(CommandVisitor)} always fails with an
+ * {@link IllegalStateException}.
+ *
+ * @openwire:marshaller code="61"
+ * @version $Revision: 563921 $
+ */
+public class LastPartialCommand extends PartialCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_LAST_COMMAND;
+
+    public LastPartialCommand() {
+        super();
+    }
+
+    /**
+     * @return the data structure type code of this command,
+     *         {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Always fails: the transport layer is expected to filter out instances
+     * of this class before they are visited.
+     *
+     * @param visitor ignored
+     * @return never returns normally
+     * @throws IllegalStateException on every call
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        final String message = "The transport layer should filter out LastPartialCommand instances but received: " + this;
+        throw new IllegalStateException(message);
+    }
+
+    /**
+     * Copies the transient fields of this marker onto the complete command
+     * once it has been unmarshalled on the other end.
+     *
+     * @param completeCommand the newly unmarshalled complete command
+     */
+    public void configure(Command completeCommand) {
+        // "from" is a transient property added by the low level transport;
+        // it is the only field carried over here.
+        completeCommand.setFrom(this.getFrom());
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LocalTransactionId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LocalTransactionId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LocalTransactionId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LocalTransactionId.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * Identifies a local (non-XA) transaction by the id of the connection that
+ * owns it plus a numeric value.
+ * <p>
+ * The transaction key and hash code are computed lazily and cached; the
+ * setters reset both caches so they never go stale.
+ *
+ * @openwire:marshaller code="111"
+ * @version $Revision: 1.11 $
+ */
+public class LocalTransactionId extends TransactionId implements Comparable<LocalTransactionId> {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_LOCAL_TRANSACTION_ID;
+
+    protected ConnectionId connectionId;
+    protected long value;
+
+    private transient String transactionKey;
+    private transient int hashCode;
+
+    /**
+     * Creates an id with no connection id and a value of zero; callers are
+     * expected to populate it through the setters.
+     */
+    public LocalTransactionId() {
+    }
+
+    /**
+     * @param connectionId the id of the owning connection
+     * @param transactionId the numeric value of the transaction
+     */
+    public LocalTransactionId(ConnectionId connectionId, long transactionId) {
+        this.connectionId = connectionId;
+        this.value = transactionId;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always <code>false</code>
+     */
+    public boolean isXATransaction() {
+        return false;
+    }
+
+    /**
+     * @return always <code>true</code>
+     */
+    public boolean isLocalTransaction() {
+        return true;
+    }
+
+    /**
+     * @return the key <code>"TX:" + connectionId + ":" + value</code>,
+     *         computed on first use and cached
+     */
+    public String getTransactionKey() {
+        if (transactionKey == null) {
+            transactionKey = "TX:" + connectionId + ":" + value;
+        }
+        return transactionKey;
+    }
+
+    public String toString() {
+        return getTransactionKey();
+    }
+
+    /**
+     * Combines the connection id hash with the low 32 bits of the value. A
+     * missing connection id contributes zero instead of raising a
+     * NullPointerException.
+     */
+    public int hashCode() {
+        // 0 doubles as the "not yet computed" marker, so a genuine hash of 0
+        // is simply recomputed on every call.
+        if (hashCode == 0) {
+            int connectionHash = connectionId == null ? 0 : connectionId.hashCode();
+            hashCode = connectionHash ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    /**
+     * Two ids are equal when they are both exactly of this class and carry
+     * the same value and the same (possibly absent) connection id.
+     */
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != LocalTransactionId.class) {
+            return false;
+        }
+        LocalTransactionId tx = (LocalTransactionId)o;
+        if (value != tx.value) {
+            return false;
+        }
+        // null-safe: the no-arg constructor leaves connectionId unset
+        return connectionId == null ? tx.connectionId == null : connectionId.equals(tx.connectionId);
+    }
+
+    /**
+     * Orders by connection id first and by value second.
+     *
+     * @param o the id to compare against
+     * @return a negative number, zero or a positive number when this id is
+     *         less than, equal to or greater than <code>o</code>
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    public int compareTo(LocalTransactionId o) {
+        int result = connectionId.compareTo(o.connectionId);
+        if (result == 0) {
+            // Compare the longs directly: narrowing (value - o.value) to int
+            // loses the high bits, which can flip the sign or report 0 for
+            // values that differ.
+            if (value < o.value) {
+                result = -1;
+            } else if (value > o.value) {
+                result = 1;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long transactionId) {
+        this.value = transactionId;
+        resetCachedState();
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+        resetCachedState();
+    }
+
+    /**
+     * Drops the cached key and hash code so they are recomputed from the
+     * current field values.
+     */
+    private void resetCachedState() {
+        transactionKey = null;
+        hashCode = 0;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/LocalTransactionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MarshallAware.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MarshallAware.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MarshallAware.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MarshallAware.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.IOException;
+
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Callback interface with four hooks named for the points around marshalling
+ * and unmarshalling with a {@link WireFormat}.
+ * <p>
+ * NOTE(review): when and by whom the hooks are invoked is not visible in this
+ * file; the descriptions below follow the method names and should be
+ * confirmed against the wire format implementation.
+ */
+public interface MarshallAware {
+
+    /**
+     * Hook presumably invoked before the implementing object is marshalled.
+     *
+     * @param wireFormat the wire format involved
+     * @throws IOException if the implementation fails while preparing
+     */
+    void beforeMarshall(WireFormat wireFormat) throws IOException;
+
+    /**
+     * Hook presumably invoked after the implementing object was marshalled.
+     *
+     * @param wireFormat the wire format involved
+     * @throws IOException if the implementation fails
+     */
+    void afterMarshall(WireFormat wireFormat) throws IOException;
+
+    /**
+     * Hook presumably invoked before the implementing object is unmarshalled.
+     *
+     * @param wireFormat the wire format involved
+     * @throws IOException if the implementation fails
+     */
+    void beforeUnmarshall(WireFormat wireFormat) throws IOException;
+
+    /**
+     * Hook presumably invoked after the implementing object was unmarshalled.
+     *
+     * @param wireFormat the wire format involved
+     * @throws IOException if the implementation fails
+     */
+    void afterUnmarshall(WireFormat wireFormat) throws IOException;
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MarshallAware.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Message.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Message.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Message.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.jms.JMSException;
+
+import org.apache.activemq.IConnection;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.util.ByteArrayInputStream;
+import org.apache.activemq.util.ByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Represents an ActiveMQ message.
+ * <p>
+ * Properties are kept in two forms: a decoded map ({@link #properties}) and an
+ * encoded byte sequence ({@link #marshalledProperties}). Reads decode the map
+ * lazily and keep the encoded form; writes drop the encoded form so that
+ * {@link #beforeMarshall(WireFormat)} re-encodes the current map.
+ *
+ * @openwire:marshaller
+ * @version $Revision$
+ */
+public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
+
+    /**
+     * The default minimum amount of memory a message is assumed to use
+     */
+    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
+
+    protected MessageId messageId;
+    protected ActiveMQDestination originalDestination;
+    protected TransactionId originalTransactionId;
+
+    protected ProducerId producerId;
+    protected ActiveMQDestination destination;
+    protected TransactionId transactionId;
+
+    protected long expiration;
+    protected long timestamp;
+    protected long arrival;
+    protected long brokerInTime;
+    protected long brokerOutTime;
+    protected String correlationId;
+    protected ActiveMQDestination replyTo;
+    protected boolean persistent;
+    protected String type;
+    protected byte priority;
+    protected String groupID;
+    protected int groupSequence;
+    protected ConsumerId targetConsumerId;
+    protected boolean compressed;
+    protected String userID;
+
+    protected ByteSequence content;
+    protected ByteSequence marshalledProperties;
+    protected DataStructure dataStructure;
+    protected int redeliveryCounter;
+
+    protected int size;
+    protected Map<String, Object> properties;
+    protected boolean readOnlyProperties;
+    protected boolean readOnlyBody;
+    protected transient boolean recievedByDFBridge;
+    protected boolean droppable;
+
+    private transient short referenceCount;
+    private transient IConnection connection;
+    private transient org.apache.activemq.broker.region.Destination regionDestination;
+    private transient MemoryUsage memoryUsage;
+
+    private BrokerId[] brokerPath;
+    private BrokerId[] cluster;
+
+    public abstract Message copy();
+    public abstract void clearBody() throws JMSException;
+
+    /**
+     * Copies the state of this message onto <code>copy</code>.
+     * <p>
+     * The message id and the property map are duplicated; every other field
+     * is shared by reference or copied by value. The target consumer id and
+     * the reference count are deliberately left untouched.
+     *
+     * @param copy the message that receives this message's state
+     */
+    protected void copy(Message copy) {
+        super.copy(copy);
+        copy.producerId = producerId;
+        copy.transactionId = transactionId;
+        copy.destination = destination;
+        copy.messageId = messageId != null ? messageId.copy() : null;
+        copy.originalDestination = originalDestination;
+        copy.originalTransactionId = originalTransactionId;
+        copy.expiration = expiration;
+        copy.timestamp = timestamp;
+        copy.correlationId = correlationId;
+        copy.replyTo = replyTo;
+        copy.persistent = persistent;
+        copy.redeliveryCounter = redeliveryCounter;
+        copy.type = type;
+        copy.priority = priority;
+        copy.size = size;
+        copy.groupID = groupID;
+        copy.userID = userID;
+        copy.groupSequence = groupSequence;
+
+        // give the copy its own map so later edits do not leak between messages
+        if (properties != null) {
+            copy.properties = new HashMap<String, Object>(properties);
+        } else {
+            copy.properties = properties;
+        }
+
+        copy.content = content;
+        copy.marshalledProperties = marshalledProperties;
+        copy.dataStructure = dataStructure;
+        copy.readOnlyProperties = readOnlyProperties;
+        copy.readOnlyBody = readOnlyBody;
+        copy.compressed = compressed;
+        copy.recievedByDFBridge = recievedByDFBridge;
+
+        copy.arrival = arrival;
+        copy.connection = connection;
+        copy.regionDestination = regionDestination;
+        copy.brokerInTime = brokerInTime;
+        copy.brokerOutTime = brokerOutTime;
+        copy.memoryUsage = this.memoryUsage;
+        copy.brokerPath = brokerPath;
+
+        // lets not copy the following fields
+        // copy.targetConsumerId = targetConsumerId;
+        // copy.referenceCount = referenceCount;
+        // NOTE(review): droppable and cluster are not copied either; confirm
+        // whether that is intentional.
+    }
+
+    /**
+     * Looks up a single property, decoding the encoded properties on first
+     * access.
+     *
+     * @param name the property name
+     * @return the property value, or <code>null</code> when the message has
+     *         no such property
+     * @throws IOException if the encoded properties cannot be decoded
+     */
+    public Object getProperty(String name) throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return null;
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return properties.get(name);
+    }
+
+    /**
+     * @return an unmodifiable view of the properties; an empty map when the
+     *         message has none
+     * @throws IOException if the encoded properties cannot be decoded
+     */
+    public Map<String, Object> getProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                return Collections.emptyMap();
+            }
+            properties = unmarsallProperties(marshalledProperties);
+        }
+        return Collections.unmodifiableMap(properties);
+    }
+
+    /**
+     * Discards both the decoded and the encoded properties.
+     */
+    public void clearProperties() {
+        marshalledProperties = null;
+        properties = null;
+    }
+
+    /**
+     * Sets a property, decoding any encoded properties first.
+     *
+     * @param name the property name
+     * @param value the property value
+     * @throws IOException if the encoded properties cannot be decoded
+     */
+    public void setProperty(String name, Object value) throws IOException {
+        lazyCreateProperties();
+        properties.put(name, value);
+    }
+
+    /**
+     * Makes sure {@link #properties} is a usable map that is about to be
+     * modified, and drops the encoded form.
+     *
+     * @throws IOException if the encoded properties cannot be decoded
+     */
+    protected void lazyCreateProperties() throws IOException {
+        if (properties == null) {
+            if (marshalledProperties == null) {
+                properties = new HashMap<String, Object>();
+            } else {
+                properties = unmarsallProperties(marshalledProperties);
+                marshalledProperties = null;
+            }
+        } else {
+            // The map may have been decoded by a read (getProperty /
+            // getProperties), which keeps the encoded form. It is about to
+            // change, so the encoded form must go; otherwise beforeMarshall
+            // would send the stale bytes and lose the update.
+            marshalledProperties = null;
+        }
+    }
+
+    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
+        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
+    }
+
+    /**
+     * Encodes the property map when no encoded form exists yet.
+     *
+     * @param wireFormat not used here
+     * @throws IOException if the properties cannot be encoded
+     */
+    public void beforeMarshall(WireFormat wireFormat) throws IOException {
+        // Need to marshal the properties.
+        if (marshalledProperties == null && properties != null) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream os = new DataOutputStream(baos);
+            MarshallingSupport.marshalPrimitiveMap(properties, os);
+            os.close();
+            marshalledProperties = baos.toByteSequence();
+        }
+    }
+
+    public void afterMarshall(WireFormat wireFormat) throws IOException {
+    }
+
+    public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
+    }
+
+    public void afterUnmarshall(WireFormat wireFormat) throws IOException {
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    //
+    // Simple Field accessors
+    //
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @return <code>true</code> when a transaction id is set
+     */
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getOriginalDestination() {
+        return originalDestination;
+    }
+
+    public void setOriginalDestination(ActiveMQDestination destination) {
+        this.originalDestination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getMessageId() {
+        return messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getOriginalTransactionId() {
+        return originalTransactionId;
+    }
+
+    public void setOriginalTransactionId(TransactionId transactionId) {
+        this.originalTransactionId = transactionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getGroupID() {
+        return groupID;
+    }
+
+    public void setGroupID(String groupID) {
+        this.groupID = groupID;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getGroupSequence() {
+        return groupSequence;
+    }
+
+    public void setGroupSequence(int groupSequence) {
+        this.groupSequence = groupSequence;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isPersistent() {
+        return persistent;
+    }
+
+    public void setPersistent(boolean deliveryMode) {
+        this.persistent = deliveryMode;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getExpiration() {
+        return expiration;
+    }
+
+    public void setExpiration(long expiration) {
+        this.expiration = expiration;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getPriority() {
+        return priority;
+    }
+
+    public void setPriority(byte priority) {
+        this.priority = priority;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public ActiveMQDestination getReplyTo() {
+        return replyTo;
+    }
+
+    public void setReplyTo(ActiveMQDestination replyTo) {
+        this.replyTo = replyTo;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public ByteSequence getContent() {
+        return content;
+    }
+
+    public void setContent(ByteSequence content) {
+        this.content = content;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public ByteSequence getMarshalledProperties() {
+        return marshalledProperties;
+    }
+
+    public void setMarshalledProperties(ByteSequence marshalledProperties) {
+        this.marshalledProperties = marshalledProperties;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public DataStructure getDataStructure() {
+        return dataStructure;
+    }
+
+    public void setDataStructure(DataStructure data) {
+        this.dataStructure = data;
+    }
+
+    /**
+     * Can be used to route the message to a specific consumer. Should be null
+     * to allow the broker use normal JMS routing semantics. If the target
+     * consumer id is an active consumer on the broker, the message is dropped.
+     * Used by the AdvisoryBroker to replay advisory messages to a specific
+     * consumer.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getTargetConsumerId() {
+        return targetConsumerId;
+    }
+
+    public void setTargetConsumerId(ConsumerId targetConsumerId) {
+        this.targetConsumerId = targetConsumerId;
+    }
+
+    /**
+     * @return <code>true</code> when an expiration time is set (greater than
+     *         zero) and the current time is past it
+     */
+    public boolean isExpired() {
+        long expireTime = getExpiration();
+        return expireTime > 0 && System.currentTimeMillis() > expireTime;
+    }
+
+    /**
+     * @return <code>true</code> when the message type equals the advisory
+     *         message type
+     */
+    public boolean isAdvisory() {
+        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
+
+    /**
+     * @return <code>true</code> when the redelivery counter is above zero
+     */
+    public boolean isRedelivered() {
+        return redeliveryCounter > 0;
+    }
+
+    /**
+     * Maps the flag onto the redelivery counter: turning it on sets the
+     * counter to 1 unless it is already positive, turning it off resets a
+     * positive counter to 0.
+     *
+     * @param redelivered the desired redelivered state
+     */
+    public void setRedelivered(boolean redelivered) {
+        if (redelivered) {
+            if (!isRedelivered()) {
+                setRedeliveryCounter(1);
+            }
+        } else {
+            if (isRedelivered()) {
+                setRedeliveryCounter(0);
+            }
+        }
+    }
+
+    public void incrementRedeliveryCounter() {
+        redeliveryCounter++;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getRedeliveryCounter() {
+        return redeliveryCounter;
+    }
+
+    public void setRedeliveryCounter(int deliveryCounter) {
+        this.redeliveryCounter = deliveryCounter;
+    }
+
+    /**
+     * The route of brokers the command has moved through.
+     *
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId[] getBrokerPath() {
+        return brokerPath;
+    }
+
+    public void setBrokerPath(BrokerId[] brokerPath) {
+        this.brokerPath = brokerPath;
+    }
+
+    public boolean isReadOnlyProperties() {
+        return readOnlyProperties;
+    }
+
+    public void setReadOnlyProperties(boolean readOnlyProperties) {
+        this.readOnlyProperties = readOnlyProperties;
+    }
+
+    public boolean isReadOnlyBody() {
+        return readOnlyBody;
+    }
+
+    public void setReadOnlyBody(boolean readOnlyBody) {
+        this.readOnlyBody = readOnlyBody;
+    }
+
+    public IConnection getConnection() {
+        return this.connection;
+    }
+
+    public void setConnection(IConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Used to schedule the arrival time of a message to a broker. The broker
+     * will not dispatch a message to a consumer until its arrival time has
+     * elapsed.
+     *
+     * @openwire:property version=1
+     */
+    public long getArrival() {
+        return arrival;
+    }
+
+    public void setArrival(long arrival) {
+        this.arrival = arrival;
+    }
+
+    /**
+     * Only set by the broker and defines the userID of the producer connection
+     * who sent this message. This is an optional field, it needs to be enabled
+     * on the broker to have this field populated.
+     *
+     * @openwire:property version=1
+     */
+    public String getUserID() {
+        return userID;
+    }
+
+    public void setUserID(String jmsxUserID) {
+        this.userID = jmsxUserID;
+    }
+
+    public int getReferenceCount() {
+        return referenceCount;
+    }
+
+    public Message getMessageHardRef() {
+        return this;
+    }
+
+    public Message getMessage() throws IOException {
+        return this;
+    }
+
+    public org.apache.activemq.broker.region.Destination getRegionDestination() {
+        return regionDestination;
+    }
+
+    /**
+     * Sets the region destination and, when no memory usage has been assigned
+     * yet, adopts the one of the destination.
+     *
+     * @param destination the region destination; may be <code>null</code>, in
+     *        which case the memory usage is left as it is
+     */
+    public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
+        this.regionDestination = destination;
+        // guard against a null destination, which used to raise a
+        // NullPointerException here
+        if (this.memoryUsage == null && destination != null) {
+            this.memoryUsage = destination.getMemoryUsage();
+        }
+    }
+
+    public MemoryUsage getMemoryUsage() {
+        return this.memoryUsage;
+    }
+
+    public void setMemoryUsage(MemoryUsage usage) {
+        this.memoryUsage = usage;
+    }
+
+    public boolean isMarshallAware() {
+        return true;
+    }
+
+    /**
+     * Increments the reference count and, on the transition to 1, adds the
+     * message size to the memory usage if one is set.
+     *
+     * @return the reference count after the increment
+     */
+    public int incrementReferenceCount() {
+        int rc;
+        int size;
+        // only the counter update and size snapshot happen under the lock;
+        // the memory usage is updated outside of it
+        synchronized (this) {
+            rc = ++referenceCount;
+            size = getSize();
+        }
+
+        if (rc == 1 && getMemoryUsage() != null) {
+            getMemoryUsage().increaseUsage(size);
+        }
+
+        return rc;
+    }
+
+    /**
+     * Decrements the reference count and, on the transition to 0, subtracts
+     * the message size from the memory usage if one is set.
+     *
+     * @return the reference count after the decrement
+     */
+    public int decrementReferenceCount() {
+        int rc;
+        int size;
+        synchronized (this) {
+            rc = --referenceCount;
+            size = getSize();
+        }
+
+        if (rc == 0 && getMemoryUsage() != null) {
+            getMemoryUsage().decreaseUsage(size);
+        }
+
+        return rc;
+    }
+
+    /**
+     * Returns the size of the message, computing it when the stored size is
+     * zero or below the minimum message size: the minimum plus the lengths of
+     * the encoded properties and the content, where present.
+     *
+     * @return the (possibly freshly computed) size
+     */
+    public int getSize() {
+        int minimumMessageSize = getMinimumMessageSize();
+        if (size < minimumMessageSize || size == 0) {
+            size = minimumMessageSize;
+            if (marshalledProperties != null) {
+                size += marshalledProperties.getLength();
+            }
+            if (content != null) {
+                size += content.getLength();
+            }
+        }
+        return size;
+    }
+
+    /**
+     * @return the minimum message size of the region destination when one is
+     *         set, otherwise {@link #DEFAULT_MINIMUM_MESSAGE_SIZE}
+     */
+    protected int getMinimumMessageSize() {
+        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
+        //let destination override
+        Destination dest = regionDestination;
+        if (dest != null) {
+            result = dest.getMinimumMessageSize();
+        }
+        return result;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the recievedByDFBridge.
+     */
+    public boolean isRecievedByDFBridge() {
+        return recievedByDFBridge;
+    }
+
+    /**
+     * @param recievedByDFBridge The recievedByDFBridge to set.
+     */
+    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
+        this.recievedByDFBridge = recievedByDFBridge;
+    }
+
+    /**
+     * Counts a rollback as one more redelivery.
+     */
+    public void onMessageRolledBack() {
+        incrementRedeliveryCounter();
+    }
+
+    /**
+     * @openwire:property version=2 cache=true
+     */
+    public boolean isDroppable() {
+        return droppable;
+    }
+
+    public void setDroppable(boolean droppable) {
+        this.droppable = droppable;
+    }
+
+    /**
+     * If a message is stored in multiple nodes on a cluster, all the cluster
+     * members will be listed here. Otherwise, it will be null.
+     *
+     * @openwire:property version=3 cache=true
+     */
+    public BrokerId[] getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(BrokerId[] cluster) {
+        this.cluster = cluster;
+    }
+
+    public boolean isMessage() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerInTime() {
+        return this.brokerInTime;
+    }
+
+    public void setBrokerInTime(long brokerInTime) {
+        this.brokerInTime = brokerInTime;
+    }
+
+    /**
+     * @openwire:property version=3
+     */
+    public long getBrokerOutTime() {
+        return this.brokerOutTime;
+    }
+
+    public void setBrokerOutTime(long brokerOutTime) {
+        this.brokerOutTime = brokerOutTime;
+    }
+
+    public boolean isDropped() {
+        return false;
+    }
+
+    public String toString() {
+        return toString(null);
+    }
+
+    /**
+     * Builds the string form after making a best-effort attempt to decode
+     * the properties.
+     *
+     * @param overrideFields passed through to the superclass
+     * @return the string form produced by the superclass
+     */
+    public String toString(Map<String, Object>overrideFields) {
+        try {
+            getProperties();
+        } catch (IOException ignored) {
+            // Deliberately ignored: toString must not fail because the
+            // encoded properties cannot be decoded.
+        }
+        return super.toString(overrideFields);
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/Message.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageAck.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageAck.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageAck.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Command that acknowledges a single message or a range of messages for a
+ * consumer. The {@code ackType} field holds one of the {@code *_ACK_TYPE}
+ * constants declared below.
+ *
+ * @openwire:marshaller code="22"
+ * @version $Revision: 1.11 $
+ */
+public class MessageAck extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
+
+    /**
+     * Used to let the broker know that the message has been delivered to the
+     * client. The message will still be retained until a standard ack is
+     * received. This is used to get the broker to send more messages past
+     * prefetch limits when a standard ack has not been sent.
+     */
+    public static final byte DELIVERED_ACK_TYPE = 0;
+
+    /**
+     * The standard ack case where a client wants the message to be discarded.
+     */
+    public static final byte STANDARD_ACK_TYPE = 2;
+
+    /**
+     * In case the client wants to explicitly let the broker know that a
+     * message was not processed and the message was considered a poison
+     * message. (The misspelled constant name is kept as-is for existing
+     * callers.)
+     */
+    public static final byte POSION_ACK_TYPE = 1;
+
+    /**
+     * In case the client wants to explicitly let the broker know that a
+     * message was not processed and it was re-delivered to the consumer
+     * but it was not yet considered to be a poison message. The messageCount
+     * field will hold the number of times the message was re-delivered.
+     */
+    public static final byte REDELIVERED_ACK_TYPE = 3;
+
+    /**
+     * The ack case where a client wants only an individual message to be
+     * discarded.
+     */
+    public static final byte INDIVIDUAL_ACK_TYPE = 4;
+
+    protected byte ackType;
+    protected ConsumerId consumerId;
+    protected MessageId firstMessageId;
+    protected MessageId lastMessageId;
+    protected ActiveMQDestination destination;
+    protected TransactionId transactionId;
+    protected int messageCount;
+
+    protected transient String consumerKey;
+
+    /**
+     * Creates an empty acknowledgement; all fields keep their default values.
+     */
+    public MessageAck() {
+    }
+
+    /**
+     * Creates an acknowledgement from a dispatched message.
+     * The consumer id, destination and last message id are taken from the
+     * dispatch; {@code firstMessageId} and {@code transactionId} are left
+     * unset.
+     *
+     * @param md the dispatch being acknowledged; must carry a message
+     * @param ackType one of the {@code *_ACK_TYPE} constants
+     * @param messageCount the count to record in this acknowledgement
+     */
+    public MessageAck(MessageDispatch md, byte ackType, int messageCount) {
+        this.ackType = ackType;
+        this.consumerId = md.getConsumerId();
+        this.destination = md.getDestination();
+        this.lastMessageId = md.getMessage().getMessageId();
+        this.messageCount = messageCount;
+    }
+
+    /**
+     * Copies the state of this acknowledgement into the supplied instance.
+     * Object references are shared, not cloned.
+     *
+     * @param copy the instance that receives this acknowledgement's state
+     */
+    public void copy(MessageAck copy) {
+        super.copy(copy);
+        copy.firstMessageId = firstMessageId;
+        copy.lastMessageId = lastMessageId;
+        copy.destination = destination;
+        copy.transactionId = transactionId;
+        copy.ackType = ackType;
+        copy.consumerId = consumerId;
+        // Previously omitted: without it a copied ack reported a count of 0
+        // regardless of how many messages the original acknowledged.
+        copy.messageCount = messageCount;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isMessageAck() {
+        return true;
+    }
+
+    /**
+     * @return true if the ack type is {@link #POSION_ACK_TYPE}
+     */
+    public boolean isPoisonAck() {
+        return ackType == POSION_ACK_TYPE;
+    }
+
+    /**
+     * @return true if the ack type is {@link #STANDARD_ACK_TYPE}
+     */
+    public boolean isStandardAck() {
+        return ackType == STANDARD_ACK_TYPE;
+    }
+
+    /**
+     * @return true if the ack type is {@link #DELIVERED_ACK_TYPE}
+     */
+    public boolean isDeliveredAck() {
+        return ackType == DELIVERED_ACK_TYPE;
+    }
+
+    /**
+     * @return true if the ack type is {@link #REDELIVERED_ACK_TYPE}
+     */
+    public boolean isRedeliveredAck() {
+        return ackType == REDELIVERED_ACK_TYPE;
+    }
+
+    /**
+     * @return true if the ack type is {@link #INDIVIDUAL_ACK_TYPE}
+     */
+    public boolean isIndividualAck() {
+        return ackType == INDIVIDUAL_ACK_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public TransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(TransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @return true if a transaction id has been set on this acknowledgement
+     */
+    public boolean isInTransaction() {
+        return transactionId != null;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public byte getAckType() {
+        return ackType;
+    }
+
+    public void setAckType(byte ackType) {
+        this.ackType = ackType;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getFirstMessageId() {
+        return firstMessageId;
+    }
+
+    public void setFirstMessageId(MessageId firstMessageId) {
+        this.firstMessageId = firstMessageId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getLastMessageId() {
+        return lastMessageId;
+    }
+
+    public void setLastMessageId(MessageId lastMessageId) {
+        this.lastMessageId = lastMessageId;
+    }
+
+    /**
+     * The number of messages being acknowledged in the range.
+     *
+     * @openwire:property version=1
+     */
+    public int getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(int messageCount) {
+        this.messageCount = messageCount;
+    }
+
+    /**
+     * Dispatches this command to {@code visitor.processMessageAck}.
+     *
+     * @param visitor the visitor that handles the acknowledgement
+     * @return the visitor's response
+     * @throws Exception whatever the visitor throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageAck(this);
+    }
+
+    /**
+     * A helper method to allow a single message ID to be acknowledged: the
+     * id becomes both the first and the last message id and the message
+     * count is set to 1.
+     *
+     * @param messageID the id of the one message being acknowledged
+     */
+    public void setMessageID(MessageId messageID) {
+        setFirstMessageId(messageID);
+        setLastMessageId(messageID);
+        setMessageCount(1);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageAck.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatch.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatch.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatch.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Command that pairs a {@link Message} with the consumer id and destination
+ * it is being dispatched for, plus a redelivery counter. The delivery
+ * sequence id, consumer object and transmit callback are transient and are
+ * therefore not part of the marshalled properties.
+ *
+ * @openwire:marshaller code="21"
+ * @version $Revision$
+ */
+public class MessageDispatch extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;
+
+    protected ConsumerId consumerId;
+    protected ActiveMQDestination destination;
+    protected Message message;
+    protected int redeliveryCounter;
+
+    protected transient long deliverySequenceId;
+    protected transient Object consumer;
+    protected transient Runnable transmitCallback;
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isMessageDispatch() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return this.consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return this.destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public Message getMessage() {
+        return this.message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    /**
+     * @return the transient delivery sequence id (not marshalled)
+     */
+    public long getDeliverySequenceId() {
+        return this.deliverySequenceId;
+    }
+
+    public void setDeliverySequenceId(long deliverySequenceId) {
+        this.deliverySequenceId = deliverySequenceId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getRedeliveryCounter() {
+        return this.redeliveryCounter;
+    }
+
+    public void setRedeliveryCounter(int deliveryCounter) {
+        this.redeliveryCounter = deliveryCounter;
+    }
+
+    /**
+     * @return the transient consumer object attached to this dispatch (not marshalled)
+     */
+    public Object getConsumer() {
+        return this.consumer;
+    }
+
+    public void setConsumer(Object consumer) {
+        this.consumer = consumer;
+    }
+
+    /**
+     * Dispatches this command to {@code visitor.processMessageDispatch}.
+     *
+     * @param visitor the visitor that handles the dispatch
+     * @return the visitor's response
+     * @throws Exception whatever the visitor throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageDispatch(this);
+    }
+
+    /**
+     * @return the transient callback attached to this dispatch (not marshalled)
+     */
+    public Runnable getTransmitCallback() {
+        return this.transmitCallback;
+    }
+
+    public void setTransmitCallback(Runnable transmitCallback) {
+        this.transmitCallback = transmitCallback;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatch.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Command carrying a consumer id, destination, message id and delivery
+ * sequence id. The accessors are kept in their original order because the
+ * {@code @openwire:property} tags may be order-sensitive.
+ *
+ * @openwire:marshaller code="90"
+ * @version $Revision$
+ */
+public class MessageDispatchNotification extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH_NOTIFICATION;
+
+    protected ConsumerId consumerId;
+    protected ActiveMQDestination destination;
+    protected MessageId messageId;
+    protected long deliverySequenceId;
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isMessageDispatchNotification() {
+        return true;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return this.consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return this.destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getDeliverySequenceId() {
+        return this.deliverySequenceId;
+    }
+
+    public void setDeliverySequenceId(long deliverySequenceId) {
+        this.deliverySequenceId = deliverySequenceId;
+    }
+
+    /**
+     * Dispatches this command to
+     * {@code visitor.processMessageDispatchNotification}.
+     *
+     * @param visitor the visitor that handles the notification
+     * @return the visitor's response
+     * @throws Exception whatever the visitor throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessageDispatchNotification(this);
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageDispatchNotification.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageId.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * Identifier of a message, made of a {@link ProducerId} and a producer
+ * sequence id, plus a broker sequence id that does not take part in
+ * {@link #equals(Object)} or {@link #hashCode()}.
+ * <p>
+ * The string form and the hash code are cached lazily in transient fields.
+ * The hash cache is cleared whenever a field it is derived from changes.
+ * The string cache is deliberately left alone by the plain setters because
+ * {@link #setTextView(String)} may have installed a caller-supplied text.
+ *
+ * @openwire:marshaller code="110"
+ * @version $Revision: 1.12 $
+ */
+public class MessageId implements DataStructure, Comparable<MessageId> {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ID;
+
+    protected ProducerId producerId;
+    protected long producerSequenceId;
+    protected long brokerSequenceId;
+
+    // Lazily computed string form; may also be set through setTextView().
+    private transient String key;
+    // Lazily computed hash; 0 doubles as "not computed yet".
+    private transient int hashCode;
+
+    /**
+     * Creates an id with a fresh, empty {@link ProducerId}.
+     */
+    public MessageId() {
+        this.producerId = new ProducerId();
+    }
+
+    /**
+     * @param producerInfo supplies the producer id
+     * @param producerSequenceId sequence number assigned by the producer
+     */
+    public MessageId(ProducerInfo producerInfo, long producerSequenceId) {
+        this.producerId = producerInfo.getProducerId();
+        this.producerSequenceId = producerSequenceId;
+    }
+
+    /**
+     * Creates an id by parsing its string form, see {@link #setValue(String)}.
+     *
+     * @param messageKey the string form of the id; must not be null
+     */
+    public MessageId(String messageKey) {
+        setValue(messageKey);
+    }
+
+    /**
+     * @param producerId string form of the producer id
+     * @param producerSequenceId sequence number assigned by the producer
+     */
+    public MessageId(String producerId, long producerSequenceId) {
+        this(new ProducerId(producerId), producerSequenceId);
+    }
+
+    /**
+     * @param producerId the producer id, stored by reference
+     * @param producerSequenceId sequence number assigned by the producer
+     */
+    public MessageId(ProducerId producerId, long producerSequenceId) {
+        this.producerId = producerId;
+        this.producerSequenceId = producerSequenceId;
+    }
+
+    /**
+     * Sets the value as a String. The text after the last ':' is parsed as
+     * the producer sequence id and the text before it as the producer id.
+     * Without a ':' the whole string is the producer id and the sequence id
+     * is left unchanged.
+     *
+     * @param messageKey the string form of the id; must not be null
+     * @throws NumberFormatException if the text after the last ':' is not a long
+     */
+    public void setValue(String messageKey) {
+        key = messageKey;
+        // The identity fields are about to change, so drop the cached hash.
+        hashCode = 0;
+        // Parse off the sequenceId
+        int p = messageKey.lastIndexOf(":");
+        if (p >= 0) {
+            producerSequenceId = Long.parseLong(messageKey.substring(p + 1));
+            messageKey = messageKey.substring(0, p);
+        }
+        producerId = new ProducerId(messageKey);
+    }
+
+    /**
+     * Sets the transient text view of the message which will be ignored if the
+     * message is marshaled on a transport; so is only for in-JVM changes to
+     * accommodate foreign JMS message IDs
+     */
+    public void setTextView(String key) {
+        this.key = key;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Two ids are equal when they are of the same class and have the same
+     * producer sequence id and equal producer ids. The broker sequence id
+     * and the text view are not compared.
+     */
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        MessageId id = (MessageId)o;
+        return producerSequenceId == id.producerSequenceId && producerId.equals(id.producerId);
+    }
+
+    /**
+     * Hash derived from the producer id and the producer sequence id; the
+     * result is cached until one of those fields is changed.
+     */
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = producerId.hashCode() ^ (int)producerSequenceId;
+        }
+        return hashCode;
+    }
+
+    /**
+     * Returns the cached text if one exists (including a text installed via
+     * {@link #setTextView(String)}); otherwise builds and caches
+     * {@code producerId + ":" + producerSequenceId}.
+     */
+    public String toString() {
+        if (key == null) {
+            key = producerId.toString() + ":" + producerSequenceId;
+        }
+        return key;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+        // Keep hashCode() consistent with equals() after the change.
+        this.hashCode = 0;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getProducerSequenceId() {
+        return producerSequenceId;
+    }
+
+    public void setProducerSequenceId(long producerSequenceId) {
+        this.producerSequenceId = producerSequenceId;
+        // Keep hashCode() consistent with equals() after the change.
+        this.hashCode = 0;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getBrokerSequenceId() {
+        return brokerSequenceId;
+    }
+
+    public void setBrokerSequenceId(long brokerSequenceId) {
+        this.brokerSequenceId = brokerSequenceId;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    /**
+     * Creates a new id with the same producer id reference, producer
+     * sequence id, cached text and broker sequence id. The cached hash is
+     * not carried over; the copy recomputes it on demand.
+     *
+     * @return the new id
+     */
+    public MessageId copy() {
+        MessageId copy = new MessageId(producerId, producerSequenceId);
+        copy.key = key;
+        copy.brokerSequenceId = brokerSequenceId;
+        return copy;
+    }
+
+    /**
+     * Orders ids by comparing their {@link #toString()} forms.
+     *
+     * @param other the id to compare with; for {@code null} the result is -1
+     * @return the result of the string comparison, or -1 when {@code other} is null
+     * @see java.lang.Comparable#compareTo(java.lang.Object)
+     */
+    public int compareTo(MessageId other) {
+        int result = -1;
+        if (other != null) {
+            result = this.toString().compareTo(other.toString());
+        }
+        return result;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessageId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessagePull.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessagePull.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessagePull.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/MessagePull.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Used to pull messages on demand.
+ * 
+ * @openwire:marshaller code="20"
+ * 
+ * @version $Revision: 563921 $
+ */
+public class MessagePull extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL;
+
+    protected ConsumerId consumerId;
+    protected ActiveMQDestination destination;
+    protected long timeout;
+    private MessageId messageId;
+    private String correlationId;
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Dispatches this command to {@code visitor.processMessagePull}.
+     *
+     * @param visitor the visitor that handles the pull request
+     * @return the visitor's response
+     * @throws Exception whatever the visitor throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessagePull(this);
+    }
+
+    /**
+     * Takes the consumer id and the destination from the given consumer
+     * information and applies them through this command's setters.
+     *
+     * @param info the consumer information to read from
+     */
+    public void configure(ConsumerInfo info) {
+        this.setConsumerId(info.getConsumerId());
+        this.setDestination(info.getDestination());
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConsumerId getConsumerId() {
+        return this.consumerId;
+    }
+
+    public void setConsumerId(ConsumerId consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return this.destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getTimeout() {
+        return this.timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * An optional correlation ID which a broker could use to decide which
+     * messages are pulled on demand from a queue for a consumer.
+     *
+     * @openwire:property version=3
+     */
+    public String getCorrelationId() {
+        return this.correlationId;
+    }
+
+    public void setCorrelationId(String correlationId) {
+        this.correlationId = correlationId;
+    }
+
+
+    /**
+     * An optional message ID which a broker could use to decide which
+     * messages are pulled on demand from a queue for a consumer.
+     *
+     * @openwire:property version=3
+     */
+    public MessageId getMessageId() {
+        return this.messageId;
+    }
+
+    public void setMessageId(MessageId messageId) {
+        this.messageId = messageId;
+    }
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.util.Arrays;
+
+import org.apache.activemq.broker.openwire.OpenwireMessageEvaluationContext;
+import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @openwire:marshaller code="91"
+ * @version $Revision: 1.12 $
+ */
+public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
+    static final Log LOG = LogFactory.getLog(NetworkBridgeFilter.class);
+
+    private BrokerId networkBrokerId;
+    private int networkTTL;
+
+    public NetworkBridgeFilter() {
+    }
+
+    public NetworkBridgeFilter(BrokerId remoteBrokerPath, int networkTTL) {
+        this.networkBrokerId = remoteBrokerPath;
+        this.networkTTL = networkTTL;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public boolean matches(MessageEvaluationContext mec) {
+        Message message = ((OpenwireMessageEvaluationContext)mec).getMessage();
+        return matchesForwardingFilter(message);
+    }
+
+    public Object evaluate(MessageEvaluationContext message) {
+        return matches(message) ? Boolean.TRUE : Boolean.FALSE;
+    }
+
+    protected boolean matchesForwardingFilter(Message message) {
+
+        if (contains(message.getBrokerPath(), networkBrokerId)) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message all ready routed once through this broker ("
+                        + networkBrokerId + "), path: "
+                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
+            }
+            return false;
+        }
+
+        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
+
+        if (hops >= networkTTL) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
+            }
+            return false;
+        }
+
+        // Don't propagate advisory messages about network subscriptions
+        if (message.isAdvisory() && message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
+            ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
+            hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
+            if (hops >= networkTTL) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
+        if (brokerPath != null && brokerId != null) {
+            for (int i = 0; i < brokerPath.length; i++) {
+                if (brokerId.equals(brokerPath[i])) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public int getNetworkTTL() {
+        return networkTTL;
+    }
+
+    public void setNetworkTTL(int networkTTL) {
+        this.networkTTL = networkTTL;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId getNetworkBrokerId() {
+        return networkBrokerId;
+    }
+
+    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
+        this.networkBrokerId = remoteBrokerPath;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/PartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/PartialCommand.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/PartialCommand.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/PartialCommand.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * One fragment of a large command that has been split up into pieces.
+ * <p>
+ * Every <code>is...()</code> query answers false and
+ * {@link #visit(CommandVisitor)} always throws: according to its error
+ * message, the transport layer is expected to filter these instances out
+ * before they reach a visitor.
+ * 
+ * @openwire:marshaller code="60"
+ * @version $Revision: 563921 $
+ */
+public class PartialCommand implements Command {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_COMMAND;
+
+    // Fields exposed as OpenWire properties through their getters.
+    private int commandId;
+    private byte[] data;
+
+    // Endpoints are transient and carry no OpenWire property annotation.
+    private transient Endpoint from;
+    private transient Endpoint to;
+
+    public PartialCommand() {
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the command id carried by this fragment
+     * @openwire:property version=1
+     */
+    public int getCommandId() {
+        return this.commandId;
+    }
+
+    public void setCommandId(int commandId) {
+        this.commandId = commandId;
+    }
+
+    /**
+     * The data for this part of the command
+     * 
+     * @return the payload bytes of this fragment; may be null if never set
+     * @openwire:property version=1 mandatory=true
+     */
+    public byte[] getData() {
+        return this.data;
+    }
+
+    public void setData(byte[] data) {
+        this.data = data;
+    }
+
+    /**
+     * @return the endpoint recorded as the origin of this command
+     */
+    public Endpoint getFrom() {
+        return this.from;
+    }
+
+    public void setFrom(Endpoint from) {
+        this.from = from;
+    }
+
+    /**
+     * @return the endpoint recorded as the target of this command
+     */
+    public Endpoint getTo() {
+        return this.to;
+    }
+
+    public void setTo(Endpoint to) {
+        this.to = to;
+    }
+
+    /**
+     * Always fails: a partial command is never dispatched to a visitor.
+     * 
+     * @throws IllegalStateException on every call
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this);
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean isResponseRequired() {
+        return false;
+    }
+
+    /**
+     * Ignored; {@link #isResponseRequired()} always answers false.
+     */
+    public void setResponseRequired(boolean responseRequired) {
+    }
+
+    // ---- Command type queries: a fragment is none of these ----
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    /**
+     * @return a short description containing the command id and the payload
+     *         length (0 when there is no payload)
+     */
+    public String toString() {
+        int size = data == null ? 0 : data.length;
+        return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerAck.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerAck.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerAck.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * A ProducerAck command is sent by a broker to a producer to let it know it has
+ * received and processed messages that it has produced. The producer will be
+ * flow controlled if it does not receive ProducerAck commands back from the
+ * broker.
+ * 
+ * @openwire:marshaller code="19" version="3"
+ * @version $Revision: 1.11 $
+ */
+public class ProducerAck extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ACK;
+
+    /** Producer this ack is addressed to. */
+    protected ProducerId producerId;
+    /** Number of bytes being acknowledged. */
+    protected int size;
+
+    public ProducerAck() {
+    }
+
+    /**
+     * @param producerId the producer the ack is destined for
+     * @param size the number of bytes being acked
+     */
+    public ProducerAck(ProducerId producerId, int size) {
+        this.producerId = producerId;
+        this.size = size;
+    }
+
+    /**
+     * Copies this ack's state into another instance: first the state handled
+     * by the superclass copy, then the producer id and size.
+     * 
+     * @param copy the instance that receives the state
+     */
+    public void copy(ProducerAck copy) {
+        super.copy(copy);
+        copy.producerId = this.producerId;
+        copy.size = this.size;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Dispatches this command to the visitor's producer-ack handler.
+     * 
+     * @return whatever the visitor returns
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processProducerAck(this);
+    }
+
+    /**
+     * The producer id that this ack message is destined for.
+     * 
+     * @openwire:property version=3
+     */
+    public ProducerId getProducerId() {
+        return this.producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * The number of bytes that are being acked.
+     * 
+     * @openwire:property version=3
+     */
+    public int getSize() {
+        return this.size;
+    }
+
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerId.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerId.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerId.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerId.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+/**
+ * Identifies a producer by a connection id, a session id and a producer
+ * value. The string form produced by {@link #toString()} and accepted by
+ * {@link #ProducerId(String)} is <code>connectionId:sessionId:value</code>.
+ * <p>
+ * The hash code, the string key and the parent session id are computed
+ * lazily and cached; the setters discard those cached values so they never
+ * describe an outdated state.
+ * 
+ * @openwire:marshaller code="123"
+ * @version $Revision$
+ */
+public class ProducerId implements DataStructure {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID;
+
+    protected String connectionId;
+    protected long sessionId;
+    protected long value;
+
+    // Lazily computed caches; see resetCachedState().
+    protected transient int hashCode;
+    protected transient String key;
+    protected transient SessionId parentId;
+
+    public ProducerId() {
+    }
+
+    /**
+     * @param sessionId the owning session; supplies the connection id and
+     *        the session value
+     * @param producerId the producer value within that session
+     */
+    public ProducerId(SessionId sessionId, long producerId) {
+        this.connectionId = sessionId.getConnectionId();
+        this.sessionId = sessionId.getValue();
+        this.value = producerId;
+    }
+
+    /**
+     * Copy constructor; the cached values of the source are not copied.
+     * 
+     * @param id the id to copy
+     */
+    public ProducerId(ProducerId id) {
+        this.connectionId = id.getConnectionId();
+        this.sessionId = id.getSessionId();
+        this.value = id.getValue();
+    }
+
+    /**
+     * Parses a key of the form <code>connectionId:sessionId:value</code>.
+     * The text after the last ':' becomes the producer value, the text after
+     * the next-to-last ':' the session id, and the remainder the connection
+     * id. A key without any ':' is taken entirely as the connection id.
+     * 
+     * @param producerKey the key to parse
+     * @throws NumberFormatException if a numeric part is not a valid long
+     */
+    public ProducerId(String producerKey) {
+        // Parse off the producerId
+        int p = producerKey.lastIndexOf(":");
+        if (p >= 0) {
+            value = Long.parseLong(producerKey.substring(p + 1));
+            producerKey = producerKey.substring(0, p);
+        }
+        setProducerSessionKey(producerKey);
+    }
+
+    /**
+     * @return the session id this producer belongs to, created on first use
+     *         and cached afterwards
+     */
+    public SessionId getParentId() {
+        if (parentId == null) {
+            parentId = new SessionId(this);
+        }
+        return parentId;
+    }
+
+    /**
+     * Combines the connection id hash with the session id and value. The
+     * result is cached while it is non-zero. A null connection id
+     * contributes 0 instead of causing a NullPointerException.
+     */
+    public int hashCode() {
+        if (hashCode == 0) {
+            int connectionHash = connectionId == null ? 0 : connectionId.hashCode();
+            hashCode = connectionHash ^ (int)sessionId ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    /**
+     * Two ids are equal when the other object is exactly a ProducerId and
+     * session id, value and connection id all match. Two null connection
+     * ids are considered matching.
+     */
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != ProducerId.class) {
+            return false;
+        }
+        ProducerId id = (ProducerId)o;
+        if (sessionId != id.sessionId || value != id.value) {
+            return false;
+        }
+        // Null-safe comparison: a default-constructed id has no connection id.
+        return connectionId == null ? id.connectionId == null : connectionId.equals(id.connectionId);
+    }
+
+    /**
+     * Splits a <code>connectionId:sessionId</code> key into its two fields.
+     * 
+     * @param sessionKey the key to parse
+     */
+    private void setProducerSessionKey(String sessionKey) {
+        // Parse off the session id
+        int p = sessionKey.lastIndexOf(":");
+        if (p >= 0) {
+            sessionId = Long.parseLong(sessionKey.substring(p + 1));
+            sessionKey = sessionKey.substring(0, p);
+        }
+        // The rest is the connection id
+        connectionId = sessionKey;
+    }
+
+    /**
+     * Discards the lazily computed values so that they are rebuilt from the
+     * current fields on next use.
+     */
+    private void resetCachedState() {
+        hashCode = 0;
+        key = null;
+        parentId = null;
+    }
+
+    /**
+     * @return <code>connectionId:sessionId:value</code>, cached after the
+     *         first call
+     */
+    public String toString() {
+        if (key == null) {
+            key = connectionId + ":" + sessionId + ":" + value;
+        }
+        return key;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(String connectionId) {
+        this.connectionId = connectionId;
+        resetCachedState();
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getValue() {
+        return value;
+    }
+
+    public void setValue(long producerId) {
+        this.value = producerId;
+        resetCachedState();
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public long getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(long sessionId) {
+        this.sessionId = sessionId;
+        resetCachedState();
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Command describing a producer: its id, its destination, the brokers the
+ * command has passed through, and its dispatch and window settings.
+ * 
+ * @openwire:marshaller code="6"
+ * @version $Revision: 1.13 $
+ */
+public class ProducerInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_INFO;
+
+    protected ProducerId producerId;
+    protected ActiveMQDestination destination;
+    protected BrokerId[] brokerPath;
+    protected boolean dispatchAsync;
+    protected int windowSize;
+
+    public ProducerInfo() {
+    }
+
+    /**
+     * @param producerId the id of the producer being described
+     */
+    public ProducerInfo(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * Builds the producer id from the session's id and a producer value.
+     * 
+     * @param sessionInfo the session that owns the producer
+     * @param producerId the producer value within that session
+     */
+    public ProducerInfo(SessionInfo sessionInfo, long producerId) {
+        this.producerId = new ProducerId(sessionInfo.getSessionId(), producerId);
+    }
+
+    /**
+     * @return a new ProducerInfo populated through
+     *         {@link #copy(ProducerInfo)}
+     */
+    public ProducerInfo copy() {
+        ProducerInfo info = new ProducerInfo();
+        copy(info);
+        return info;
+    }
+
+    /**
+     * Copies this command's state into another instance: first the state
+     * handled by the superclass copy, then every ProducerInfo field. Object
+     * references (including the broker path array) are shared with the
+     * copy, not cloned.
+     * 
+     * @param info the instance that receives the state
+     */
+    public void copy(ProducerInfo info) {
+        super.copy(info);
+        info.producerId = producerId;
+        info.destination = destination;
+        // These three were previously left at their defaults, so a copy
+        // silently lost the route and the dispatch/window configuration.
+        info.brokerPath = brokerPath;
+        info.dispatchAsync = dispatchAsync;
+        info.windowSize = windowSize;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ProducerId getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(ProducerId producerId) {
+        this.producerId = producerId;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ActiveMQDestination getDestination() {
+        return destination;
+    }
+
+    public void setDestination(ActiveMQDestination destination) {
+        this.destination = destination;
+    }
+
+    /**
+     * Creates the command that removes this producer. The remove command
+     * targets this producer's id and inherits this command's
+     * response-required flag.
+     * 
+     * @return a new RemoveInfo for this producer
+     */
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo command = new RemoveInfo(getProducerId());
+        command.setResponseRequired(isResponseRequired());
+        return command;
+    }
+
+    /**
+     * The route of brokers the command has moved through.
+     * 
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId[] getBrokerPath() {
+        return brokerPath;
+    }
+
+    public void setBrokerPath(BrokerId[] brokerPath) {
+        this.brokerPath = brokerPath;
+    }
+
+    /**
+     * Dispatches this command to the visitor's add-producer handler.
+     * 
+     * @return whatever the visitor returns
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddProducer(this);
+    }
+
+    /**
+     * If the broker should dispatch messages from this producer async. Since
+     * sync dispatch could potentially block the producer thread, this could
+     * be an important setting for the producer.
+     * 
+     * @openwire:property version=2
+     */
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
+    }
+
+    public void setDispatchAsync(boolean dispatchAsync) {
+        this.dispatchAsync = dispatchAsync;
+    }
+
+    /**
+     * Used to configure the producer window size. A producer will send up to
+     * the configured window size worth of payload data to the broker before
+     * waiting for an Ack that allows it to send more.
+     * 
+     * @openwire:property version=3
+     */
+    public int getWindowSize() {
+        return windowSize;
+    }
+
+    public void setWindowSize(int windowSize) {
+        this.windowSize = windowSize;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ProducerInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import java.io.IOException;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Removes a consumer, producer, session or connection.
+ * <p>
+ * The kind of object being removed is determined by the data structure type
+ * of the object id carried by this command.
+ * 
+ * @openwire:marshaller code="12"
+ * @version $Revision$
+ */
+public class RemoveInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_INFO;
+
+    /** Id of the object to remove. */
+    protected DataStructure objectId;
+    /** Passed to the visitor for connection, session and consumer removals. */
+    protected long lastDeliveredSequenceId;
+
+    public RemoveInfo() {
+    }
+
+    /**
+     * @param objectId the id of the object to remove
+     */
+    public RemoveInfo(DataStructure objectId) {
+        this.objectId = objectId;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public DataStructure getObjectId() {
+        return this.objectId;
+    }
+
+    public void setObjectId(DataStructure objectId) {
+        this.objectId = objectId;
+    }
+
+    /**
+     * @openwire:property version=5 cache=false
+     */
+    public long getLastDeliveredSequenceId() {
+        return this.lastDeliveredSequenceId;
+    }
+
+    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
+        this.lastDeliveredSequenceId = lastDeliveredSequenceId;
+    }
+
+    /**
+     * Routes the removal to the visitor method matching the type of the
+     * object id. Connection, session and consumer removals also receive the
+     * last delivered sequence id; producer removals do not.
+     * 
+     * @return whatever the selected visitor method returns
+     * @throws IOException if the object id is of none of the four supported
+     *         types
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        final byte type = objectId.getDataStructureType();
+        if (type == ConnectionId.DATA_STRUCTURE_TYPE) {
+            return visitor.processRemoveConnection((ConnectionId)objectId, lastDeliveredSequenceId);
+        }
+        if (type == SessionId.DATA_STRUCTURE_TYPE) {
+            return visitor.processRemoveSession((SessionId)objectId, lastDeliveredSequenceId);
+        }
+        if (type == ConsumerId.DATA_STRUCTURE_TYPE) {
+            return visitor.processRemoveConsumer((ConsumerId)objectId, lastDeliveredSequenceId);
+        }
+        if (type == ProducerId.DATA_STRUCTURE_TYPE) {
+            return visitor.processRemoveProducer((ProducerId)objectId);
+        }
+        throw new IOException("Unknown remove command type: " + objectId.getDataStructureType());
+    }
+
+    /**
+     * Returns true if this event is for a removed connection
+     */
+    public boolean isConnectionRemove() {
+        return removes(ConnectionId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Returns true if this event is for a removed session
+     */
+    public boolean isSessionRemove() {
+        return removes(SessionId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Returns true if this event is for a removed consumer
+     */
+    public boolean isConsumerRemove() {
+        return removes(ConsumerId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Returns true if this event is for a removed producer
+     */
+    public boolean isProducerRemove() {
+        return removes(ProducerId.DATA_STRUCTURE_TYPE);
+    }
+
+    /**
+     * Returns true if the object id carried by this command has the given
+     * data structure type.
+     */
+    private boolean removes(byte type) {
+        return objectId.getDataStructureType() == type;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java?rev=780773&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java Mon Jun  1 18:37:41 2009
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.command;
+
+import org.apache.activemq.state.CommandVisitor;
+
+/**
+ * Command carrying a connection id, a client id and a subscription name,
+ * handed to the visitor's remove-subscription handler.
+ * 
+ * @openwire:marshaller code="9"
+ * @version $Revision: 1.7 $
+ */
+public class RemoveSubscriptionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO;
+
+    protected ConnectionId connectionId;
+    protected String clientId;
+    protected String subscriptionName;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * Misspelled accessor for the subscription name. It carries the
+     * OpenWire property annotation, so it is kept under this name.
+     * 
+     * @openwire:property version=1
+     * @deprecated use {@link #getSubscriptionName()}
+     */
+    public String getSubcriptionName() {
+        return this.subscriptionName;
+    }
+
+    /**
+     * @deprecated use {@link #setSubscriptionName(String)}
+     */
+    public void setSubcriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * @return the subscription name
+     */
+    public String getSubscriptionName() {
+        return this.subscriptionName;
+    }
+
+    public void setSubscriptionName(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return this.clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
+     * Dispatches this command to the visitor's remove-subscription handler.
+     * 
+     * @return whatever the visitor returns
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processRemoveSubscription(this);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message