activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1097189 [37/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME...
Date Wed, 27 Apr 2011 17:33:09 GMT
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQStreamMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQStreamMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQStreamMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,742 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.io.*;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * @openwire:marshaller code="27"
+ */
+public class ActiveMQStreamMessage extends ActiveMQMessage {
+
+    /** Type code returned by getDataStructureType(). */
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
+
+    /** Stream the write methods marshal into; created lazily by initializeWriting(). */
+    protected transient DataOutputStream dataOut;
+    /** Buffer underneath dataOut; storeContent() turns it into the message content. */
+    protected transient ByteArrayOutputStream bytesOut;
+    /** Stream the read methods decode from; created lazily by initializeReading(). */
+    protected transient DataInputStream dataIn;
+    /** Bytes left in the byte-array field being consumed by readBytes(); -1 when none is in progress. */
+    protected transient int remainingBytes = -1;
+
+    /**
+     * Creates a new stream message and copies this message's state into it.
+     *
+     * @return the newly created copy
+     */
+    public Message copy() {
+        final ActiveMQStreamMessage duplicate = new ActiveMQStreamMessage();
+        copy(duplicate);
+        return duplicate;
+    }
+
+    /**
+     * Copies this message's state into {@code target}.
+     *
+     * @param target the message receiving the copied state
+     */
+    private void copy(ActiveMQStreamMessage target) {
+        // Flush any pending writes into the content before delegating to super.copy().
+        storeContent();
+        super.copy(target);
+        // The target gets no stream state from this instance.
+        target.dataIn = null;
+        target.bytesOut = null;
+        target.dataOut = null;
+    }
+
+    /**
+     * Runs the superclass send hook and then flushes pending writes into the
+     * message content.
+     *
+     * @throws OpenwireException if the superclass hook fails
+     */
+    public void onSend() throws OpenwireException {
+        super.onSend();
+        this.storeContent();
+    }
+
+    /**
+     * Closes the output stream, if one is open, and stores the bytes written so
+     * far as the message content. Does nothing when no write is in progress.
+     *
+     * @throws RuntimeException wrapping the IOException if closing the stream fails
+     */
+    private void storeContent() {
+        if (dataOut == null) {
+            return;
+        }
+        try {
+            // Closing first lets any wrapping stream finish writing into bytesOut.
+            dataOut.close();
+            setContent(bytesOut.toBuffer());
+            dataOut = null;
+            bytesOut = null;
+        } catch (IOException closeFailure) {
+            throw new RuntimeException(closeFailure);
+        }
+    }
+
+    /**
+     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return ActiveMQStreamMessage.DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the fixed mime type string of stream messages
+     */
+    public String getJMSXMimeType() {
+        final String mimeType = "jms/stream-message";
+        return mimeType;
+    }
+
+    /**
+     * Clears the body through the superclass and drops all stream state held by
+     * this message, including any partially read byte-array field.
+     *
+     * @throws OpenwireException if the superclass fails to clear the body
+     */
+    public void clearBody() throws OpenwireException {
+        super.clearBody();
+        this.remainingBytes = -1;
+        this.dataIn = null;
+        this.bytesOut = null;
+        this.dataOut = null;
+    }
+
+    /**
+     * Reads the next field as a boolean. BOOLEAN fields are returned directly and
+     * STRING fields are converted with {@link Boolean#parseBoolean(String)}.
+     * For a NULL field or any other type the stream is reset to the start of
+     * the field before the exception is thrown.
+     *
+     * @return the boolean value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     */
+    public boolean readBoolean() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(10);
+            final int marker = this.dataIn.read();
+            if (marker == -1) {
+                throw new OpenwireException("reached end of data");
+            } else if (marker == MarshallingSupport.BOOLEAN_TYPE) {
+                return this.dataIn.readBoolean();
+            } else if (marker == MarshallingSupport.STRING_TYPE) {
+                return Boolean.parseBoolean(this.dataIn.readUTF());
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (marker == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to boolean.");
+            }
+            throw new OpenwireException(" not a boolean type");
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads the next field as a byte. BYTE fields are returned directly and
+     * STRING fields are parsed with {@link Byte#parseByte(String)}. For a NULL
+     * field, an incompatible type, or an unparsable string the stream is reset to
+     * the start of the field before the exception is thrown.
+     *
+     * @return the byte value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     * @throws NumberFormatException if a STRING field does not hold a parsable byte
+     */
+    public byte readByte() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(10);
+            final int marker = this.dataIn.read();
+            if (marker == -1) {
+                throw new OpenwireException("reached end of data");
+            } else if (marker == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            } else if (marker == MarshallingSupport.STRING_TYPE) {
+                return Byte.parseByte(this.dataIn.readUTF());
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (marker == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to byte.");
+            }
+            throw new OpenwireException(" not a byte type");
+        } catch (NumberFormatException badNumber) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw new OpenwireException(resetFailure);
+            }
+            throw badNumber;
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads the next field as a short. SHORT and BYTE fields are returned
+     * directly (bytes are widened) and STRING fields are parsed with
+     * {@link Short#parseShort(String)}. For a NULL field, an incompatible type, or
+     * an unparsable string the stream is reset to the start of the field before
+     * the exception is thrown.
+     *
+     * @return the short value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     * @throws NumberFormatException if a STRING field does not hold a parsable short
+     */
+    public short readShort() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(17);
+            final int marker = this.dataIn.read();
+            if (marker == -1) {
+                throw new OpenwireException("reached end of data");
+            } else if (marker == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            } else if (marker == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            } else if (marker == MarshallingSupport.STRING_TYPE) {
+                return Short.parseShort(this.dataIn.readUTF());
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (marker == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to short.");
+            }
+            throw new OpenwireException(" not a short type");
+        } catch (NumberFormatException badNumber) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw new OpenwireException(resetFailure);
+            }
+            throw badNumber;
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads the next field as a char. Only CHAR fields are accepted; for a NULL
+     * field or any other type the stream is reset to the start of the field
+     * before the exception is thrown.
+     *
+     * @return the char value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     */
+    public char readChar() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(17);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new OpenwireException("reached end of data");
+            }
+            if (type == MarshallingSupport.CHAR_TYPE) {
+                return this.dataIn.readChar();
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (type == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to char.");
+            }
+            throw new OpenwireException(" not a char type");
+        } catch (IOException e) {
+            // No text is parsed here, so no NumberFormatException handler is needed;
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads the next field as an int. INTEGER, SHORT and BYTE fields are returned
+     * directly (narrower types are widened) and STRING fields are parsed with
+     * {@link Integer#parseInt(String)}. For a NULL field, an incompatible type, or
+     * an unparsable string the stream is reset to the start of the field before
+     * the exception is thrown.
+     *
+     * @return the int value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     * @throws NumberFormatException if a STRING field does not hold a parsable int
+     */
+    public int readInt() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(33);
+            final int marker = this.dataIn.read();
+            if (marker == -1) {
+                throw new OpenwireException("reached end of data");
+            } else if (marker == MarshallingSupport.INTEGER_TYPE) {
+                return this.dataIn.readInt();
+            } else if (marker == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            } else if (marker == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            } else if (marker == MarshallingSupport.STRING_TYPE) {
+                return Integer.parseInt(this.dataIn.readUTF());
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (marker == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to int.");
+            }
+            throw new OpenwireException(" not an int type");
+        } catch (NumberFormatException badNumber) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw new OpenwireException(resetFailure);
+            }
+            throw badNumber;
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads the next field as a long. LONG, INTEGER, SHORT and BYTE fields are
+     * returned directly (narrower types are widened) and STRING fields are parsed
+     * with {@link Long#parseLong(String)}. For a NULL field, an incompatible type,
+     * or an unparsable string the stream is reset to the start of the field
+     * before the exception is thrown.
+     *
+     * @return the long value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     * @throws NumberFormatException if a STRING field does not hold a parsable long
+     */
+    public long readLong() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(65);
+            final int marker = this.dataIn.read();
+            if (marker == -1) {
+                throw new OpenwireException("reached end of data");
+            } else if (marker == MarshallingSupport.LONG_TYPE) {
+                return this.dataIn.readLong();
+            } else if (marker == MarshallingSupport.INTEGER_TYPE) {
+                return this.dataIn.readInt();
+            } else if (marker == MarshallingSupport.SHORT_TYPE) {
+                return this.dataIn.readShort();
+            } else if (marker == MarshallingSupport.BYTE_TYPE) {
+                return this.dataIn.readByte();
+            } else if (marker == MarshallingSupport.STRING_TYPE) {
+                return Long.parseLong(this.dataIn.readUTF());
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (marker == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to long.");
+            }
+            throw new OpenwireException(" not a long type");
+        } catch (NumberFormatException badNumber) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw new OpenwireException(resetFailure);
+            }
+            throw badNumber;
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads the next field as a float. FLOAT fields are returned directly and
+     * STRING fields are parsed with {@link Float#parseFloat(String)}. For a NULL
+     * field, an incompatible type, or an unparsable string the stream is reset to
+     * the start of the field before the exception is thrown.
+     *
+     * @return the float value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     * @throws NumberFormatException if a STRING field does not hold a parsable float
+     */
+    public float readFloat() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(33);
+            final int marker = this.dataIn.read();
+            if (marker == -1) {
+                throw new OpenwireException("reached end of data");
+            } else if (marker == MarshallingSupport.FLOAT_TYPE) {
+                return this.dataIn.readFloat();
+            } else if (marker == MarshallingSupport.STRING_TYPE) {
+                return Float.parseFloat(this.dataIn.readUTF());
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (marker == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to float.");
+            }
+            throw new OpenwireException(" not a float type");
+        } catch (NumberFormatException badNumber) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw new OpenwireException(resetFailure);
+            }
+            throw badNumber;
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads the next field as a double. DOUBLE and FLOAT fields are returned
+     * directly (floats are widened) and STRING fields are parsed with
+     * {@link Double#parseDouble(String)}. For a NULL field, an incompatible type,
+     * or an unparsable string the stream is reset to the start of the field
+     * before the exception is thrown.
+     *
+     * @return the double value of the next field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted,
+     *                           the field type is incompatible, or an I/O error occurs
+     * @throws NullPointerException if the next field is a NULL value
+     * @throws NumberFormatException if a STRING field does not hold a parsable double
+     */
+    public double readDouble() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(65);
+            final int marker = this.dataIn.read();
+            if (marker == -1) {
+                throw new OpenwireException("reached end of data");
+            } else if (marker == MarshallingSupport.DOUBLE_TYPE) {
+                return this.dataIn.readDouble();
+            } else if (marker == MarshallingSupport.FLOAT_TYPE) {
+                return this.dataIn.readFloat();
+            } else if (marker == MarshallingSupport.STRING_TYPE) {
+                return Double.parseDouble(this.dataIn.readUTF());
+            }
+            // Not convertible: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            if (marker == MarshallingSupport.NULL) {
+                throw new NullPointerException("Cannot convert NULL value to double.");
+            }
+            throw new OpenwireException(" not a double type");
+        } catch (NumberFormatException badNumber) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException resetFailure) {
+                throw new OpenwireException(resetFailure);
+            }
+            throw badNumber;
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e);
+        }
+    }
+
+    public String readString() throws OpenwireException {
+        initializeReading();
+        try {
+
+            this.dataIn.mark(65);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new OpenwireException("reached end of data");
+            }
+            if (type == MarshallingSupport.NULL) {
+                return null;
+            }
+            if (type == MarshallingSupport.BIG_STRING_TYPE) {
+                return MarshallingSupport.readUTF8(dataIn);
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return this.dataIn.readUTF();
+            }
+            if (type == MarshallingSupport.LONG_TYPE) {
+                return new Long(this.dataIn.readLong()).toString();
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return new Integer(this.dataIn.readInt()).toString();
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return new Short(this.dataIn.readShort()).toString();
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return new Byte(this.dataIn.readByte()).toString();
+            }
+            if (type == MarshallingSupport.FLOAT_TYPE) {
+                return new Float(this.dataIn.readFloat()).toString();
+            }
+            if (type == MarshallingSupport.DOUBLE_TYPE) {
+                return new Double(this.dataIn.readDouble()).toString();
+            }
+            if (type == MarshallingSupport.BOOLEAN_TYPE) {
+                return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
+            }
+            if (type == MarshallingSupport.CHAR_TYPE) {
+                return new Character(this.dataIn.readChar()).toString();
+            } else {
+                this.dataIn.reset();
+                throw new OpenwireException(" not a String type");
+            }
+        } catch (NumberFormatException mfe) {
+            try {
+                this.dataIn.reset();
+            } catch (IOException ioe) {
+                throw new OpenwireException(ioe);
+            }
+            throw mfe;
+
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    /**
+     * Reads a byte-array field into {@code value}, possibly over several calls.
+     * <p>
+     * The first call for a field consumes its type marker and length. Each call
+     * then copies up to {@code value.length} bytes. Once the whole field has been
+     * delivered, the next call returns -1 and the following call starts on the
+     * next field.
+     * <p>
+     * If the next field is not a byte array the stream is reset to the start of
+     * the field before the exception is thrown.
+     *
+     * @param value buffer receiving the bytes; must not be null
+     * @return the number of bytes copied into {@code value}, or -1 when the
+     *         current byte-array field was already fully read
+     * @throws NullPointerException if {@code value} is null
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted or
+     *                           truncated, the next field is not a byte array, the stored
+     *                           length is negative, or an I/O error occurs
+     */
+    public int readBytes(byte[] value) throws OpenwireException {
+
+        initializeReading();
+        if (value == null) {
+            throw new NullPointerException();
+        }
+        try {
+            if (remainingBytes == -1) {
+                // Starting a new field: expect the byte-array marker and its length.
+                this.dataIn.mark(value.length + 1);
+                int type = this.dataIn.read();
+                if (type == -1) {
+                    throw new OpenwireException("reached end of data");
+                }
+                if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
+                    // Rewind so the type marker is not lost for the next read call.
+                    this.dataIn.reset();
+                    throw new OpenwireException("Not a byte array");
+                }
+                int length = this.dataIn.readInt();
+                if (length < 0) {
+                    // A corrupt length would otherwise surface as IndexOutOfBoundsException.
+                    throw new OpenwireException("Invalid byte array length: " + length);
+                }
+                remainingBytes = length;
+            } else if (remainingBytes == 0) {
+                remainingBytes = -1;
+                return -1;
+            }
+
+            if (value.length <= remainingBytes) {
+                // small buffer: fill it completely, then account for the bytes taken
+                this.dataIn.readFully(value);
+                remainingBytes -= value.length;
+                return value.length;
+            }
+            // big buffer: readFully is required because read() may legally return
+            // fewer bytes than requested, which would drop the rest of the field.
+            int count = remainingBytes;
+            this.dataIn.readFully(value, 0, count);
+            remainingBytes = 0;
+            return count;
+
+        } catch (IOException e) {
+            // EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e.getMessage(),e);
+        }
+    }
+
+    /**
+     * Reads the next field and returns it as an object of the matching Java
+     * type: a String, a boxed primitive, a {@code byte[]}, or {@code null} for a
+     * NULL field. For an unknown type marker or a negative byte-array length the
+     * stream is reset to the start of the field before the exception is thrown.
+     *
+     * @return the value of the next field, or {@code null} for a NULL field
+     * @throws OpenwireException if the body is not in read mode, the data is exhausted or
+     *                           truncated, the field type is unknown, a byte-array length
+     *                           is negative, or an I/O error occurs
+     */
+    public Object readObject() throws OpenwireException {
+        initializeReading();
+        try {
+            this.dataIn.mark(65);
+            int type = this.dataIn.read();
+            if (type == -1) {
+                throw new OpenwireException("reached end of data");
+            }
+            if (type == MarshallingSupport.NULL) {
+                return null;
+            }
+            if (type == MarshallingSupport.BIG_STRING_TYPE) {
+                return MarshallingSupport.readUTF8(dataIn);
+            }
+            if (type == MarshallingSupport.STRING_TYPE) {
+                return this.dataIn.readUTF();
+            }
+            if (type == MarshallingSupport.LONG_TYPE) {
+                return Long.valueOf(this.dataIn.readLong());
+            }
+            if (type == MarshallingSupport.INTEGER_TYPE) {
+                return Integer.valueOf(this.dataIn.readInt());
+            }
+            if (type == MarshallingSupport.SHORT_TYPE) {
+                return Short.valueOf(this.dataIn.readShort());
+            }
+            if (type == MarshallingSupport.BYTE_TYPE) {
+                return Byte.valueOf(this.dataIn.readByte());
+            }
+            if (type == MarshallingSupport.FLOAT_TYPE) {
+                return Float.valueOf(this.dataIn.readFloat());
+            }
+            if (type == MarshallingSupport.DOUBLE_TYPE) {
+                return Double.valueOf(this.dataIn.readDouble());
+            }
+            if (type == MarshallingSupport.BOOLEAN_TYPE) {
+                return Boolean.valueOf(this.dataIn.readBoolean());
+            }
+            if (type == MarshallingSupport.CHAR_TYPE) {
+                return Character.valueOf(this.dataIn.readChar());
+            }
+            if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
+                int len = this.dataIn.readInt();
+                if (len < 0) {
+                    // A corrupt length would otherwise escape as NegativeArraySizeException.
+                    this.dataIn.reset();
+                    throw new OpenwireException("Invalid byte array length: " + len);
+                }
+                byte[] value = new byte[len];
+                this.dataIn.readFully(value);
+                return value;
+            }
+            // Unknown marker: rewind so the field is still available to the caller.
+            this.dataIn.reset();
+            throw new OpenwireException("unknown type");
+        } catch (IOException e) {
+            // No text is parsed into a number here, so no NumberFormatException
+            // handler is needed; EOFException is an IOException and is wrapped the same way.
+            throw new OpenwireException(e.getMessage(),e);
+        }
+    }
+
+    /**
+     * Appends a boolean field to the message body.
+     *
+     * @param value the boolean to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeBoolean(boolean value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalBoolean(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a byte field to the message body.
+     *
+     * @param value the byte to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeByte(byte value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalByte(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a short field to the message body.
+     *
+     * @param value the short to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeShort(short value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalShort(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a char field to the message body.
+     *
+     * @param value the char to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeChar(char value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalChar(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends an int field to the message body.
+     *
+     * @param value the int to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeInt(int value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalInt(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a long field to the message body.
+     *
+     * @param value the long to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeLong(long value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalLong(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a float field to the message body.
+     *
+     * @param value the float to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeFloat(float value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalFloat(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a double field to the message body.
+     *
+     * @param value the double to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeDouble(double value) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalDouble(this.dataOut, value);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a String field to the message body; a null reference is written
+     * as a NULL field.
+     *
+     * @param value the string to write, may be null
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeString(String value) throws OpenwireException {
+        initializeWriting();
+        try {
+            if (value != null) {
+                MarshallingSupport.marshalString(this.dataOut, value);
+            } else {
+                MarshallingSupport.marshalNull(this.dataOut);
+            }
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends the whole array as a byte-array field.
+     *
+     * @param value the bytes to write; a null array causes a NullPointerException
+     * @throws OpenwireException if writing the field fails
+     */
+    public void writeBytes(byte[] value) throws OpenwireException {
+        final int length = value.length;
+        writeBytes(value, 0, length);
+    }
+
+    /**
+     * Appends a slice of the array as a byte-array field.
+     *
+     * @param value  the array holding the bytes
+     * @param offset index of the first byte to write
+     * @param length number of bytes to write
+     * @throws OpenwireException if preparing the body for writing fails or
+     *                           marshalling raises an I/O error
+     */
+    public void writeBytes(byte[] value, int offset, int length) throws OpenwireException {
+        initializeWriting();
+        try {
+            MarshallingSupport.marshalByteArray(this.dataOut, value, offset, length);
+        } catch (IOException cause) {
+            throw new OpenwireException(cause);
+        }
+    }
+
+    /**
+     * Appends a field whose wire type is chosen from the runtime type of
+     * {@code value}. Supported are null, String, Character, Boolean, Byte, Short,
+     * Integer, Float, Double, {@code byte[]} and Long.
+     *
+     * @param value the object to write, may be null
+     * @throws OpenwireException if preparing the body for writing fails, the object type
+     *                           is unsupported, or marshalling raises an I/O error
+     */
+    public void writeObject(Object value) throws OpenwireException {
+        initializeWriting();
+        if (value == null) {
+            try {
+                MarshallingSupport.marshalNull(this.dataOut);
+            } catch (IOException cause) {
+                throw new OpenwireException(cause);
+            }
+            return;
+        }
+        if (value instanceof String) {
+            writeString((String) value);
+            return;
+        }
+        if (value instanceof Character) {
+            writeChar((Character) value);
+            return;
+        }
+        if (value instanceof Boolean) {
+            writeBoolean((Boolean) value);
+            return;
+        }
+        if (value instanceof Byte) {
+            writeByte((Byte) value);
+            return;
+        }
+        if (value instanceof Short) {
+            writeShort((Short) value);
+            return;
+        }
+        if (value instanceof Integer) {
+            writeInt((Integer) value);
+            return;
+        }
+        if (value instanceof Float) {
+            writeFloat((Float) value);
+            return;
+        }
+        if (value instanceof Double) {
+            writeDouble((Double) value);
+            return;
+        }
+        if (value instanceof byte[]) {
+            writeBytes((byte[]) value);
+            return;
+        }
+        if (value instanceof Long) {
+            writeLong((Long) value);
+            return;
+        }
+        throw new OpenwireException("Unsupported Object type: " + value.getClass());
+    }
+
+    /**
+     * Flushes pending writes into the message content, discards all stream
+     * state, and marks the body read-only so reading starts from the beginning.
+     *
+     * @throws OpenwireException declared by the signature; not thrown by the visible code
+     */
+    public void reset() throws OpenwireException {
+        storeContent();
+        this.remainingBytes = -1;
+        this.dataOut = null;
+        this.dataIn = null;
+        this.bytesOut = null;
+        setReadOnlyBody(true);
+    }
+
+    /**
+     * Prepares the message for writing. After the read-only check, the output
+     * streams are created on first use; when compression is enabled in the
+     * settings the message is flagged compressed and the data passes through a
+     * deflater.
+     *
+     * @throws OpenwireException if the read-only check rejects the write
+     */
+    private void initializeWriting() throws OpenwireException {
+        checkReadOnlyBody();
+        if (this.dataOut != null) {
+            return;
+        }
+        this.bytesOut = new ByteArrayOutputStream();
+        OutputStream sink = this.bytesOut;
+        if (Settings.enable_compression()) {
+            compressed = true;
+            sink = new DeflaterOutputStream(sink);
+        }
+        this.dataOut = new DataOutputStream(sink);
+    }
+
+    /**
+     * Guards the read methods: reading is only allowed once the body has been
+     * switched to read-only mode.
+     *
+     * @throws OpenwireException if the body is not read-only
+     */
+    protected void checkWriteOnlyBody() throws OpenwireException {
+        if (readOnlyBody) {
+            return;
+        }
+        throw new OpenwireException("Message body is write-only");
+    }
+
+    /**
+     * Prepares the message for reading. After checking that the body is
+     * readable, the input stream over the message content is created on first
+     * use; missing content is treated as empty and compressed content is
+     * inflated through a buffered stream.
+     *
+     * @throws OpenwireException if the body is not in read mode
+     */
+    private void initializeReading() throws OpenwireException {
+        checkWriteOnlyBody();
+        if (this.dataIn != null) {
+            return;
+        }
+        Buffer payload = getContent();
+        if (payload == null) {
+            payload = new Buffer(new byte[0], 0, 0);
+        }
+        InputStream source = new ByteArrayInputStream(payload);
+        if (isCompressed()) {
+            source = new BufferedInputStream(new InflaterInputStream(source));
+        }
+        this.dataIn = new DataInputStream(source);
+    }
+
+    /**
+     * @return the superclass description followed by the current stream fields
+     */
+    public String toString() {
+        final StringBuilder text = new StringBuilder();
+        text.append(super.toString());
+        text.append(" ActiveMQStreamMessage{ ");
+        text.append("bytesOut = ").append(bytesOut);
+        text.append(", dataOut = ").append(dataOut);
+        text.append(", dataIn = ").append(dataIn);
+        text.append(" }");
+        return text.toString();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.5 $
+ */
+public abstract class ActiveMQTempDestination extends ActiveMQDestination {
+
+    /** Class logger; no active code in this class references it. */
+    private static final Log LOG = LogFactory.getLog(ActiveMQTempDestination.class);
+
+    // NOTE: tracking of a separate connectionId / sequenceId (fields, accessors and
+    // the parsing of a trailing ":<sequenceId>" suffix out of the physical name) is
+    // disabled in this class. Such parsing can fail when the temp destination was
+    // generated by another JMS system via the JMS<->JMS Bridge.
+
+    /**
+     * Creates a temporary destination through the superclass no-argument constructor.
+     */
+    public ActiveMQTempDestination() {
+        super();
+    }
+
+    /**
+     * Creates a temporary destination with the given name.
+     *
+     * @param name the name handed to the superclass constructor
+     */
+    public ActiveMQTempDestination(String name) {
+        super(name);
+    }
+
+    /**
+     * Creates a temporary destination named {@code connectionId + ":" + sequenceId}.
+     *
+     * @param connectionId the first part of the generated name
+     * @param sequenceId   the number appended after the colon
+     */
+    public ActiveMQTempDestination(String connectionId, long sequenceId) {
+        super(connectionId + ":" + sequenceId);
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isTemporary() {
+        return true;
+    }
+
+    /**
+     * Sets the physical name by delegating to the superclass; nothing is parsed
+     * out of the name here.
+     *
+     * @param physicalName the new physical name
+     */
+    public void setPhysicalName(String physicalName) {
+        super.setPhysicalName(physicalName);
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * OpenWire command object describing a temporary queue destination.
+ *
+ * @openwire:marshaller code="102"
+ * @version $Revision: 1.6 $
+ */
+public class ActiveMQTempQueue extends ActiveMQTempDestination {
+
+    private static final long serialVersionUID = 6683049467527633867L;
+
+    /** Type code handed back by {@link #getDataStructureType()}. */
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEMP_QUEUE;
+
+    /**
+     * Creates an instance without handing a name to the superclass.
+     */
+    public ActiveMQTempQueue() {
+        super();
+    }
+
+    /**
+     * Creates an instance from a name, which is passed straight to the
+     * superclass constructor.
+     *
+     * @param name the destination name
+     */
+    public ActiveMQTempQueue(String name) {
+        super(name);
+    }
+
+    /**
+     * Creates an instance from a connection id and a sequence number; the
+     * superclass receives {@code connectionId.getValue()} and the sequence id.
+     *
+     * @param connectionId the owning connection's id, must not be null
+     * @param sequenceId   the sequence number forwarded to the superclass
+     */
+    public ActiveMQTempQueue(ConnectionId connectionId, long sequenceId) {
+        super(connectionId.getValue(), sequenceId);
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isQueue() {
+        return true;
+    }
+
+    /**
+     * @return the value of {@code getPhysicalName()}
+     * @throws OpenwireException declared for callers; this implementation only
+     *                           delegates to {@code getPhysicalName()}
+     */
+    public String getQueueName() throws OpenwireException {
+        return getPhysicalName();
+    }
+
+    /**
+     * @return the inherited {@code TEMP_QUEUE_TYPE} constant
+     */
+    public byte getDestinationType() {
+        return TEMP_QUEUE_TYPE;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the inherited {@code TEMP_QUEUE_QUALIFED_PREFIX} constant
+     */
+    protected String getQualifiedPrefix() {
+        return TEMP_QUEUE_QUALIFED_PREFIX;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * OpenWire command object describing a temporary topic destination.
+ *
+ * @openwire:marshaller code="103"
+ * @version $Revision: 1.6 $
+ */
+public class ActiveMQTempTopic extends ActiveMQTempDestination {
+
+    private static final long serialVersionUID = -4325596784597300253L;
+
+    /** Type code handed back by {@link #getDataStructureType()}. */
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEMP_TOPIC;
+
+    /**
+     * Creates an instance without handing a name to the superclass.
+     */
+    public ActiveMQTempTopic() {
+        super();
+    }
+
+    /**
+     * Creates an instance from a name, which is passed straight to the
+     * superclass constructor.
+     *
+     * @param name the destination name
+     */
+    public ActiveMQTempTopic(String name) {
+        super(name);
+    }
+
+    /**
+     * Creates an instance from a connection id and a sequence number; the
+     * superclass receives {@code connectionId.getValue()} and the sequence id.
+     *
+     * @param connectionId the owning connection's id, must not be null
+     * @param sequenceId   the sequence number forwarded to the superclass
+     */
+    public ActiveMQTempTopic(ConnectionId connectionId, long sequenceId) {
+        super(connectionId.getValue(), sequenceId);
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isTopic() {
+        return true;
+    }
+
+    /**
+     * @return the value of {@code getPhysicalName()}
+     * @throws OpenwireException declared for callers; this implementation only
+     *                           delegates to {@code getPhysicalName()}
+     */
+    public String getTopicName() throws OpenwireException {
+        return getPhysicalName();
+    }
+
+    /**
+     * @return the inherited {@code TEMP_TOPIC_TYPE} constant
+     */
+    public byte getDestinationType() {
+        return TEMP_TOPIC_TYPE;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the inherited {@code TEMP_TOPIC_QUALIFED_PREFIX} constant
+     */
+    protected String getQualifiedPrefix() {
+        return TEMP_TOPIC_QUALIFED_PREFIX;
+    }
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTextMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTextMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTextMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * @openwire:marshaller code="28"
+ */
+public class ActiveMQTextMessage extends ActiveMQMessage {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
+
+    protected String text;
+
+    public Message copy() {
+        ActiveMQTextMessage copy = new ActiveMQTextMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQTextMessage copy) {
+        super.copy(copy);
+        copy.text = text;
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/text-message";
+    }
+
+    public void setText(String text) throws OpenwireException {
+        checkReadOnlyBody();
+        this.text = text;
+        setContent(null);
+    }
+
+    public String getText() throws OpenwireException {
+        if (text == null && getContent() != null) {
+            InputStream is = null;
+            try {
+                Buffer bodyAsBytes = getContent();
+                if (bodyAsBytes != null) {
+                    is = new ByteArrayInputStream(bodyAsBytes);
+                    if (isCompressed()) {
+                        is = new InflaterInputStream(is);
+                    }
+                    DataInputStream dataIn = new DataInputStream(is);
+                    text = MarshallingSupport.readUTF8(dataIn);
+                    dataIn.close();
+                    setContent(null);
+                }
+            } catch (IOException ioe) {
+                throw new OpenwireException(ioe);
+            } finally {
+                if (is != null) {
+                    try {
+                        is.close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
+            }
+        }
+        return text;
+    }
+
+    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+        super.beforeMarshall(wireFormat);
+
+        Buffer content = getContent();
+        if (content == null && text != null) {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            OutputStream os = bytesOut;
+            if (Settings.enable_compression()) {
+                compressed = true;
+                os = new DeflaterOutputStream(os);
+            }
+            DataOutputStream dataOut = new DataOutputStream(os);
+            MarshallingSupport.writeUTF8(dataOut, this.text);
+            dataOut.close();
+            setContent(bytesOut.toBuffer());
+            //see https://issues.apache.org/activemq/browse/AMQ-2103
+            this.text=null;
+        }
+    }
+
+    public void clearBody() throws OpenwireException {
+        super.clearBody();
+        this.text = null;
+    }
+
+    public int getSize() {
+        if (size == 0 && content == null && text != null) {
+            size = getMinimumMessageSize();
+            if (marshalledProperties != null) {
+                size += marshalledProperties.getLength();
+            }
+            size = text.length() * 2;
+        }
+        return super.getSize();
+    }
+    
+    public String toString() {
+        try {
+            String text = getText();
+        	if (text != null && text.length() > 63) {
+        		text = text.substring(0, 45) + "..." + text.substring(text.length() - 12);
+        		HashMap<String, Object> overrideFields = new HashMap<String, Object>();
+        		overrideFields.put("text", text);
+        		return super.toString(overrideFields);
+        	}
+        } catch (OpenwireException e) {
+        }
+        return super.toString();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * OpenWire command object describing a topic destination.
+ *
+ * @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
+ *                         Destination"
+ * @openwire:marshaller code="101"
+ * @version $Revision: 1.5 $
+ */
+public class ActiveMQTopic extends ActiveMQDestination {
+
+    private static final long serialVersionUID = 7300307405896488588L;
+
+    /** Type code handed back by {@link #getDataStructureType()}. */
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TOPIC;
+
+    /**
+     * Creates an instance without handing a name to the superclass.
+     */
+    public ActiveMQTopic() {
+        super();
+    }
+
+    /**
+     * Creates an instance from a name, which is passed straight to the
+     * superclass constructor.
+     *
+     * @param name the destination name
+     */
+    public ActiveMQTopic(String name) {
+        super(name);
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isTopic() {
+        return true;
+    }
+
+    /**
+     * @return the value of {@code getPhysicalName()}
+     * @throws OpenwireException declared for callers; this implementation only
+     *                           delegates to {@code getPhysicalName()}
+     */
+    public String getTopicName() throws OpenwireException {
+        return getPhysicalName();
+    }
+
+    /**
+     * @return the inherited {@code TOPIC_TYPE} constant
+     */
+    public byte getDestinationType() {
+        return TOPIC_TYPE;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the inherited {@code TOPIC_QUALIFIED_PREFIX} constant
+     */
+    protected String getQualifiedPrefix() {
+        return TOPIC_QUALIFIED_PREFIX;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.util.IntrospectionSupport;
+
+import java.util.Map;
+
+
+/**
+ * Skeleton {@link Command} implementation: stores the command id, the
+ * response-required flag and the from/to endpoints, and answers {@code false}
+ * to every {@code isXxx()} type query so that subclasses only override the
+ * ones that apply to them.
+ *
+ * @openwire:marshaller
+ * @version $Revision: 1.11 $
+ */
+public abstract class BaseCommand implements Command {
+
+    protected int commandId;
+    protected boolean responseRequired;
+    
+    // Declared transient; NOTE(review): presumably excluded from marshalling - confirm.
+    private transient Endpoint from;
+    private transient Endpoint to;
+    
+    /**
+     * Copies {@code commandId} and {@code responseRequired} into {@code copy}.
+     * The from/to endpoints are not copied.
+     *
+     * @param copy the command that receives this command's values
+     */
+    public void copy(BaseCommand copy) {
+        copy.commandId = commandId;
+        copy.responseRequired = responseRequired;
+    }    
+
+    /**
+     * @openwire:property version=1
+     * @return the command id
+     */
+    public int getCommandId() {
+        return commandId;
+    }
+
+    /**
+     * @param commandId the command id to store
+     */
+    public void setCommandId(int commandId) {
+        this.commandId = commandId;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the response-required flag
+     */
+    public boolean isResponseRequired() {
+        return responseRequired;
+    }
+
+    /**
+     * @param responseRequired the response-required flag to store
+     */
+    public void setResponseRequired(boolean responseRequired) {
+        this.responseRequired = responseRequired;
+    }
+
+    /**
+     * @return the result of {@link #toString(Map)} with no override fields
+     */
+    public String toString() {
+        return toString(null);
+    }
+    
+    /**
+     * Builds the string form by delegating to
+     * {@code IntrospectionSupport.toString(this, BaseCommand.class, overrideFields)}.
+     *
+     * @param overrideFields passed through unchanged to the helper; may be null
+     * @return the string produced by the helper
+     */
+    public String toString(Map<String, Object>overrideFields) {
+    	return IntrospectionSupport.toString(this, BaseCommand.class, overrideFields);
+    }
+    
+    // Type queries: each returns false here.
+    public boolean isWireFormatInfo() {
+        return false;
+    }
+
+    public boolean isBrokerInfo() {
+        return false;
+    }
+
+    public boolean isResponse() {
+        return false;
+    }
+
+    public boolean isMessageDispatch() {
+        return false;
+    }
+
+    public boolean isMessage() {
+        return false;
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public boolean isMessageAck() {
+        return false;
+    }
+
+    public boolean isMessageDispatchNotification() {
+        return false;
+    }
+
+    public boolean isShutdownInfo() {
+        return false;
+    }
+
+    /**
+     * The endpoint within the transport where this message came from.
+     *
+     * @return the stored source endpoint, or null if none has been set
+     */
+    public Endpoint getFrom() {
+        return from;
+    }
+
+    /**
+     * @param from the source endpoint to store
+     */
+    public void setFrom(Endpoint from) {
+        this.from = from;
+    }
+
+    /**
+     * The endpoint within the transport where this message is going to - null means all endpoints.
+     *
+     * @return the stored target endpoint, possibly null
+     */
+    public Endpoint getTo() {
+        return to;
+    }
+
+    /**
+     * @param to the target endpoint to store
+     */
+    public void setTo(Endpoint to) {
+        this.to = to;
+    }
+    
+    
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseEndpoint.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseEndpoint.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseEndpoint.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Basic {@link Endpoint} implementation: a name plus, optionally, the
+ * {@link BrokerInfo} of the broker the endpoint stands for.
+ */
+public class BaseEndpoint implements Endpoint {
+
+    private String name;
+    private BrokerInfo brokerInfo;
+
+    /**
+     * @param name the value later returned by {@link #getName()}
+     */
+    public BaseEndpoint(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the name given at construction time
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Returns the broker information for this endpoint, if the endpoint is a
+     * broker, or null.
+     */
+    public BrokerInfo getBrokerInfo() {
+        return this.brokerInfo;
+    }
+
+    /**
+     * @param brokerInfo the broker information to attach, may be null
+     */
+    public void setBrokerInfo(BrokerInfo brokerInfo) {
+        this.brokerInfo = brokerInfo;
+    }
+
+    /**
+     * Returns the broker ID for this endpoint, taken from the attached broker
+     * information, or null when no broker information is attached.
+     */
+    public BrokerId getBrokerId() {
+        return (brokerInfo == null) ? null : brokerInfo.getBrokerId();
+    }
+
+    /**
+     * @return {@code Endpoint[name:<name>]}, with {@code " broker: <id>"}
+     *         inserted before the closing bracket when a broker id is known
+     */
+    public String toString() {
+        BrokerId id = getBrokerId();
+        StringBuilder rendered = new StringBuilder("Endpoint[name:").append(name);
+        if (id != null) {
+            rendered.append(" broker: ").append(id);
+        }
+        return rendered.append("]").toString();
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Identifier of a broker, wrapping a single string value.
+ * <p>
+ * Equality and hashing are based on that value only. Because the no-argument
+ * constructor and {@link #setValue(String)} allow the value to be null, both
+ * {@link #equals(Object)} and {@link #hashCode()} tolerate a null value.
+ *
+ * @openwire:marshaller code="124"
+ */
+public class BrokerId implements DataStructure {
+
+    /** Type code handed back by {@link #getDataStructureType()}. */
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_ID;
+    protected String value;
+
+    /**
+     * Creates an id whose value is null until {@link #setValue(String)} is called.
+     */
+    public BrokerId() {
+    }
+
+    /**
+     * @param brokerId the identifier value to wrap
+     */
+    public BrokerId(String brokerId) {
+        this.value = brokerId;
+    }
+
+    /**
+     * @return the hash code of the wrapped value, or 0 when the value is null
+     */
+    public int hashCode() {
+        // An id with no value yet must still be usable as a map key.
+        return value == null ? 0 : value.hashCode();
+    }
+
+    /**
+     * Two ids are equal when both are exactly of class {@code BrokerId} and
+     * wrap equal values; two null values count as equal.
+     *
+     * @param o the object to compare with, may be null
+     * @return true if {@code o} is a {@code BrokerId} with an equal value
+     */
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != BrokerId.class) {
+            return false;
+        }
+        BrokerId id = (BrokerId)o;
+        if (value == null) {
+            return id.value == null;
+        }
+        return value.equals(id.value);
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the wrapped value itself, which is null when no value is set
+     */
+    public String toString() {
+        return value;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the wrapped value, possibly null
+     */
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * @param brokerId the identifier value to wrap
+     */
+    public void setValue(String brokerId) {
+        this.value = brokerId;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    public boolean isMarshallAware() {
+        return false;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * When a client connects to a broker, the broker sends the client a BrokerInfo
+ * so that the client knows which broker node it is talking to and also any
+ * peers that the node has in its cluster. This is the broker helping the client
+ * out in discovering other nodes in the cluster.
+ * <p>
+ * The class is a plain value holder: every field is exposed through an
+ * accessor pair and no validation is performed. NOTE(review): the
+ * {@code version=N} value on each {@code @openwire:property} tag presumably
+ * names the OpenWire protocol version that introduced the property - confirm
+ * against the marshaller generator.
+ *
+ * @openwire:marshaller code="2"
+ * @version $Revision: 1.7 $
+ */
+public class BrokerInfo extends BaseCommand {
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
+    // All fields below are package-private and read/written by the accessors.
+    BrokerId brokerId;
+    String brokerURL;
+    boolean slaveBroker;
+    boolean masterBroker;
+    boolean faultTolerantConfiguration;
+    boolean networkConnection;
+    boolean duplexConnection;
+    BrokerInfo peerBrokerInfos[];
+    String brokerName;
+    long connectionId;
+    String brokerUploadUrl;
+    String networkProperties;
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean isBrokerInfo() {
+        return true;
+    }
+
+    /**
+     * @return {@link #DATA_STRUCTURE_TYPE}
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     * @return the broker id, or null if none has been set
+     */
+    public BrokerId getBrokerId() {
+        return brokerId;
+    }
+
+    /**
+     * @param brokerId the broker id to set
+     */
+    public void setBrokerId(BrokerId brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the broker URL, or null if none has been set
+     */
+    public String getBrokerURL() {
+        return brokerURL;
+    }
+
+    /**
+     * @param brokerURL the broker URL to set
+     */
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+    }
+
+    /**
+     * @openwire:property version=1 testSize=0
+     * @return the stored array of peer broker infos itself (no copy is made),
+     *         or null if none has been set
+     */
+    public BrokerInfo[] getPeerBrokerInfos() {
+        return peerBrokerInfos;
+    }
+
+    /**
+     * @param peerBrokerInfos the peer broker infos to set; the array is stored
+     *                        as-is, not copied
+     */
+    public void setPeerBrokerInfos(BrokerInfo[] peerBrokerInfos) {
+        this.peerBrokerInfos = peerBrokerInfos;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the broker name, or null if none has been set
+     */
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    /**
+     * @param brokerName the broker name to set
+     */
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    /**
+     * Dispatches this command to {@code visitor.processBrokerInfo(this)}.
+     *
+     * @param visitor the visitor to invoke
+     * @return whatever the visitor returns
+     * @throws Exception whatever the visitor throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processBrokerInfo(this);
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the slaveBroker flag
+     */
+    public boolean isSlaveBroker() {
+        return slaveBroker;
+    }
+
+    /**
+     * @param slaveBroker the slaveBroker flag to set
+     */
+    public void setSlaveBroker(boolean slaveBroker) {
+        this.slaveBroker = slaveBroker;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the masterBroker flag
+     */
+    public boolean isMasterBroker() {
+        return masterBroker;
+    }
+
+    /**
+     * @param masterBroker The masterBroker to set.
+     */
+    public void setMasterBroker(boolean masterBroker) {
+        this.masterBroker = masterBroker;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return Returns the faultTolerantConfiguration.
+     */
+    public boolean isFaultTolerantConfiguration() {
+        return faultTolerantConfiguration;
+    }
+
+    /**
+     * @param faultTolerantConfiguration The faultTolerantConfiguration to set.
+     */
+    public void setFaultTolerantConfiguration(boolean faultTolerantConfiguration) {
+        this.faultTolerantConfiguration = faultTolerantConfiguration;
+    }
+
+    /**
+     * @openwire:property version=2
+     * @return the duplexConnection
+     */
+    public boolean isDuplexConnection() {
+        return this.duplexConnection;
+    }
+
+    /**
+     * @param duplexConnection the duplexConnection to set
+     */
+    public void setDuplexConnection(boolean duplexConnection) {
+        this.duplexConnection = duplexConnection;
+    }
+
+    /**
+     * @openwire:property version=2
+     * @return the networkConnection
+     */
+    public boolean isNetworkConnection() {
+        return this.networkConnection;
+    }
+
+    /**
+     * @param networkConnection the networkConnection to set
+     */
+    public void setNetworkConnection(boolean networkConnection) {
+        this.networkConnection = networkConnection;
+    }
+
+    /**
+     * The broker assigns each connection it accepts a connection id.
+     *
+     * @openwire:property version=2
+     * @return the connection id
+     */
+    public long getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @param connectionId the connection id to set
+     */
+    public void setConnectionId(long connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * The URL to use when uploading BLOBs to the broker or some other external
+     * file/http server
+     *
+     * @openwire:property version=3
+     * @return the upload URL, or null if none has been set
+     */
+    public String getBrokerUploadUrl() {
+        return brokerUploadUrl;
+    }
+
+    /**
+     * @param brokerUploadUrl the upload URL to set
+     */
+    public void setBrokerUploadUrl(String brokerUploadUrl) {
+        this.brokerUploadUrl = brokerUploadUrl;
+    }
+
+    /**
+     * @openwire:property version=3 cache=false
+     * @return the networkProperties
+     */
+    public String getNetworkProperties() {
+        return this.networkProperties;
+    }
+
+    /**
+     * @param networkProperties the networkProperties to set
+     */
+    public void setNetworkProperties(String networkProperties) {
+        this.networkProperties = networkProperties;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * The Command Pattern so that we can send and receive commands on the different
+ * transports
+ * 
+ * @version $Revision: 1.7 $
+ */
+public interface Command extends DataStructure {
+
+    void setCommandId(int value); // presumably sets the ID reported by getCommandId() - confirm in implementations
+
+    /**
+     * @return the unique ID of this request used to map responses to requests
+     */
+    int getCommandId();
+
+    void setResponseRequired(boolean responseRequired); // setter paired with isResponseRequired()
+
+    boolean isResponseRequired(); // NOTE(review): presumably true when the sender expects a Response - confirm
+
+    boolean isResponse(); // NOTE(review): this and the isXxx() methods below look like type tests for the named command kind - confirm in implementations
+
+    boolean isMessageDispatch();
+
+    boolean isBrokerInfo();
+
+    boolean isWireFormatInfo();
+
+    boolean isMessage();
+
+    boolean isMessageAck();
+
+    boolean isMessageDispatchNotification();
+
+    boolean isShutdownInfo();
+
+    Response visit(CommandVisitor visitor) throws Exception; // visitor hook; e.g. ConnectionControl implements it as visitor.processConnectionControl(this)
+
+    /**
+     * The endpoint within the transport where this message came from, which
+     * could be null if the transport only supports a single endpoint.
+     */
+    Endpoint getFrom();
+
+    void setFrom(Endpoint from); // setter paired with getFrom()
+
+    /**
+     * The endpoint within the transport where this message is going to; null
+     * means all endpoints.
+     */
+    Endpoint getTo();
+
+    void setTo(Endpoint to); // setter paired with getTo()
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/CommandTypes.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/CommandTypes.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/CommandTypes.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/CommandTypes.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * Holds the command id constants used by the command objects.
+ * 
+ * @version $Revision: 1.21 $
+ */
+public interface CommandTypes {
+
+    // The latest version of the OpenWire protocol.
+    byte PROTOCOL_VERSION = 6;
+
+    // A marshaling layer can use this type to specify a null object.
+    byte NULL = 0;
+
+    // /////////////////////////////////////////////////
+    //
+    // Info objects sent back and forth between client and
+    // server when setting up a client connection.
+    //
+    // /////////////////////////////////////////////////
+    byte WIREFORMAT_INFO = 1;
+    byte BROKER_INFO = 2;
+    byte CONNECTION_INFO = 3; // same value as ConnectionInfo's @openwire:marshaller code
+    byte SESSION_INFO = 4;
+    byte CONSUMER_INFO = 5;
+    byte PRODUCER_INFO = 6;
+    byte TRANSACTION_INFO = 7;
+    byte DESTINATION_INFO = 8;
+    byte REMOVE_SUBSCRIPTION_INFO = 9;
+    byte KEEP_ALIVE_INFO = 10;
+    byte SHUTDOWN_INFO = 11;
+    byte REMOVE_INFO = 12;
+    byte CONTROL_COMMAND = 14; // note: 13 is not assigned in this interface
+    byte FLUSH_COMMAND = 15;
+    byte CONNECTION_ERROR = 16; // same value as ConnectionError's @openwire:marshaller code
+    byte CONSUMER_CONTROL = 17;
+    byte CONNECTION_CONTROL = 18; // same value as ConnectionControl's @openwire:marshaller code
+
+    // /////////////////////////////////////////////////
+    //
+    // Messages that go back and forth between the client
+    // and the server.
+    //
+    // /////////////////////////////////////////////////
+    byte PRODUCER_ACK = 19;
+    byte MESSAGE_PULL = 20;
+    byte MESSAGE_DISPATCH = 21;
+    byte MESSAGE_ACK = 22;
+
+    byte ACTIVEMQ_MESSAGE = 23;
+    byte ACTIVEMQ_BYTES_MESSAGE = 24;
+    byte ACTIVEMQ_MAP_MESSAGE = 25;
+    byte ACTIVEMQ_OBJECT_MESSAGE = 26;
+    byte ACTIVEMQ_STREAM_MESSAGE = 27;
+    byte ACTIVEMQ_TEXT_MESSAGE = 28;
+    byte ACTIVEMQ_BLOB_MESSAGE = 29;
+
+    // /////////////////////////////////////////////////
+    //
+    // Command Response messages
+    //
+    // /////////////////////////////////////////////////
+    byte RESPONSE = 30;
+    byte EXCEPTION_RESPONSE = 31;
+    byte DATA_RESPONSE = 32;
+    byte DATA_ARRAY_RESPONSE = 33;
+    byte INTEGER_RESPONSE = 34;
+
+    // /////////////////////////////////////////////////
+    //
+    // Used by discovery
+    //
+    // /////////////////////////////////////////////////
+    byte DISCOVERY_EVENT = 40;
+
+    // /////////////////////////////////////////////////
+    //
+    // Command objects used by the Journal
+    //
+    // /////////////////////////////////////////////////
+    byte JOURNAL_ACK = 50;
+    byte JOURNAL_REMOVE = 52; // note: 51 is not assigned in this interface
+    byte JOURNAL_TRACE = 53;
+    byte JOURNAL_TRANSACTION = 54;
+    byte DURABLE_SUBSCRIPTION_INFO = 55;
+
+    // /////////////////////////////////////////////////
+    //
+    // Reliability and fragmentation
+    //
+    // /////////////////////////////////////////////////
+    byte PARTIAL_COMMAND = 60;
+    byte PARTIAL_LAST_COMMAND = 61;
+
+    byte REPLAY = 65;
+
+    // /////////////////////////////////////////////////
+    //
+    // Types used to represent basic Java types.
+    //
+    // /////////////////////////////////////////////////
+    byte BYTE_TYPE = 70;
+    byte CHAR_TYPE = 71;
+    byte SHORT_TYPE = 72;
+    byte INTEGER_TYPE = 73;
+    byte LONG_TYPE = 74;
+    byte DOUBLE_TYPE = 75;
+    byte FLOAT_TYPE = 76;
+    byte STRING_TYPE = 77;
+    byte BOOLEAN_TYPE = 78;
+    byte BYTE_ARRAY_TYPE = 79;
+
+    // /////////////////////////////////////////////////
+    //
+    // Broker to Broker command objects
+    //
+    // /////////////////////////////////////////////////
+
+    byte MESSAGE_DISPATCH_NOTIFICATION = 90;
+    byte NETWORK_BRIDGE_FILTER = 91;
+
+    // /////////////////////////////////////////////////
+    //
+    // Data structures contained in the command objects.
+    //
+    // /////////////////////////////////////////////////
+    byte ACTIVEMQ_QUEUE = 100;
+    byte ACTIVEMQ_TOPIC = 101;
+    byte ACTIVEMQ_TEMP_QUEUE = 102;
+    byte ACTIVEMQ_TEMP_TOPIC = 103;
+
+    byte MESSAGE_ID = 110;
+    byte ACTIVEMQ_LOCAL_TRANSACTION_ID = 111;
+    byte ACTIVEMQ_XA_TRANSACTION_ID = 112;
+
+    byte CONNECTION_ID = 120; // same value as ConnectionId's @openwire:marshaller code
+    byte SESSION_ID = 121;
+    byte CONSUMER_ID = 122;
+    byte PRODUCER_ID = 123;
+    byte BROKER_ID = 124;
+
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/CommandTypes.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * Used to start and stop transports as well as terminating clients.
+ * 
+ * @openwire:marshaller code="18"
+ * @version $Revision: 1.1 $
+ */
+public class ConnectionControl extends BaseCommand {
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_CONTROL;
+
+    // Boolean flags start out false; the two string properties start out
+    // empty rather than null.
+    protected boolean suspend;
+    protected boolean resume;
+    protected boolean close;
+    protected boolean exit;
+    protected boolean faultTolerant;
+    protected String connectedBrokers = "";
+    protected String reconnectTo = "";
+    protected boolean rebalanceConnection;
+
+    /**
+     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}.
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Hands this command to the visitor's connection-control handler.
+     *
+     * @param visitor the visitor to dispatch to
+     * @return whatever the visitor returns
+     * @throws Exception if the visitor throws
+     */
+    public Response visit(final CommandVisitor visitor) throws Exception {
+        final Response result = visitor.processConnectionControl(this);
+        return result;
+    }
+
+    // NOTE(review): the @openwire:property tags below look like input to a
+    // marshaller generator; the tags and the accessor order are kept as-is.
+
+    /**
+     * @openwire:property version=1
+     * @return the current value of the close flag.
+     */
+    public boolean isClose() {
+        return this.close;
+    }
+
+    /**
+     * @param close new value for the close flag.
+     */
+    public void setClose(final boolean close) {
+        this.close = close;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the current value of the exit flag.
+     */
+    public boolean isExit() {
+        return this.exit;
+    }
+
+    /**
+     * @param exit new value for the exit flag.
+     */
+    public void setExit(final boolean exit) {
+        this.exit = exit;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the current value of the faultTolerant flag.
+     */
+    public boolean isFaultTolerant() {
+        return this.faultTolerant;
+    }
+
+    /**
+     * @param faultTolerant new value for the faultTolerant flag.
+     */
+    public void setFaultTolerant(final boolean faultTolerant) {
+        this.faultTolerant = faultTolerant;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the current value of the resume flag.
+     */
+    public boolean isResume() {
+        return this.resume;
+    }
+
+    /**
+     * @param resume new value for the resume flag.
+     */
+    public void setResume(final boolean resume) {
+        this.resume = resume;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the current value of the suspend flag.
+     */
+    public boolean isSuspend() {
+        return this.suspend;
+    }
+
+    /**
+     * @param suspend new value for the suspend flag.
+     */
+    public void setSuspend(final boolean suspend) {
+        this.suspend = suspend;
+    }
+
+    /**
+     * @openwire:property version=6 cache=false
+     * @return the connectedBrokers string (empty by default).
+     */
+    public String getConnectedBrokers() {
+        return connectedBrokers;
+    }
+
+    /**
+     * @param connectedBrokers new connectedBrokers value; stored as given
+     */
+    public void setConnectedBrokers(final String connectedBrokers) {
+        this.connectedBrokers = connectedBrokers;
+    }
+
+    /**
+     * @openwire:property version=6 cache=false
+     * @return the reconnectTo string (empty by default).
+     */
+    public String getReconnectTo() {
+        return reconnectTo;
+    }
+
+    /**
+     * @param reconnectTo new reconnectTo value; stored as given
+     */
+    public void setReconnectTo(final String reconnectTo) {
+        this.reconnectTo = reconnectTo;
+    }
+
+    /**
+     * @openwire:property version=6 cache=false
+     * @return the current value of the rebalanceConnection flag.
+     */
+    public boolean isRebalanceConnection() {
+        return rebalanceConnection;
+    }
+
+    /**
+     * @param rebalanceConnection new value for the rebalanceConnection flag.
+     */
+    public void setRebalanceConnection(final boolean rebalanceConnection) {
+        this.rebalanceConnection = rebalanceConnection;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * 
+ * @openwire:marshaller code="16"
+ */
+public class ConnectionError extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_ERROR;
+
+    // Both fields are null until set through the setters below.
+    private ConnectionId connectionId;
+    private Throwable exception;
+
+    /**
+     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}.
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Hands this command to the visitor's connection-error handler and
+     * returns whatever the visitor returns.
+     */
+    public Response visit(final org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
+        final Response result = visitor.processConnectionError(this);
+        return result;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the stored exception, or null if none was set
+     */
+    public Throwable getException() {
+        return this.exception;
+    }
+
+    /**
+     * @param exception the exception to store; kept by reference
+     */
+    public void setException(final Throwable exception) {
+        this.exception = exception;
+    }
+
+    /**
+     * @openwire:property version=1
+     * @return the stored connection id, or null if none was set
+     */
+    public ConnectionId getConnectionId() {
+        return this.connectionId;
+    }
+
+    /**
+     * @param connectionId the connection id to store; kept by reference
+     */
+    public void setConnectionId(final ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+/**
+ * @openwire:marshaller code="120"
+ */
+public class ConnectionId implements DataStructure, Comparable<ConnectionId> {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_ID;
+
+    // May be null: the no-arg constructor leaves it unset until setValue().
+    protected String value;
+
+    /**
+     * Creates an id whose value is null until {@link #setValue(String)} is
+     * called.
+     */
+    public ConnectionId() {
+    }
+
+    /**
+     * @param connectionId the string to use as this id's value
+     */
+    public ConnectionId(String connectionId) {
+        this.value = connectionId;
+    }
+
+    /**
+     * Copies the value of another connection id.
+     *
+     * @param id the id to copy; must not be null
+     */
+    public ConnectionId(ConnectionId id) {
+        this.value = id.getValue();
+    }
+
+    /**
+     * Takes the value from the connection id string reported by a session id.
+     *
+     * @param id the session id; must not be null
+     */
+    public ConnectionId(SessionId id) {
+        this.value = id.getConnectionId();
+    }
+
+    /**
+     * Takes the value from the connection id string reported by a producer id.
+     *
+     * @param id the producer id; must not be null
+     */
+    public ConnectionId(ProducerId id) {
+        this.value = id.getConnectionId();
+    }
+
+    /**
+     * Takes the value from the connection id string reported by a consumer id.
+     *
+     * @param id the consumer id; must not be null
+     */
+    public ConnectionId(ConsumerId id) {
+        this.value = id.getConnectionId();
+    }
+
+    /**
+     * Hash of the value string; 0 when the value has not been set, so an
+     * unset id no longer throws NullPointerException here.
+     */
+    @Override
+    public int hashCode() {
+        return value == null ? 0 : value.hashCode();
+    }
+
+    /**
+     * Two ids are equal when both are exactly of class ConnectionId and
+     * their values are equal; two unset (null) values count as equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != ConnectionId.class) {
+            return false;
+        }
+        String other = ((ConnectionId)o).value;
+        // Null-safe comparison: equals() must not throw for an unset value.
+        return value == null ? other == null : value.equals(other);
+    }
+
+    /**
+     * @return the type code of this data structure, {@link #DATA_STRUCTURE_TYPE}.
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * @return the raw value string (null when the value has not been set)
+     */
+    @Override
+    public String toString() {
+        return value;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * @param connectionId the new value string
+     */
+    public void setValue(String connectionId) {
+        this.value = connectionId;
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    /**
+     * Orders ids by their value strings. An unset (null) value sorts before
+     * any set value and ties with another unset value, which keeps the
+     * ordering consistent with {@link #equals(Object)}.
+     *
+     * @param o the id to compare with; a null argument still raises
+     *        NullPointerException, as the Comparable contract requires
+     */
+    public int compareTo(ConnectionId o) {
+        String other = o.value;
+        if (value == null) {
+            return other == null ? 0 : -1;
+        }
+        if (other == null) {
+            return 1;
+        }
+        return value.compareTo(other);
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+/**
+ * 
+ * @openwire:marshaller code="3"
+ * @version $Revision: 1.11 $
+ */
+public class ConnectionInfo extends BaseCommand {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_INFO;
+
+    protected ConnectionId connectionId;
+    protected String clientId;
+    protected String userName;
+    protected String password;
+    protected BrokerId[] brokerPath;
+    protected boolean brokerMasterConnector;
+    protected boolean manageable;
+    protected boolean clientMaster = true;
+    protected boolean faultTolerant = false;
+    // Not an @openwire:property; declared transient.
+    protected transient Object transportContext;
+    private boolean failoverReconnect;
+
+    /**
+     * Creates an info object with no connection id set.
+     */
+    public ConnectionInfo() {
+    }
+
+    /**
+     * @param connectionId the id of the connection this info describes
+     */
+    public ConnectionInfo(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @return the type code of this command, {@link #DATA_STRUCTURE_TYPE}.
+     */
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    /**
+     * Creates a new ConnectionInfo carrying the same field values as this
+     * one. The copy is shallow: referenced objects such as the connection
+     * id, the broker path array and the transport context are shared.
+     *
+     * @return the new copy
+     */
+    public ConnectionInfo copy() {
+        ConnectionInfo copy = new ConnectionInfo();
+        copy(copy);
+        return copy;
+    }
+
+    /**
+     * Copies the state of this object, including the state handled by the
+     * superclass, into the given target.
+     *
+     * @param copy the object to fill in
+     */
+    private void copy(ConnectionInfo copy) {
+        super.copy(copy);
+        copy.connectionId = connectionId;
+        copy.clientId = clientId;
+        copy.userName = userName;
+        copy.password = password;
+        copy.brokerPath = brokerPath;
+        copy.brokerMasterConnector = brokerMasterConnector;
+        copy.manageable = manageable;
+        copy.clientMaster = clientMaster;
+        copy.transportContext = transportContext;
+        // These two were previously left out, so a copy silently reverted
+        // to false for both flags.
+        copy.faultTolerant = faultTolerant;
+        copy.failoverReconnect = failoverReconnect;
+    }
+
+    /**
+     * @openwire:property version=1 cache=true
+     */
+    public ConnectionId getConnectionId() {
+        return connectionId;
+    }
+
+    public void setConnectionId(ConnectionId connectionId) {
+        this.connectionId = connectionId;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    /**
+     * Builds a RemoveInfo for this connection's id, carrying over this
+     * command's response-required setting.
+     *
+     * @return the new remove command
+     */
+    public RemoveInfo createRemoveCommand() {
+        RemoveInfo command = new RemoveInfo(getConnectionId());
+        command.setResponseRequired(isResponseRequired());
+        return command;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    /**
+     * The route of brokers the command has moved through.
+     * 
+     * @openwire:property version=1 cache=true
+     */
+    public BrokerId[] getBrokerPath() {
+        return brokerPath;
+    }
+
+    public void setBrokerPath(BrokerId[] brokerPath) {
+        this.brokerPath = brokerPath;
+    }
+
+    /**
+     * Hands this command to the visitor's add-connection handler.
+     *
+     * @param visitor the visitor to dispatch to
+     * @return whatever the visitor returns
+     * @throws Exception if the visitor throws
+     */
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processAddConnection(this);
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isBrokerMasterConnector() {
+        return brokerMasterConnector;
+    }
+
+    /**
+     * @param slaveBroker The brokerMasterConnector to set.
+     */
+    public void setBrokerMasterConnector(boolean slaveBroker) {
+        this.brokerMasterConnector = slaveBroker;
+    }
+
+    /**
+     * @openwire:property version=1
+     */
+    public boolean isManageable() {
+        return manageable;
+    }
+
+    /**
+     * @param manageable The manageable to set.
+     */
+    public void setManageable(boolean manageable) {
+        this.manageable = manageable;
+    }
+
+    /**
+     * Transports may wish to associate additional data with the connection. For
+     * example, an SSL transport may use this field to attach the client
+     * certificates used when the connection was established.
+     * 
+     * @return the transport context.
+     */
+    public Object getTransportContext() {
+        return transportContext;
+    }
+
+    /**
+     * Transports may wish to associate additional data with the connection. For
+     * example, an SSL transport may use this field to attach the client
+     * certificates used when the connection was established.
+     * 
+     * @param transportContext value used to set the transport context
+     */
+    public void setTransportContext(Object transportContext) {
+        this.transportContext = transportContext;
+    }
+
+    /**
+     * @openwire:property version=2
+     * @return the clientMaster
+     */
+    public boolean isClientMaster() {
+        return this.clientMaster;
+    }
+
+    /**
+     * @param clientMaster the clientMaster to set
+     */
+    public void setClientMaster(boolean clientMaster) {
+        this.clientMaster = clientMaster;
+    }
+
+    /**
+     * @openwire:property version=6 cache=false
+     * @return the faultTolerant
+     */
+    public boolean isFaultTolerant() {
+        return this.faultTolerant;
+    }
+
+    /**
+     * @param faultTolerant the faultTolerant to set
+     */
+    public void setFaultTolerant(boolean faultTolerant) {
+        this.faultTolerant = faultTolerant;
+    }
+
+    /**
+     * @openwire:property version=6 cache=false
+     * @return failoverReconnect true if this is a reconnect
+     */
+    public boolean isFailoverReconnect() {
+        return this.failoverReconnect;
+    }
+
+    /**
+     * @param failoverReconnect the failoverReconnect to set
+     */
+    public void setFailoverReconnect(boolean failoverReconnect) {
+        this.failoverReconnect = failoverReconnect;
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message