activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1097189 [36/42] - in /activemq/activemq-apollo/trunk: ./ apollo-openwire/ apollo-openwire/src/ apollo-openwire/src/main/ apollo-openwire/src/main/resources/ apollo-openwire/src/main/resources/META-INF/ apollo-openwire/src/main/resources/ME...
Date Wed, 27 Apr 2011 17:33:09 GMT
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.BufferEditor;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.io.*;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * @openwire:marshaller code=24
+ */
+public class ActiveMQBytesMessage extends ActiveMQMessage {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;
+
+    protected transient DataOutputStream dataOut;
+    protected transient ByteArrayOutputStream bytesOut;
+    protected transient DataInputStream dataIn;
+    protected transient int length;
+
+    public Message copy() {
+        ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQBytesMessage copy) {
+        storeContent();
+        super.copy(copy);
+        copy.dataOut = null;
+        copy.bytesOut = null;
+        copy.dataIn = null;
+    }
+
+    public void onSend() throws OpenwireException {
+        super.onSend();
+        storeContent();
+    }
+
+    private void storeContent() {
+        try {
+            if (dataOut != null) {
+                dataOut.close();
+                Buffer bs = bytesOut.toBuffer();
+                if (compressed) {
+                    int pos = bs.offset;
+                    bs.offset = 0;
+                    BufferEditor e = BufferEditor.big(bs);
+                    e.writeInt(length);
+                    bs.offset = pos;
+                }
+                setContent(bs);
+                bytesOut = null;
+                dataOut = null;
+            }
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
+                                                                // RuntimeException
+        }
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/bytes-message";
+    }
+
+    public void clearBody() throws OpenwireException {
+        super.clearBody();
+        this.dataOut = null;
+        this.dataIn = null;
+        this.bytesOut = null;
+    }
+
+    public long getBodyLength() throws OpenwireException {
+        initializeReading();
+        return length;
+    }
+
+    public boolean readBoolean() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readBoolean();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public byte readByte() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readByte();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public int readUnsignedByte() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readUnsignedByte();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public short readShort() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readShort();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public int readUnsignedShort() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readUnsignedShort();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public char readChar() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readChar();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public int readInt() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readInt();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public long readLong() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readLong();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public float readFloat() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readFloat();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public double readDouble() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readDouble();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public String readUTF() throws OpenwireException {
+        initializeReading();
+        try {
+            return this.dataIn.readUTF();
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public int readBytes(byte[] value) throws OpenwireException {
+        return readBytes(value, value.length);
+    }
+
+    public int readBytes(byte[] value, int length) throws OpenwireException {
+        initializeReading();
+        try {
+            int n = 0;
+            while (n < length) {
+                int count = this.dataIn.read(value, n, length - n);
+                if (count < 0) {
+                    break;
+                }
+                n += count;
+            }
+            if (n == 0 && length > 0) {
+                n = -1;
+            }
+            return n;
+        } catch (EOFException e) {
+            throw new OpenwireException(e);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public void writeBoolean(boolean value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeBoolean(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeByte(byte value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeByte(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeShort(short value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeShort(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeChar(char value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeChar(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeInt(int value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeInt(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeLong(long value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeLong(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeFloat(float value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeFloat(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeDouble(double value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeDouble(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeUTF(String value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.writeUTF(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeBytes(byte[] value) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.write(value);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeBytes(byte[] value, int offset, int length) throws OpenwireException {
+        initializeWriting();
+        try {
+            this.dataOut.write(value, offset, length);
+        } catch (IOException ioe) {
+            throw new OpenwireException(ioe);
+        }
+    }
+
+    public void writeObject(Object value) throws OpenwireException {
+        if (value == null) {
+            throw new NullPointerException();
+        }
+        initializeWriting();
+        if (value instanceof Boolean) {
+            writeBoolean(((Boolean)value).booleanValue());
+        } else if (value instanceof Character) {
+            writeChar(((Character)value).charValue());
+        } else if (value instanceof Byte) {
+            writeByte(((Byte)value).byteValue());
+        } else if (value instanceof Short) {
+            writeShort(((Short)value).shortValue());
+        } else if (value instanceof Integer) {
+            writeInt(((Integer)value).intValue());
+        } else if (value instanceof Long) {
+            writeLong(((Long)value).longValue());
+        } else if (value instanceof Float) {
+            writeFloat(((Float)value).floatValue());
+        } else if (value instanceof Double) {
+            writeDouble(((Double)value).doubleValue());
+        } else if (value instanceof String) {
+            writeUTF(value.toString());
+        } else if (value instanceof byte[]) {
+            writeBytes((byte[])value);
+        } else {
+            throw new OpenwireException("Cannot write non-primitive type:" + value.getClass());
+        }
+    }
+
+    public void reset() throws OpenwireException {
+        storeContent();
+        this.bytesOut = null;
+        this.dataIn = null;
+        this.dataOut = null;
+        setReadOnlyBody(true);
+    }
+
+    private void initializeWriting() throws OpenwireException {
+        checkReadOnlyBody();
+        if (this.dataOut == null) {
+            this.bytesOut = new ByteArrayOutputStream();
+            OutputStream os = bytesOut;
+            if (Settings.enable_compression()) {
+                // keep track of the real length of the content if
+                // we are compressed.
+                try {
+                    os.write(new byte[4]);
+                } catch (IOException e) {
+                    throw new OpenwireException(e);
+                }
+                length = 0;
+                compressed = true;
+                Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+                os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
+                    public void write(byte[] arg0) throws IOException {
+                        length += arg0.length;
+                        out.write(arg0);
+                    }
+
+                    public void write(byte[] arg0, int arg1, int arg2) throws IOException {
+                        length += arg2;
+                        out.write(arg0, arg1, arg2);
+                    }
+
+                    public void write(int arg0) throws IOException {
+                        length++;
+                        out.write(arg0);
+                    }
+                };
+            }
+            this.dataOut = new DataOutputStream(os);
+        }
+    }
+
+    protected void checkWriteOnlyBody() throws OpenwireException {
+        if (!readOnlyBody) {
+            throw new OpenwireException("Message body is write-only");
+        }
+    }
+
+    private void initializeReading() throws OpenwireException {
+        checkWriteOnlyBody();
+        if (dataIn == null) {
+            Buffer data = getContent();
+            if (data == null) {
+                data = new Buffer(new byte[] {}, 0, 0);
+            }
+            InputStream is = new ByteArrayInputStream(data);
+            if (isCompressed()) {
+                // keep track of the real length of the content if
+                // we are compressed.
+                try {
+                    DataInputStream dis = new DataInputStream(is);
+                    length = dis.readInt();
+                    dis.close();
+                } catch (IOException e) {
+                    throw new OpenwireException(e);
+                }
+                is = new InflaterInputStream(is);
+            } else {
+                length = data.getLength();
+            }
+            dataIn = new DataInputStream(is);
+        }
+    }
+
+    public void setObjectProperty(String name, Object value) throws OpenwireException {
+        initializeWriting();
+        super.setObjectProperty(name, value);
+    }
+
+    public String toString() {
+        return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.apollo.broker.DestinationParser;
+import org.apache.activemq.apollo.broker.LocalRouter;
+import org.apache.activemq.apollo.dto.DestinationDTO;
+import org.apache.activemq.apollo.util.path.Path;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.10 $
+ */
+abstract public class ActiveMQDestination implements DataStructure, Comparable {
+
+    public static final String PATH_SEPERATOR = ".";
+    public static final char COMPOSITE_SEPERATOR = ',';
+
+
+    public static final DestinationParser PARSER = new DestinationParser();
+    static {
+        PARSER.path_seperator = new AsciiBuffer(".");
+        PARSER.any_child_wildcard = new AsciiBuffer("*");
+        PARSER.any_descendant_wildcard = new AsciiBuffer(">");
+    }
+
+    public static final byte QUEUE_TYPE = 0x01;
+    public static final byte TOPIC_TYPE = 0x02;
+    public static final byte TEMP_MASK = 0x04;
+    public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
+    public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
+
+    public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
+    public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
+    public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
+    public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
+
+    public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
+
+    private static final long serialVersionUID = -3885260014960795889L;
+
+    protected String physicalName;
+
+    protected transient DestinationDTO[] destination;
+
+    public ActiveMQDestination() {
+    }
+
+    protected ActiveMQDestination(String name) {
+        setPhysicalName(name);
+    }
+
+
+    public DestinationDTO[] toDestination() {
+        return destination;
+    }
+
+    // static helper methods for working with destinations
+    // -------------------------------------------------------------------------
+    public static ActiveMQDestination createDestination(String name, byte defaultType) {
+
+        if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
+            return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
+        } else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
+            return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length()));
+        } else if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
+            return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
+        } else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
+            return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
+        }
+
+        switch (defaultType) {
+        case QUEUE_TYPE:
+            return new ActiveMQQueue(name);
+        case TOPIC_TYPE:
+            return new ActiveMQTopic(name);
+        case TEMP_QUEUE_TYPE:
+            return new ActiveMQTempQueue(name);
+        case TEMP_TOPIC_TYPE:
+            return new ActiveMQTempTopic(name);
+        default:
+            throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
+        }
+    }
+
+    public static int compare(ActiveMQDestination destination, ActiveMQDestination destination2) {
+        if (destination == destination2) {
+            return 0;
+        }
+        if (destination == null) {
+            return -1;
+        } else if (destination2 == null) {
+            return 1;
+        } else {
+            if (destination.isQueue() == destination2.isQueue()) {
+                return destination.getPhysicalName().compareTo(destination2.getPhysicalName());
+            } else {
+                return destination.isQueue() ? -1 : 1;
+            }
+        }
+    }
+
+    public int compareTo(Object that) {
+        if (that instanceof ActiveMQDestination) {
+            return compare(this, (ActiveMQDestination)that);
+        }
+        if (that == null) {
+            return 1;
+        } else {
+            return getClass().getName().compareTo(that.getClass().getName());
+        }
+    }
+
+
+    protected abstract String getQualifiedPrefix();
+
+    /**
+     * @openwire:property version=1
+     */
+    public String getPhysicalName() {
+        return physicalName;
+    }
+
+    DestinationDTO[] create_destination(AsciiBuffer domain, Path path) {
+        return DestinationParser.create_destination(domain, DestinationParser.encode_path(path));
+    }
+
+    public void setPhysicalName(String value) {
+        physicalName = value;
+        String[] composites = value.split(",");
+        if(composites.length == 1) {
+            Path path = PARSER.parsePath(new AsciiBuffer(composites[0]));
+            switch(getDestinationType()) {
+                case QUEUE_TYPE:
+                    destination = create_destination(LocalRouter.QUEUE_DOMAIN(), path);
+                    break;
+                case TOPIC_TYPE:
+                    destination = create_destination(LocalRouter.TOPIC_DOMAIN(), path);
+                    break;
+                case TEMP_QUEUE_TYPE:
+                    destination = create_destination(LocalRouter.TEMP_QUEUE_DOMAIN(), path);
+                    break;
+                case TEMP_TOPIC_TYPE:
+                    destination = create_destination(LocalRouter.TEMP_TOPIC_DOMAIN(), path);
+                    break;
+            }
+        } else {
+            ArrayList<DestinationDTO> l = new ArrayList<DestinationDTO>();
+            for( String c:composites ) {
+                l.add(createDestination(c).destination[0]);
+            }
+            destination = l.toArray(new DestinationDTO[l.size()]);
+        }
+
+    }
+
+    public ActiveMQDestination createDestination(String name) {
+        return createDestination(name, getDestinationType());
+    }
+
+    public abstract byte getDestinationType();
+
+    public boolean isQueue() {
+        return false;
+    }
+
+    public boolean isTopic() {
+        return false;
+    }
+
+    public boolean isTemporary() {
+        return false;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ActiveMQDestination d = (ActiveMQDestination)o;
+        return destination.equals(d.destination);
+    }
+
+    public int hashCode() {
+        return destination.hashCode();
+    }
+
+    public String toString() {
+        return destination.toString();
+    }
+
+    public String getDestinationTypeAsString() {
+        switch (getDestinationType()) {
+        case QUEUE_TYPE:
+            return "Queue";
+        case TOPIC_TYPE:
+            return "Topic";
+        case TEMP_QUEUE_TYPE:
+            return "TempQueue";
+        case TEMP_TOPIC_TYPE:
+            return "TempTopic";
+        default:
+            throw new IllegalArgumentException("Invalid destination type: " + getDestinationType());
+        }
+    }
+
+    public boolean isMarshallAware() {
+        return false;
+    }
+
+    public boolean isComposite() {
+        throw new UnsupportedOperationException();
+    }
+
+    public ActiveMQDestination[] getCompositeDestinations() {
+        throw new UnsupportedOperationException();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+
+ */
+public class ActiveMQMapMessage extends ActiveMQMessage {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MAP_MESSAGE;
+
+    protected transient Map<String, Object> map = new HashMap<String, Object>();
+
+    public Message copy() {
+        ActiveMQMapMessage copy = new ActiveMQMapMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQMapMessage copy) {
+        storeContent();
+        super.copy(copy);
+    }
+
+    // We only need to marshal the content if we are hitting the wire.
+    public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+        super.beforeMarshall(wireFormat);
+        storeContent();
+    }
+
+    private void storeContent() {
+        try {
+            if (getContent() == null && !map.isEmpty()) {
+                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                OutputStream os = bytesOut;
+                if (Settings.enable_compression()) {
+                    compressed = true;
+                    os = new DeflaterOutputStream(os);
+                }
+                DataOutputStream dataOut = new DataOutputStream(os);
+                MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+                dataOut.close();
+                setContent(bytesOut.toBuffer());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Builds the message body from data
+     * 
+     * @throws OpenwireException
+     */
+    private void loadContent() throws OpenwireException {
+        try {
+            if (getContent() != null && map.isEmpty()) {
+                Buffer content = getContent();
+                InputStream is = new ByteArrayInputStream(content);
+                if (isCompressed()) {
+                    is = new InflaterInputStream(is);
+                }
+                DataInputStream dataIn = new DataInputStream(is);
+                map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
+                dataIn.close();
+            }
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/map-message";
+    }
+
+    public void clearBody() throws OpenwireException {
+        super.clearBody();
+        map.clear();
+    }
+
+    public boolean getBoolean(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return false;
+        }
+        if (value instanceof Boolean) {
+            return ((Boolean)value).booleanValue();
+        }
+        if (value instanceof String) {
+            return Boolean.valueOf(value.toString()).booleanValue();
+        } else {
+            throw new OpenwireException(" cannot read a boolean from " + value.getClass().getName());
+        }
+    }
+
+    public byte getByte(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Byte) {
+            return ((Byte)value).byteValue();
+        }
+        if (value instanceof String) {
+            return Byte.valueOf(value.toString()).byteValue();
+        } else {
+            throw new OpenwireException(" cannot read a byte from " + value.getClass().getName());
+        }
+    }
+
+    public short getShort(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Short) {
+            return ((Short)value).shortValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte)value).shortValue();
+        }
+        if (value instanceof String) {
+            return Short.valueOf(value.toString()).shortValue();
+        } else {
+            throw new OpenwireException(" cannot read a short from " + value.getClass().getName());
+        }
+    }
+
+    public char getChar(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            throw new NullPointerException();
+        }
+        if (value instanceof Character) {
+            return ((Character)value).charValue();
+        } else {
+            throw new OpenwireException(" cannot read a short from " + value.getClass().getName());
+        }
+    }
+
+    public int getInt(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Integer) {
+            return ((Integer)value).intValue();
+        }
+        if (value instanceof Short) {
+            return ((Short)value).intValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte)value).intValue();
+        }
+        if (value instanceof String) {
+            return Integer.valueOf(value.toString()).intValue();
+        } else {
+            throw new OpenwireException(" cannot read an int from " + value.getClass().getName());
+        }
+    }
+
+    public long getLong(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Long) {
+            return ((Long)value).longValue();
+        }
+        if (value instanceof Integer) {
+            return ((Integer)value).longValue();
+        }
+        if (value instanceof Short) {
+            return ((Short)value).longValue();
+        }
+        if (value instanceof Byte) {
+            return ((Byte)value).longValue();
+        }
+        if (value instanceof String) {
+            return Long.valueOf(value.toString()).longValue();
+        } else {
+            throw new OpenwireException(" cannot read a long from " + value.getClass().getName());
+        }
+    }
+
+    public float getFloat(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Float) {
+            return ((Float)value).floatValue();
+        }
+        if (value instanceof String) {
+            return Float.valueOf(value.toString()).floatValue();
+        } else {
+            throw new OpenwireException(" cannot read a float from " + value.getClass().getName());
+        }
+    }
+
+    public double getDouble(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof Double) {
+            return ((Double)value).doubleValue();
+        }
+        if (value instanceof Float) {
+            return ((Float)value).floatValue();
+        }
+        if (value instanceof String) {
+            return Float.valueOf(value.toString()).floatValue();
+        } else {
+            throw new OpenwireException(" cannot read a double from " + value.getClass().getName());
+        }
+    }
+
+    public String getString(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof byte[]) {
+            throw new OpenwireException("Use getBytes to read a byte array");
+        } else {
+            return value.toString();
+        }
+    }
+
+    public byte[] getBytes(String name) throws OpenwireException {
+        initializeReading();
+        Object value = map.get(name);
+        if (value instanceof byte[]) {
+            return (byte[])value;
+        } else {
+            throw new OpenwireException(" cannot read a byte[] from " + value.getClass().getName());
+        }
+    }
+
+    public Object getObject(String name) throws OpenwireException {
+        initializeReading();
+        return map.get(name);
+    }
+
+    public Enumeration<String> getMapNames() throws OpenwireException {
+        initializeReading();
+        return Collections.enumeration(map.keySet());
+    }
+
+    protected void put(String name, Object value) throws OpenwireException {
+        if (name == null) {
+            throw new IllegalArgumentException("The name of the property cannot be null.");
+        }
+        if (name.length() == 0) {
+            throw new IllegalArgumentException("The name of the property cannot be an emprty string.");
+        }
+        map.put(name, value);
+    }
+
+    public void setBoolean(String name, boolean value) throws OpenwireException {
+        initializeWriting();
+        put(name, value ? Boolean.TRUE : Boolean.FALSE);
+    }
+
+    public void setByte(String name, byte value) throws OpenwireException {
+        initializeWriting();
+        put(name, Byte.valueOf(value));
+    }
+
+    public void setShort(String name, short value) throws OpenwireException {
+        initializeWriting();
+        put(name, Short.valueOf(value));
+    }
+
+    public void setChar(String name, char value) throws OpenwireException {
+        initializeWriting();
+        put(name, Character.valueOf(value));
+    }
+
+    public void setInt(String name, int value) throws OpenwireException {
+        initializeWriting();
+        put(name, Integer.valueOf(value));
+    }
+
+    public void setLong(String name, long value) throws OpenwireException {
+        initializeWriting();
+        put(name, Long.valueOf(value));
+    }
+
+    public void setFloat(String name, float value) throws OpenwireException {
+        initializeWriting();
+        put(name, new Float(value));
+    }
+
+    public void setDouble(String name, double value) throws OpenwireException {
+        initializeWriting();
+        put(name, new Double(value));
+    }
+
+    public void setString(String name, String value) throws OpenwireException {
+        initializeWriting();
+        put(name, value);
+    }
+
+    public void setBytes(String name, byte[] value) throws OpenwireException {
+        initializeWriting();
+        if (value != null) {
+            put(name, value);
+        } else {
+            map.remove(name);
+        }
+    }
+
+    public void setBytes(String name, byte[] value, int offset, int length) throws OpenwireException {
+        initializeWriting();
+        byte[] data = new byte[length];
+        System.arraycopy(value, offset, data, 0, length);
+        put(name, data);
+    }
+
+    public void setObject(String name, Object value) throws OpenwireException {
+        initializeWriting();
+        if (value != null) {
+            // byte[] not allowed on properties
+            if (!(value instanceof byte[])) {
+                checkValidObject(value);
+            }
+            put(name, value);
+        } else {
+            put(name, null);
+        }
+    }
+
+    public boolean itemExists(String name) throws OpenwireException {
+        initializeReading();
+        return map.containsKey(name);
+    }
+
+    private void initializeReading() throws OpenwireException {
+        loadContent();
+    }
+
+    private void initializeWriting() throws OpenwireException {
+        checkReadOnlyBody();
+        setContent(null);
+    }
+
+    public String toString() {
+        return super.toString() + " ActiveMQMapMessage{ " + "theTable = " + map + " }";
+    }
+
+    public Map<String, Object> getContentMap() throws OpenwireException {
+        initializeReading();
+        return map;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,628 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.util.Callback;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.apache.activemq.apollo.openwire.support.TypeConversionSupport;
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+
+/**
+ * @version $Revision:$
+ * @openwire:marshaller code="23"
+ */
+public class ActiveMQMessage extends Message {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
+    private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
+
+    protected transient Callback acknowledgeCallback;
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+
+    public Message copy() {
+        ActiveMQMessage copy = new ActiveMQMessage();
+        copy(copy);
+        return copy;
+    }
+
+    protected void copy(ActiveMQMessage copy) {
+        super.copy(copy);
+        copy.acknowledgeCallback = acknowledgeCallback;
+    }
+
+    public int hashCode() {
+        MessageId id = getMessageId();
+        if (id != null) {
+            return id.hashCode();
+        } else {
+            return super.hashCode();
+        }
+    }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        ActiveMQMessage msg = (ActiveMQMessage) o;
+        MessageId oMsg = msg.getMessageId();
+        MessageId thisMsg = this.getMessageId();
+        return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
+    }
+
+    public void acknowledge() throws OpenwireException {
+        if (acknowledgeCallback != null) {
+            try {
+                acknowledgeCallback.execute();
+            } catch (OpenwireException e) {
+                throw e;
+            } catch (Throwable e) {
+                throw new OpenwireException(e);
+            }
+        }
+    }
+
+    public void clearBody() throws OpenwireException {
+        setContent(null);
+        readOnlyBody = false;
+    }
+
+    public String getJMSMessageID() {
+        MessageId messageId = this.getMessageId();
+        if (messageId == null) {
+            return null;
+        }
+        return messageId.toString();
+    }
+
+    /**
+     * Seems to be invalid because the parameter doesn't initialize MessageId
+     * instance variables ProducerId and ProducerSequenceId
+     *
+     * @param value
+     * @throws OpenwireException
+     */
+    public void setJMSMessageID(String value) throws OpenwireException {
+        if (value != null) {
+            try {
+                MessageId id = new MessageId(value);
+                this.setMessageId(id);
+            } catch (NumberFormatException e) {
+                // we must be some foreign JMS provider or strange user-supplied
+                // String
+                // so lets set the IDs to be 1
+                MessageId id = new MessageId();
+                id.setTextView(value);
+                this.setMessageId(messageId);
+            }
+        } else {
+            this.setMessageId(null);
+        }
+    }
+
+    /**
+     * This will create an object of MessageId. For it to be valid, the instance
+     * variable ProducerId and producerSequenceId must be initialized.
+     *
+     * @param producerId
+     * @param producerSequenceId
+     * @throws OpenwireException
+     */
+    public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws OpenwireException {
+        MessageId id = null;
+        try {
+            id = new MessageId(producerId, producerSequenceId);
+            this.setMessageId(id);
+        } catch (Throwable e) {
+            throw new OpenwireException("Invalid message id '" + id + "', reason: " + e.getMessage(), e);
+        }
+    }
+
+    public long getJMSTimestamp() {
+        return this.getTimestamp();
+    }
+
+    public void setJMSTimestamp(long timestamp) {
+        this.setTimestamp(timestamp);
+    }
+
+    public String getJMSCorrelationID() {
+        return this.getCorrelationId();
+    }
+
+    public void setJMSCorrelationID(String correlationId) {
+        this.setCorrelationId(correlationId);
+    }
+
+    public byte[] getJMSCorrelationIDAsBytes() throws OpenwireException {
+        return encodeString(this.getCorrelationId());
+    }
+
+    public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws OpenwireException {
+        this.setCorrelationId(decodeString(correlationId));
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/message";
+    }
+
+    protected static String decodeString(byte[] data) throws OpenwireException {
+        try {
+            if (data == null) {
+                return null;
+            }
+            return new String(data, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new OpenwireException("Invalid UTF-8 encoding: " + e.getMessage());
+        }
+    }
+
+    protected static byte[] encodeString(String data) throws OpenwireException {
+        try {
+            if (data == null) {
+                return null;
+            }
+            return data.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new OpenwireException("Invalid UTF-8 encoding: " + e.getMessage());
+        }
+    }
+
+    public ActiveMQDestination getJMSReplyTo() {
+        return this.getReplyTo();
+    }
+
+    public void setJMSReplyTo(ActiveMQDestination destination) throws OpenwireException {
+        this.setReplyTo(destination);
+    }
+
+    public ActiveMQDestination getJMSDestination() {
+        return this.getDestination();
+    }
+
+    public void setJMSDestination(ActiveMQDestination destination) throws OpenwireException {
+        this.setDestination(destination);
+    }
+
+    public int getJMSDeliveryMode() {
+        return this.isPersistent() ? 2 : 1;
+    }
+
+    public void setJMSDeliveryMode(int mode) {
+        this.setPersistent(mode == 2);
+    }
+
+    public boolean getJMSRedelivered() {
+        return this.isRedelivered();
+    }
+
+    public void setJMSRedelivered(boolean redelivered) {
+        this.setRedelivered(redelivered);
+    }
+
+    public String getJMSType() {
+        return this.getType();
+    }
+
+    public void setJMSType(String type) {
+        this.setType(type);
+    }
+
+    public long getJMSExpiration() {
+        return this.getExpiration();
+    }
+
+    public void setJMSExpiration(long expiration) {
+        this.setExpiration(expiration);
+    }
+
+    public int getJMSPriority() {
+        return this.getPriority();
+    }
+
+    public void setJMSPriority(int priority) {
+        this.setPriority((byte) priority);
+    }
+
+    public void clearProperties() {
+        super.clearProperties();
+        readOnlyProperties = false;
+    }
+
+    public boolean propertyExists(String name) throws OpenwireException {
+        try {
+            return this.getProperties().containsKey(name);
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    public Enumeration getPropertyNames() throws OpenwireException {
+        try {
+            return new Vector<String>(this.getProperties().keySet()).elements();
+        } catch (IOException e) {
+            throw new OpenwireException(e);
+        }
+    }
+
+    interface PropertySetter {
+
+        void set(Message message, Object value) throws OpenwireException;
+    }
+
+    static {
+        JMS_PROPERTY_SETERS.put("JMSXDeliveryCount", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.setRedeliveryCounter(rc.intValue() - 1);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSXGroupID", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                String rc = (String) TypeConversionSupport.convert(value, String.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.setGroupID(rc);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.setGroupSequence(rc.intValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                String rc = (String) TypeConversionSupport.convert(value, String.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSCorrelationID(rc);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (rc == null) {
+                    Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+                    if (bool == null) {
+                        throw new OpenwireException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
+                    }
+                    else {
+                        rc = bool.booleanValue() ? 2 : 1;
+                    }
+                }
+                ((ActiveMQMessage) message).setJMSDeliveryMode(rc);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSExpiration(rc.longValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSPriority(rc.intValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSRedelivered(rc.booleanValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                ActiveMQDestination rc = (ActiveMQDestination) TypeConversionSupport.convert(value, ActiveMQDestination.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setReplyTo(rc);
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSTimestamp(rc.longValue());
+            }
+        });
+        JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
+            public void set(Message message, Object value) throws OpenwireException {
+                String rc = (String) TypeConversionSupport.convert(value, String.class);
+                if (rc == null) {
+                    throw new OpenwireException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
+                }
+                ((ActiveMQMessage) message).setJMSType(rc);
+            }
+        });
+    }
+
+    public void setObjectProperty(String name, Object value) throws OpenwireException {
+        setObjectProperty(name, value, true);
+    }
+
+    public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws OpenwireException {
+
+        if (checkReadOnly) {
+            checkReadOnlyProperties();
+        }
+        if (name == null || name.equals("")) {
+            throw new IllegalArgumentException("Property name cannot be empty or null");
+        }
+
+        checkValidObject(value);
+        PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
+
+        if (setter != null && value != null) {
+            setter.set(this, value);
+        } else {
+            try {
+                this.setProperty(name, value);
+            } catch (IOException e) {
+                throw new OpenwireException(e);
+            }
+        }
+    }
+
+    public void setProperties(Map properties) throws OpenwireException {
+        for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry) iter.next();
+
+            // Lets use the object property method as we may contain standard
+            // extension headers like JMSXGroupID
+            setObjectProperty((String) entry.getKey(), entry.getValue());
+        }
+    }
+
+    protected void checkValidObject(Object value) throws OpenwireException {
+
+        boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long;
+        valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null;
+
+        if (!valid) {
+
+            if (Settings.enable_nested_map_and_list()) {
+                if (!(value instanceof Map || value instanceof List)) {
+                    throw new OpenwireException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: " + value.getClass());
+                }
+            } else {
+                throw new OpenwireException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+            }
+        }
+    }
+
+    public Object getObjectProperty(String name) throws OpenwireException {
+        if (name == null) {
+            throw new NullPointerException("Property name cannot be null");
+        }
+//        try {
+        return createFilterable().getProperty(name);
+//        } catch (FilterException e) {
+//            throw new JMSException(e);
+//        }
+    }
+
+    public boolean getBooleanProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            return false;
+        }
+        Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
+        }
+        return rc.booleanValue();
+    }
+
+    public byte getByteProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
+        }
+        return rc.byteValue();
+    }
+
+    public short getShortProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        Short rc = (Short) TypeConversionSupport.convert(value, Short.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
+        }
+        return rc.shortValue();
+    }
+
+    public int getIntProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
+        }
+        return rc.intValue();
+    }
+
+    public long getLongProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
+        }
+        return rc.longValue();
+    }
+
+    public float getFloatProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Float rc = (Float) TypeConversionSupport.convert(value, Float.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
+        }
+        return rc.floatValue();
+    }
+
+    public double getDoubleProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+        Double rc = (Double) TypeConversionSupport.convert(value, Double.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
+        }
+        return rc.doubleValue();
+    }
+
+    public String getStringProperty(String name) throws OpenwireException {
+        Object value = getObjectProperty(name);
+        if (value == null) {
+            if (name.equals("JMSXUserID")) {
+                value = getUserID();
+            }
+        }
+        if (value == null) {
+            return null;
+        }
+        String rc = (String) TypeConversionSupport.convert(value, String.class);
+        if (rc == null) {
+            throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String");
+        }
+        return rc;
+    }
+
+    public void setBooleanProperty(String name, boolean value) throws OpenwireException {
+        setBooleanProperty(name, value, true);
+    }
+
+    public void setBooleanProperty(String name, boolean value, boolean checkReadOnly) throws OpenwireException {
+        setObjectProperty(name, Boolean.valueOf(value), checkReadOnly);
+    }
+
+    public void setByteProperty(String name, byte value) throws OpenwireException {
+        setObjectProperty(name, Byte.valueOf(value));
+    }
+
+    public void setShortProperty(String name, short value) throws OpenwireException {
+        setObjectProperty(name, Short.valueOf(value));
+    }
+
+    public void setIntProperty(String name, int value) throws OpenwireException {
+        setObjectProperty(name, Integer.valueOf(value));
+    }
+
+    public void setLongProperty(String name, long value) throws OpenwireException {
+        setObjectProperty(name, Long.valueOf(value));
+    }
+
+    public void setFloatProperty(String name, float value) throws OpenwireException {
+        setObjectProperty(name, new Float(value));
+    }
+
+    public void setDoubleProperty(String name, double value) throws OpenwireException {
+        setObjectProperty(name, new Double(value));
+    }
+
+    public void setStringProperty(String name, String value) throws OpenwireException {
+        setObjectProperty(name, value);
+    }
+
+    private void checkReadOnlyProperties() throws OpenwireException {
+        if (readOnlyProperties) {
+            throw new OpenwireException("Message properties are read-only");
+        }
+    }
+
+    protected void checkReadOnlyBody() throws OpenwireException {
+        if (readOnlyBody) {
+            throw new OpenwireException("Message body is read-only");
+        }
+    }
+
+    public boolean isExpired() {
+        long expireTime = this.getExpiration();
+        if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
+            return true;
+        }
+        return false;
+    }
+
+    public Callback getAcknowledgeCallback() {
+        return acknowledgeCallback;
+    }
+
+    public void setAcknowledgeCallback(Callback acknowledgeCallback) {
+        this.acknowledgeCallback = acknowledgeCallback;
+    }
+
+    /**
+     * Send operation event listener. Used to get the message ready to be sent.
+     */
+    public void onSend() throws OpenwireException {
+        setReadOnlyBody(true);
+        setReadOnlyProperties(true);
+    }
+
+    public Response visit(CommandVisitor visitor) throws Exception {
+        return visitor.processMessage(this);
+    }
+}

Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.activemq.apollo.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+/**
+ */
+public class ActiveMQObjectMessage extends ActiveMQMessage {
+    
+    // TODO: verify classloader
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
+    static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); 
+
+    protected transient Serializable object;
+
+    public Message copy() {
+        ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
+        copy(copy);
+        return copy;
+    }
+
+    private void copy(ActiveMQObjectMessage copy) {
+        storeContent();
+        super.copy(copy);
+        copy.object = null;
+    }
+
+    public void storeContent() {
+        Buffer bodyAsBytes = getContent();
+        if (bodyAsBytes == null && object != null) {
+            try {
+                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                OutputStream os = bytesOut;
+                if (Settings.enable_compression()) {
+                    compressed = true;
+                    os = new DeflaterOutputStream(os);
+                }
+                DataOutputStream dataOut = new DataOutputStream(os);
+                ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+                objOut.writeObject(object);
+                objOut.flush();
+                objOut.reset();
+                objOut.close();
+                setContent(bytesOut.toBuffer());
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe.getMessage(), ioe);
+            }
+        }
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public String getJMSXMimeType() {
+        return "jms/object-message";
+    }
+
+    /**
+     */
+    public void clearBody() throws OpenwireException {
+        super.clearBody();
+        this.object = null;
+    }
+
+    public void setObject(Serializable newObject) throws OpenwireException {
+        checkReadOnlyBody();
+        this.object = newObject;
+        setContent(null);
+        storeContent();
+    }
+
+    /**
+     * Gets the serializable object containing this message's data. The default
+     * value is null.
+     * 
+     * @return the serializable object containing this message's data
+     * @throws OpenwireException
+     */
+    public Serializable getObject() throws OpenwireException {
+        if (object == null && getContent() != null) {
+            try {
+                Buffer content = getContent();
+                InputStream is = new ByteArrayInputStream(content);
+                if (isCompressed()) {
+                    is = new InflaterInputStream(is);
+                }
+                DataInputStream dataIn = new DataInputStream(is);
+                ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+                try {
+                    object = (Serializable)objIn.readObject();
+                } catch (ClassNotFoundException ce) {
+                    throw new OpenwireException("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
+                } finally {
+                    dataIn.close();
+                }
+            } catch (IOException e) {
+                throw new OpenwireException("Failed to build body from bytes. Reason: " + e, e);
+            }
+        }
+        return this.object;
+    }
+
+    public void onMessageRolledBack() {
+        super.onMessageRolledBack();
+
+        // lets force the object to be deserialized again - as we could have
+        // changed the object
+        object = null;
+    }
+
+    public String toString() {
+        try {
+            getObject();
+        } catch (OpenwireException e) {
+        }
+        return super.toString();
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * 
+ * @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
+ *                         Destination"
+ * 
+ * @openwire:marshaller code="100"
+ * @version $Revision: 1.5 $
+ */
+public class ActiveMQQueue extends ActiveMQDestination {
+
+    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_QUEUE;
+    private static final long serialVersionUID = -3885260014960795889L;
+
+    public ActiveMQQueue() {
+    }
+
+    public ActiveMQQueue(String name) {
+        super(name);
+    }
+
+    public byte getDataStructureType() {
+        return DATA_STRUCTURE_TYPE;
+    }
+
+    public boolean isQueue() {
+        return true;
+    }
+
+    public String getQueueName() throws OpenwireException {
+        return getPhysicalName();
+    }
+
+    public byte getDestinationType() {
+        return QUEUE_TYPE;
+    }
+
+    protected String getQualifiedPrefix() {
+        return QUEUE_QUALIFIED_PREFIX;
+    }
+
+}



Mime
View raw message