Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBytesMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.BufferEditor;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.io.*;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * @openwire:marshaller code=24
+ */
+public class ActiveMQBytesMessage extends ActiveMQMessage {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;
+
+ protected transient DataOutputStream dataOut;
+ protected transient ByteArrayOutputStream bytesOut;
+ protected transient DataInputStream dataIn;
+ protected transient int length;
+
+ public Message copy() {
+ ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(ActiveMQBytesMessage copy) {
+ storeContent();
+ super.copy(copy);
+ copy.dataOut = null;
+ copy.bytesOut = null;
+ copy.dataIn = null;
+ }
+
+ public void onSend() throws OpenwireException {
+ super.onSend();
+ storeContent();
+ }
+
+ private void storeContent() {
+ try {
+ if (dataOut != null) {
+ dataOut.close();
+ Buffer bs = bytesOut.toBuffer();
+ if (compressed) {
+ int pos = bs.offset;
+ bs.offset = 0;
+ BufferEditor e = BufferEditor.big(bs);
+ e.writeInt(length);
+ bs.offset = pos;
+ }
+ setContent(bs);
+ bytesOut = null;
+ dataOut = null;
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
+ // RuntimeException
+ }
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String getJMSXMimeType() {
+ return "jms/bytes-message";
+ }
+
+ public void clearBody() throws OpenwireException {
+ super.clearBody();
+ this.dataOut = null;
+ this.dataIn = null;
+ this.bytesOut = null;
+ }
+
+ public long getBodyLength() throws OpenwireException {
+ initializeReading();
+ return length;
+ }
+
+ public boolean readBoolean() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readBoolean();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public byte readByte() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readByte();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public int readUnsignedByte() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readUnsignedByte();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public short readShort() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readShort();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public int readUnsignedShort() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readUnsignedShort();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public char readChar() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readChar();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public int readInt() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readInt();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public long readLong() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readLong();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public float readFloat() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readFloat();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public double readDouble() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readDouble();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public String readUTF() throws OpenwireException {
+ initializeReading();
+ try {
+ return this.dataIn.readUTF();
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public int readBytes(byte[] value) throws OpenwireException {
+ return readBytes(value, value.length);
+ }
+
+ public int readBytes(byte[] value, int length) throws OpenwireException {
+ initializeReading();
+ try {
+ int n = 0;
+ while (n < length) {
+ int count = this.dataIn.read(value, n, length - n);
+ if (count < 0) {
+ break;
+ }
+ n += count;
+ }
+ if (n == 0 && length > 0) {
+ n = -1;
+ }
+ return n;
+ } catch (EOFException e) {
+ throw new OpenwireException(e);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public void writeBoolean(boolean value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeBoolean(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeByte(byte value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeByte(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeShort(short value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeShort(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeChar(char value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeChar(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeInt(int value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeInt(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeLong(long value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeLong(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeFloat(float value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeFloat(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeDouble(double value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeDouble(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeUTF(String value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.writeUTF(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeBytes(byte[] value) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.write(value);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeBytes(byte[] value, int offset, int length) throws OpenwireException {
+ initializeWriting();
+ try {
+ this.dataOut.write(value, offset, length);
+ } catch (IOException ioe) {
+ throw new OpenwireException(ioe);
+ }
+ }
+
+ public void writeObject(Object value) throws OpenwireException {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ initializeWriting();
+ if (value instanceof Boolean) {
+ writeBoolean(((Boolean)value).booleanValue());
+ } else if (value instanceof Character) {
+ writeChar(((Character)value).charValue());
+ } else if (value instanceof Byte) {
+ writeByte(((Byte)value).byteValue());
+ } else if (value instanceof Short) {
+ writeShort(((Short)value).shortValue());
+ } else if (value instanceof Integer) {
+ writeInt(((Integer)value).intValue());
+ } else if (value instanceof Long) {
+ writeLong(((Long)value).longValue());
+ } else if (value instanceof Float) {
+ writeFloat(((Float)value).floatValue());
+ } else if (value instanceof Double) {
+ writeDouble(((Double)value).doubleValue());
+ } else if (value instanceof String) {
+ writeUTF(value.toString());
+ } else if (value instanceof byte[]) {
+ writeBytes((byte[])value);
+ } else {
+ throw new OpenwireException("Cannot write non-primitive type:" + value.getClass());
+ }
+ }
+
+ public void reset() throws OpenwireException {
+ storeContent();
+ this.bytesOut = null;
+ this.dataIn = null;
+ this.dataOut = null;
+ setReadOnlyBody(true);
+ }
+
+ private void initializeWriting() throws OpenwireException {
+ checkReadOnlyBody();
+ if (this.dataOut == null) {
+ this.bytesOut = new ByteArrayOutputStream();
+ OutputStream os = bytesOut;
+ if (Settings.enable_compression()) {
+ // keep track of the real length of the content if
+ // we are compressed.
+ try {
+ os.write(new byte[4]);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ length = 0;
+ compressed = true;
+ Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+ os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
+ public void write(byte[] arg0) throws IOException {
+ length += arg0.length;
+ out.write(arg0);
+ }
+
+ public void write(byte[] arg0, int arg1, int arg2) throws IOException {
+ length += arg2;
+ out.write(arg0, arg1, arg2);
+ }
+
+ public void write(int arg0) throws IOException {
+ length++;
+ out.write(arg0);
+ }
+ };
+ }
+ this.dataOut = new DataOutputStream(os);
+ }
+ }
+
+ protected void checkWriteOnlyBody() throws OpenwireException {
+ if (!readOnlyBody) {
+ throw new OpenwireException("Message body is write-only");
+ }
+ }
+
+ private void initializeReading() throws OpenwireException {
+ checkWriteOnlyBody();
+ if (dataIn == null) {
+ Buffer data = getContent();
+ if (data == null) {
+ data = new Buffer(new byte[] {}, 0, 0);
+ }
+ InputStream is = new ByteArrayInputStream(data);
+ if (isCompressed()) {
+ // keep track of the real length of the content if
+ // we are compressed.
+ try {
+ DataInputStream dis = new DataInputStream(is);
+ length = dis.readInt();
+ dis.close();
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ is = new InflaterInputStream(is);
+ } else {
+ length = data.getLength();
+ }
+ dataIn = new DataInputStream(is);
+ }
+ }
+
+ public void setObjectProperty(String name, Object value) throws OpenwireException {
+ initializeWriting();
+ super.setObjectProperty(name, value);
+ }
+
+ public String toString() {
+ return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import java.util.ArrayList;
+
+import org.apache.activemq.apollo.broker.DestinationParser;
+import org.apache.activemq.apollo.broker.LocalRouter;
+import org.apache.activemq.apollo.dto.DestinationDTO;
+import org.apache.activemq.apollo.util.path.Path;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ * @openwire:marshaller
+ * @version $Revision: 1.10 $
+ */
+abstract public class ActiveMQDestination implements DataStructure, Comparable {
+
+ public static final String PATH_SEPERATOR = ".";
+ public static final char COMPOSITE_SEPERATOR = ',';
+
+
+ public static final DestinationParser PARSER = new DestinationParser();
+ static {
+ PARSER.path_seperator = new AsciiBuffer(".");
+ PARSER.any_child_wildcard = new AsciiBuffer("*");
+ PARSER.any_descendant_wildcard = new AsciiBuffer(">");
+ }
+
+ public static final byte QUEUE_TYPE = 0x01;
+ public static final byte TOPIC_TYPE = 0x02;
+ public static final byte TEMP_MASK = 0x04;
+ public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
+ public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
+
+ public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
+ public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
+ public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
+ public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
+
+ public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
+
+ private static final long serialVersionUID = -3885260014960795889L;
+
+ protected String physicalName;
+
+ protected transient DestinationDTO[] destination;
+
+ public ActiveMQDestination() {
+ }
+
+ protected ActiveMQDestination(String name) {
+ setPhysicalName(name);
+ }
+
+
+ public DestinationDTO[] toDestination() {
+ return destination;
+ }
+
+ // static helper methods for working with destinations
+ // -------------------------------------------------------------------------
+ public static ActiveMQDestination createDestination(String name, byte defaultType) {
+
+ if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
+ return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
+ } else if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
+ return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length()));
+ } else if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
+ return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
+ } else if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
+ return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
+ }
+
+ switch (defaultType) {
+ case QUEUE_TYPE:
+ return new ActiveMQQueue(name);
+ case TOPIC_TYPE:
+ return new ActiveMQTopic(name);
+ case TEMP_QUEUE_TYPE:
+ return new ActiveMQTempQueue(name);
+ case TEMP_TOPIC_TYPE:
+ return new ActiveMQTempTopic(name);
+ default:
+ throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
+ }
+ }
+
+ public static int compare(ActiveMQDestination destination, ActiveMQDestination destination2) {
+ if (destination == destination2) {
+ return 0;
+ }
+ if (destination == null) {
+ return -1;
+ } else if (destination2 == null) {
+ return 1;
+ } else {
+ if (destination.isQueue() == destination2.isQueue()) {
+ return destination.getPhysicalName().compareTo(destination2.getPhysicalName());
+ } else {
+ return destination.isQueue() ? -1 : 1;
+ }
+ }
+ }
+
+ public int compareTo(Object that) {
+ if (that instanceof ActiveMQDestination) {
+ return compare(this, (ActiveMQDestination)that);
+ }
+ if (that == null) {
+ return 1;
+ } else {
+ return getClass().getName().compareTo(that.getClass().getName());
+ }
+ }
+
+
+ protected abstract String getQualifiedPrefix();
+
+ /**
+ * @openwire:property version=1
+ */
+ public String getPhysicalName() {
+ return physicalName;
+ }
+
+ DestinationDTO[] create_destination(AsciiBuffer domain, Path path) {
+ return DestinationParser.create_destination(domain, DestinationParser.encode_path(path));
+ }
+
+ public void setPhysicalName(String value) {
+ physicalName = value;
+ String[] composites = value.split(",");
+ if(composites.length == 1) {
+ Path path = PARSER.parsePath(new AsciiBuffer(composites[0]));
+ switch(getDestinationType()) {
+ case QUEUE_TYPE:
+ destination = create_destination(LocalRouter.QUEUE_DOMAIN(), path);
+ break;
+ case TOPIC_TYPE:
+ destination = create_destination(LocalRouter.TOPIC_DOMAIN(), path);
+ break;
+ case TEMP_QUEUE_TYPE:
+ destination = create_destination(LocalRouter.TEMP_QUEUE_DOMAIN(), path);
+ break;
+ case TEMP_TOPIC_TYPE:
+ destination = create_destination(LocalRouter.TEMP_TOPIC_DOMAIN(), path);
+ break;
+ }
+ } else {
+ ArrayList<DestinationDTO> l = new ArrayList<DestinationDTO>();
+ for( String c:composites ) {
+ l.add(createDestination(c).destination[0]);
+ }
+ destination = l.toArray(new DestinationDTO[l.size()]);
+ }
+
+ }
+
+ public ActiveMQDestination createDestination(String name) {
+ return createDestination(name, getDestinationType());
+ }
+
+ public abstract byte getDestinationType();
+
+ public boolean isQueue() {
+ return false;
+ }
+
+ public boolean isTopic() {
+ return false;
+ }
+
+ public boolean isTemporary() {
+ return false;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ActiveMQDestination d = (ActiveMQDestination)o;
+ return destination.equals(d.destination);
+ }
+
+ public int hashCode() {
+ return destination.hashCode();
+ }
+
+ public String toString() {
+ return destination.toString();
+ }
+
+ public String getDestinationTypeAsString() {
+ switch (getDestinationType()) {
+ case QUEUE_TYPE:
+ return "Queue";
+ case TOPIC_TYPE:
+ return "Topic";
+ case TEMP_QUEUE_TYPE:
+ return "TempQueue";
+ case TEMP_TOPIC_TYPE:
+ return "TempTopic";
+ default:
+ throw new IllegalArgumentException("Invalid destination type: " + getDestinationType());
+ }
+ }
+
+ public boolean isMarshallAware() {
+ return false;
+ }
+
+ public boolean isComposite() {
+ throw new UnsupportedOperationException();
+ }
+
+ public ActiveMQDestination[] getCompositeDestinations() {
+ throw new UnsupportedOperationException();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMapMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.openwire.codec.OpenWireFormat;
+import org.apache.activemq.apollo.openwire.support.MarshallingSupport;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+/**
+
+ */
+public class ActiveMQMapMessage extends ActiveMQMessage {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MAP_MESSAGE;
+
+ protected transient Map<String, Object> map = new HashMap<String, Object>();
+
+ public Message copy() {
+ ActiveMQMapMessage copy = new ActiveMQMapMessage();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(ActiveMQMapMessage copy) {
+ storeContent();
+ super.copy(copy);
+ }
+
+ // We only need to marshal the content if we are hitting the wire.
+ public void beforeMarshall(OpenWireFormat wireFormat) throws IOException {
+ super.beforeMarshall(wireFormat);
+ storeContent();
+ }
+
+ private void storeContent() {
+ try {
+ if (getContent() == null && !map.isEmpty()) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ OutputStream os = bytesOut;
+ if (Settings.enable_compression()) {
+ compressed = true;
+ os = new DeflaterOutputStream(os);
+ }
+ DataOutputStream dataOut = new DataOutputStream(os);
+ MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+ dataOut.close();
+ setContent(bytesOut.toBuffer());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Builds the message body from data
+ *
+ * @throws OpenwireException
+ */
+ private void loadContent() throws OpenwireException {
+ try {
+ if (getContent() != null && map.isEmpty()) {
+ Buffer content = getContent();
+ InputStream is = new ByteArrayInputStream(content);
+ if (isCompressed()) {
+ is = new InflaterInputStream(is);
+ }
+ DataInputStream dataIn = new DataInputStream(is);
+ map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
+ dataIn.close();
+ }
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String getJMSXMimeType() {
+ return "jms/map-message";
+ }
+
+ public void clearBody() throws OpenwireException {
+ super.clearBody();
+ map.clear();
+ }
+
+ public boolean getBoolean(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return false;
+ }
+ if (value instanceof Boolean) {
+ return ((Boolean)value).booleanValue();
+ }
+ if (value instanceof String) {
+ return Boolean.valueOf(value.toString()).booleanValue();
+ } else {
+ throw new OpenwireException(" cannot read a boolean from " + value.getClass().getName());
+ }
+ }
+
+ public byte getByte(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Byte) {
+ return ((Byte)value).byteValue();
+ }
+ if (value instanceof String) {
+ return Byte.valueOf(value.toString()).byteValue();
+ } else {
+ throw new OpenwireException(" cannot read a byte from " + value.getClass().getName());
+ }
+ }
+
+ public short getShort(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Short) {
+ return ((Short)value).shortValue();
+ }
+ if (value instanceof Byte) {
+ return ((Byte)value).shortValue();
+ }
+ if (value instanceof String) {
+ return Short.valueOf(value.toString()).shortValue();
+ } else {
+ throw new OpenwireException(" cannot read a short from " + value.getClass().getName());
+ }
+ }
+
+ public char getChar(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ if (value instanceof Character) {
+ return ((Character)value).charValue();
+ } else {
+ throw new OpenwireException(" cannot read a short from " + value.getClass().getName());
+ }
+ }
+
+ public int getInt(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Integer) {
+ return ((Integer)value).intValue();
+ }
+ if (value instanceof Short) {
+ return ((Short)value).intValue();
+ }
+ if (value instanceof Byte) {
+ return ((Byte)value).intValue();
+ }
+ if (value instanceof String) {
+ return Integer.valueOf(value.toString()).intValue();
+ } else {
+ throw new OpenwireException(" cannot read an int from " + value.getClass().getName());
+ }
+ }
+
+ public long getLong(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Long) {
+ return ((Long)value).longValue();
+ }
+ if (value instanceof Integer) {
+ return ((Integer)value).longValue();
+ }
+ if (value instanceof Short) {
+ return ((Short)value).longValue();
+ }
+ if (value instanceof Byte) {
+ return ((Byte)value).longValue();
+ }
+ if (value instanceof String) {
+ return Long.valueOf(value.toString()).longValue();
+ } else {
+ throw new OpenwireException(" cannot read a long from " + value.getClass().getName());
+ }
+ }
+
+ public float getFloat(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Float) {
+ return ((Float)value).floatValue();
+ }
+ if (value instanceof String) {
+ return Float.valueOf(value.toString()).floatValue();
+ } else {
+ throw new OpenwireException(" cannot read a float from " + value.getClass().getName());
+ }
+ }
+
+ public double getDouble(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return 0;
+ }
+ if (value instanceof Double) {
+ return ((Double)value).doubleValue();
+ }
+ if (value instanceof Float) {
+ return ((Float)value).floatValue();
+ }
+ if (value instanceof String) {
+ return Float.valueOf(value.toString()).floatValue();
+ } else {
+ throw new OpenwireException(" cannot read a double from " + value.getClass().getName());
+ }
+ }
+
+ public String getString(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof byte[]) {
+ throw new OpenwireException("Use getBytes to read a byte array");
+ } else {
+ return value.toString();
+ }
+ }
+
+ public byte[] getBytes(String name) throws OpenwireException {
+ initializeReading();
+ Object value = map.get(name);
+ if (value instanceof byte[]) {
+ return (byte[])value;
+ } else {
+ throw new OpenwireException(" cannot read a byte[] from " + value.getClass().getName());
+ }
+ }
+
+ public Object getObject(String name) throws OpenwireException {
+ initializeReading();
+ return map.get(name);
+ }
+
+ public Enumeration<String> getMapNames() throws OpenwireException {
+ initializeReading();
+ return Collections.enumeration(map.keySet());
+ }
+
+ protected void put(String name, Object value) throws OpenwireException {
+ if (name == null) {
+ throw new IllegalArgumentException("The name of the property cannot be null.");
+ }
+ if (name.length() == 0) {
+ throw new IllegalArgumentException("The name of the property cannot be an emprty string.");
+ }
+ map.put(name, value);
+ }
+
+ public void setBoolean(String name, boolean value) throws OpenwireException {
+ initializeWriting();
+ put(name, value ? Boolean.TRUE : Boolean.FALSE);
+ }
+
+ public void setByte(String name, byte value) throws OpenwireException {
+ initializeWriting();
+ put(name, Byte.valueOf(value));
+ }
+
+ public void setShort(String name, short value) throws OpenwireException {
+ initializeWriting();
+ put(name, Short.valueOf(value));
+ }
+
+ public void setChar(String name, char value) throws OpenwireException {
+ initializeWriting();
+ put(name, Character.valueOf(value));
+ }
+
+ public void setInt(String name, int value) throws OpenwireException {
+ initializeWriting();
+ put(name, Integer.valueOf(value));
+ }
+
+ public void setLong(String name, long value) throws OpenwireException {
+ initializeWriting();
+ put(name, Long.valueOf(value));
+ }
+
+ public void setFloat(String name, float value) throws OpenwireException {
+ initializeWriting();
+ put(name, new Float(value));
+ }
+
+ public void setDouble(String name, double value) throws OpenwireException {
+ initializeWriting();
+ put(name, new Double(value));
+ }
+
+ public void setString(String name, String value) throws OpenwireException {
+ initializeWriting();
+ put(name, value);
+ }
+
+ public void setBytes(String name, byte[] value) throws OpenwireException {
+ initializeWriting();
+ if (value != null) {
+ put(name, value);
+ } else {
+ map.remove(name);
+ }
+ }
+
+ public void setBytes(String name, byte[] value, int offset, int length) throws OpenwireException {
+ initializeWriting();
+ byte[] data = new byte[length];
+ System.arraycopy(value, offset, data, 0, length);
+ put(name, data);
+ }
+
+ public void setObject(String name, Object value) throws OpenwireException {
+ initializeWriting();
+ if (value != null) {
+ // byte[] not allowed on properties
+ if (!(value instanceof byte[])) {
+ checkValidObject(value);
+ }
+ put(name, value);
+ } else {
+ put(name, null);
+ }
+ }
+
+ public boolean itemExists(String name) throws OpenwireException {
+ initializeReading();
+ return map.containsKey(name);
+ }
+
+ private void initializeReading() throws OpenwireException {
+ loadContent();
+ }
+
+ private void initializeWriting() throws OpenwireException {
+ checkReadOnlyBody();
+ setContent(null);
+ }
+
+ public String toString() {
+ return super.toString() + " ActiveMQMapMessage{ " + "theTable = " + map + " }";
+ }
+
+ public Map<String, Object> getContentMap() throws OpenwireException {
+ initializeReading();
+ return map;
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,628 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.util.Callback;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.apache.activemq.apollo.openwire.support.TypeConversionSupport;
+import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.*;
+
+/**
+ * @version $Revision:$
+ * @openwire:marshaller code="23"
+ */
+public class ActiveMQMessage extends Message {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
+ private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
+
+ protected transient Callback acknowledgeCallback;
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+
+ public Message copy() {
+ ActiveMQMessage copy = new ActiveMQMessage();
+ copy(copy);
+ return copy;
+ }
+
+ protected void copy(ActiveMQMessage copy) {
+ super.copy(copy);
+ copy.acknowledgeCallback = acknowledgeCallback;
+ }
+
+ public int hashCode() {
+ MessageId id = getMessageId();
+ if (id != null) {
+ return id.hashCode();
+ } else {
+ return super.hashCode();
+ }
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+
+ ActiveMQMessage msg = (ActiveMQMessage) o;
+ MessageId oMsg = msg.getMessageId();
+ MessageId thisMsg = this.getMessageId();
+ return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
+ }
+
+ public void acknowledge() throws OpenwireException {
+ if (acknowledgeCallback != null) {
+ try {
+ acknowledgeCallback.execute();
+ } catch (OpenwireException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new OpenwireException(e);
+ }
+ }
+ }
+
+ public void clearBody() throws OpenwireException {
+ setContent(null);
+ readOnlyBody = false;
+ }
+
+ public String getJMSMessageID() {
+ MessageId messageId = this.getMessageId();
+ if (messageId == null) {
+ return null;
+ }
+ return messageId.toString();
+ }
+
+ /**
+ * Seems to be invalid because the parameter doesn't initialize MessageId
+ * instance variables ProducerId and ProducerSequenceId
+ *
+ * @param value
+ * @throws OpenwireException
+ */
+ public void setJMSMessageID(String value) throws OpenwireException {
+ if (value != null) {
+ try {
+ MessageId id = new MessageId(value);
+ this.setMessageId(id);
+ } catch (NumberFormatException e) {
+ // we must be some foreign JMS provider or strange user-supplied
+ // String
+ // so lets set the IDs to be 1
+ MessageId id = new MessageId();
+ id.setTextView(value);
+ this.setMessageId(messageId);
+ }
+ } else {
+ this.setMessageId(null);
+ }
+ }
+
+ /**
+ * This will create an object of MessageId. For it to be valid, the instance
+ * variable ProducerId and producerSequenceId must be initialized.
+ *
+ * @param producerId
+ * @param producerSequenceId
+ * @throws OpenwireException
+ */
+ public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws OpenwireException {
+ MessageId id = null;
+ try {
+ id = new MessageId(producerId, producerSequenceId);
+ this.setMessageId(id);
+ } catch (Throwable e) {
+ throw new OpenwireException("Invalid message id '" + id + "', reason: " + e.getMessage(), e);
+ }
+ }
+
+ public long getJMSTimestamp() {
+ return this.getTimestamp();
+ }
+
+ public void setJMSTimestamp(long timestamp) {
+ this.setTimestamp(timestamp);
+ }
+
+ public String getJMSCorrelationID() {
+ return this.getCorrelationId();
+ }
+
+ public void setJMSCorrelationID(String correlationId) {
+ this.setCorrelationId(correlationId);
+ }
+
+ public byte[] getJMSCorrelationIDAsBytes() throws OpenwireException {
+ return encodeString(this.getCorrelationId());
+ }
+
+ public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws OpenwireException {
+ this.setCorrelationId(decodeString(correlationId));
+ }
+
+ public String getJMSXMimeType() {
+ return "jms/message";
+ }
+
+ protected static String decodeString(byte[] data) throws OpenwireException {
+ try {
+ if (data == null) {
+ return null;
+ }
+ return new String(data, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new OpenwireException("Invalid UTF-8 encoding: " + e.getMessage());
+ }
+ }
+
+ protected static byte[] encodeString(String data) throws OpenwireException {
+ try {
+ if (data == null) {
+ return null;
+ }
+ return data.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new OpenwireException("Invalid UTF-8 encoding: " + e.getMessage());
+ }
+ }
+
+ public ActiveMQDestination getJMSReplyTo() {
+ return this.getReplyTo();
+ }
+
+ public void setJMSReplyTo(ActiveMQDestination destination) throws OpenwireException {
+ this.setReplyTo(destination);
+ }
+
+ public ActiveMQDestination getJMSDestination() {
+ return this.getDestination();
+ }
+
+ public void setJMSDestination(ActiveMQDestination destination) throws OpenwireException {
+ this.setDestination(destination);
+ }
+
+ public int getJMSDeliveryMode() {
+ return this.isPersistent() ? 2 : 1;
+ }
+
+ public void setJMSDeliveryMode(int mode) {
+ this.setPersistent(mode == 2);
+ }
+
+ public boolean getJMSRedelivered() {
+ return this.isRedelivered();
+ }
+
+ public void setJMSRedelivered(boolean redelivered) {
+ this.setRedelivered(redelivered);
+ }
+
+ public String getJMSType() {
+ return this.getType();
+ }
+
+ public void setJMSType(String type) {
+ this.setType(type);
+ }
+
+ public long getJMSExpiration() {
+ return this.getExpiration();
+ }
+
+ public void setJMSExpiration(long expiration) {
+ this.setExpiration(expiration);
+ }
+
+ public int getJMSPriority() {
+ return this.getPriority();
+ }
+
+ public void setJMSPriority(int priority) {
+ this.setPriority((byte) priority);
+ }
+
+ public void clearProperties() {
+ super.clearProperties();
+ readOnlyProperties = false;
+ }
+
+ public boolean propertyExists(String name) throws OpenwireException {
+ try {
+ return this.getProperties().containsKey(name);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ public Enumeration getPropertyNames() throws OpenwireException {
+ try {
+ return new Vector<String>(this.getProperties().keySet()).elements();
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+
+ interface PropertySetter {
+
+ void set(Message message, Object value) throws OpenwireException;
+ }
+
+ static {
+ JMS_PROPERTY_SETERS.put("JMSXDeliveryCount", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
+ }
+ message.setRedeliveryCounter(rc.intValue() - 1);
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSXGroupID", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ String rc = (String) TypeConversionSupport.convert(value, String.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
+ }
+ message.setGroupID(rc);
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
+ }
+ message.setGroupSequence(rc.intValue());
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSCorrelationID", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ String rc = (String) TypeConversionSupport.convert(value, String.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
+ }
+ ((ActiveMQMessage) message).setJMSCorrelationID(rc);
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+ if (rc == null) {
+ Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+ if (bool == null) {
+ throw new OpenwireException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
+ }
+ else {
+ rc = bool.booleanValue() ? 2 : 1;
+ }
+ }
+ ((ActiveMQMessage) message).setJMSDeliveryMode(rc);
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
+ }
+ ((ActiveMQMessage) message).setJMSExpiration(rc.longValue());
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
+ }
+ ((ActiveMQMessage) message).setJMSPriority(rc.intValue());
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
+ }
+ ((ActiveMQMessage) message).setJMSRedelivered(rc.booleanValue());
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ ActiveMQDestination rc = (ActiveMQDestination) TypeConversionSupport.convert(value, ActiveMQDestination.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSReplyTo cannot be set from a " + value.getClass().getName() + ".");
+ }
+ ((ActiveMQMessage) message).setReplyTo(rc);
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSTimestamp", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
+ }
+ ((ActiveMQMessage) message).setJMSTimestamp(rc.longValue());
+ }
+ });
+ JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
+ public void set(Message message, Object value) throws OpenwireException {
+ String rc = (String) TypeConversionSupport.convert(value, String.class);
+ if (rc == null) {
+ throw new OpenwireException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
+ }
+ ((ActiveMQMessage) message).setJMSType(rc);
+ }
+ });
+ }
+
+ public void setObjectProperty(String name, Object value) throws OpenwireException {
+ setObjectProperty(name, value, true);
+ }
+
+ public void setObjectProperty(String name, Object value, boolean checkReadOnly) throws OpenwireException {
+
+ if (checkReadOnly) {
+ checkReadOnlyProperties();
+ }
+ if (name == null || name.equals("")) {
+ throw new IllegalArgumentException("Property name cannot be empty or null");
+ }
+
+ checkValidObject(value);
+ PropertySetter setter = JMS_PROPERTY_SETERS.get(name);
+
+ if (setter != null && value != null) {
+ setter.set(this, value);
+ } else {
+ try {
+ this.setProperty(name, value);
+ } catch (IOException e) {
+ throw new OpenwireException(e);
+ }
+ }
+ }
+
+ public void setProperties(Map properties) throws OpenwireException {
+ for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
+ Map.Entry entry = (Map.Entry) iter.next();
+
+ // Lets use the object property method as we may contain standard
+ // extension headers like JMSXGroupID
+ setObjectProperty((String) entry.getKey(), entry.getValue());
+ }
+ }
+
+ protected void checkValidObject(Object value) throws OpenwireException {
+
+ boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long;
+ valid = valid || value instanceof Float || value instanceof Double || value instanceof Character || value instanceof String || value == null;
+
+ if (!valid) {
+
+ if (Settings.enable_nested_map_and_list()) {
+ if (!(value instanceof Map || value instanceof List)) {
+ throw new OpenwireException("Only objectified primitive objects, String, Map and List types are allowed but was: " + value + " type: " + value.getClass());
+ }
+ } else {
+ throw new OpenwireException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+ }
+ }
+ }
+
+ public Object getObjectProperty(String name) throws OpenwireException {
+ if (name == null) {
+ throw new NullPointerException("Property name cannot be null");
+ }
+// try {
+ return createFilterable().getProperty(name);
+// } catch (FilterException e) {
+// throw new JMSException(e);
+// }
+ }
+
+ public boolean getBooleanProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ return false;
+ }
+ Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
+ }
+ return rc.booleanValue();
+ }
+
+ public byte getByteProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ throw new NumberFormatException("property " + name + " was null");
+ }
+ Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
+ }
+ return rc.byteValue();
+ }
+
+ public short getShortProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ throw new NumberFormatException("property " + name + " was null");
+ }
+ Short rc = (Short) TypeConversionSupport.convert(value, Short.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
+ }
+ return rc.shortValue();
+ }
+
+ public int getIntProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ throw new NumberFormatException("property " + name + " was null");
+ }
+ Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
+ }
+ return rc.intValue();
+ }
+
+ public long getLongProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ throw new NumberFormatException("property " + name + " was null");
+ }
+ Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
+ }
+ return rc.longValue();
+ }
+
+ public float getFloatProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Float rc = (Float) TypeConversionSupport.convert(value, Float.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
+ }
+ return rc.floatValue();
+ }
+
+ public double getDoubleProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ throw new NullPointerException("property " + name + " was null");
+ }
+ Double rc = (Double) TypeConversionSupport.convert(value, Double.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
+ }
+ return rc.doubleValue();
+ }
+
+ public String getStringProperty(String name) throws OpenwireException {
+ Object value = getObjectProperty(name);
+ if (value == null) {
+ if (name.equals("JMSXUserID")) {
+ value = getUserID();
+ }
+ }
+ if (value == null) {
+ return null;
+ }
+ String rc = (String) TypeConversionSupport.convert(value, String.class);
+ if (rc == null) {
+ throw new OpenwireException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String");
+ }
+ return rc;
+ }
+
+ public void setBooleanProperty(String name, boolean value) throws OpenwireException {
+ setBooleanProperty(name, value, true);
+ }
+
+ public void setBooleanProperty(String name, boolean value, boolean checkReadOnly) throws OpenwireException {
+ setObjectProperty(name, Boolean.valueOf(value), checkReadOnly);
+ }
+
+ public void setByteProperty(String name, byte value) throws OpenwireException {
+ setObjectProperty(name, Byte.valueOf(value));
+ }
+
+ public void setShortProperty(String name, short value) throws OpenwireException {
+ setObjectProperty(name, Short.valueOf(value));
+ }
+
+ public void setIntProperty(String name, int value) throws OpenwireException {
+ setObjectProperty(name, Integer.valueOf(value));
+ }
+
+ public void setLongProperty(String name, long value) throws OpenwireException {
+ setObjectProperty(name, Long.valueOf(value));
+ }
+
+ public void setFloatProperty(String name, float value) throws OpenwireException {
+ setObjectProperty(name, new Float(value));
+ }
+
+ public void setDoubleProperty(String name, double value) throws OpenwireException {
+ setObjectProperty(name, new Double(value));
+ }
+
+ public void setStringProperty(String name, String value) throws OpenwireException {
+ setObjectProperty(name, value);
+ }
+
+ private void checkReadOnlyProperties() throws OpenwireException {
+ if (readOnlyProperties) {
+ throw new OpenwireException("Message properties are read-only");
+ }
+ }
+
+ protected void checkReadOnlyBody() throws OpenwireException {
+ if (readOnlyBody) {
+ throw new OpenwireException("Message body is read-only");
+ }
+ }
+
+ public boolean isExpired() {
+ long expireTime = this.getExpiration();
+ if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
+ return true;
+ }
+ return false;
+ }
+
+ public Callback getAcknowledgeCallback() {
+ return acknowledgeCallback;
+ }
+
+ public void setAcknowledgeCallback(Callback acknowledgeCallback) {
+ this.acknowledgeCallback = acknowledgeCallback;
+ }
+
+ /**
+ * Send operation event listener. Used to get the message ready to be sent.
+ */
+ public void onSend() throws OpenwireException {
+ setReadOnlyBody(true);
+ setReadOnlyProperties(true);
+ }
+
+ public Response visit(CommandVisitor visitor) throws Exception {
+ return visitor.processMessage(this);
+ }
+}
Propchange: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQObjectMessage.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire.command;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.activemq.apollo.util.ClassLoadingAwareObjectInputStream;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.apache.activemq.apollo.openwire.support.Settings;
+import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.ByteArrayInputStream;
+import org.fusesource.hawtbuf.ByteArrayOutputStream;
+
+/**
+ */
+public class ActiveMQObjectMessage extends ActiveMQMessage {
+
+ // TODO: verify classloader
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
+ static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
+
+ protected transient Serializable object;
+
+ public Message copy() {
+ ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(ActiveMQObjectMessage copy) {
+ storeContent();
+ super.copy(copy);
+ copy.object = null;
+ }
+
+ public void storeContent() {
+ Buffer bodyAsBytes = getContent();
+ if (bodyAsBytes == null && object != null) {
+ try {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ OutputStream os = bytesOut;
+ if (Settings.enable_compression()) {
+ compressed = true;
+ os = new DeflaterOutputStream(os);
+ }
+ DataOutputStream dataOut = new DataOutputStream(os);
+ ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
+ objOut.writeObject(object);
+ objOut.flush();
+ objOut.reset();
+ objOut.close();
+ setContent(bytesOut.toBuffer());
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe.getMessage(), ioe);
+ }
+ }
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public String getJMSXMimeType() {
+ return "jms/object-message";
+ }
+
+ /**
+ */
+ public void clearBody() throws OpenwireException {
+ super.clearBody();
+ this.object = null;
+ }
+
+ public void setObject(Serializable newObject) throws OpenwireException {
+ checkReadOnlyBody();
+ this.object = newObject;
+ setContent(null);
+ storeContent();
+ }
+
+ /**
+ * Gets the serializable object containing this message's data. The default
+ * value is null.
+ *
+ * @return the serializable object containing this message's data
+ * @throws OpenwireException
+ */
+ public Serializable getObject() throws OpenwireException {
+ if (object == null && getContent() != null) {
+ try {
+ Buffer content = getContent();
+ InputStream is = new ByteArrayInputStream(content);
+ if (isCompressed()) {
+ is = new InflaterInputStream(is);
+ }
+ DataInputStream dataIn = new DataInputStream(is);
+ ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
+ try {
+ object = (Serializable)objIn.readObject();
+ } catch (ClassNotFoundException ce) {
+ throw new OpenwireException("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
+ } finally {
+ dataIn.close();
+ }
+ } catch (IOException e) {
+ throw new OpenwireException("Failed to build body from bytes. Reason: " + e, e);
+ }
+ }
+ return this.object;
+ }
+
+ public void onMessageRolledBack() {
+ super.onMessageRolledBack();
+
+ // lets force the object to be deserialized again - as we could have
+ // changed the object
+ object = null;
+ }
+
+ public String toString() {
+ try {
+ getObject();
+ } catch (OpenwireException e) {
+ }
+ return super.toString();
+ }
+}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java?rev=1097189&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java Wed Apr 27 17:32:51 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.command;
+
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.openwire.support.OpenwireException;
+import org.fusesource.hawtbuf.AsciiBuffer;
+
+/**
+ *
+ * @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
+ * Destination"
+ *
+ * @openwire:marshaller code="100"
+ * @version $Revision: 1.5 $
+ */
+public class ActiveMQQueue extends ActiveMQDestination {
+
+ public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_QUEUE;
+ private static final long serialVersionUID = -3885260014960795889L;
+
+ public ActiveMQQueue() {
+ }
+
+ public ActiveMQQueue(String name) {
+ super(name);
+ }
+
+ public byte getDataStructureType() {
+ return DATA_STRUCTURE_TYPE;
+ }
+
+ public boolean isQueue() {
+ return true;
+ }
+
+ public String getQueueName() throws OpenwireException {
+ return getPhysicalName();
+ }
+
+ public byte getDestinationType() {
+ return QUEUE_TYPE;
+ }
+
+ protected String getQualifiedPrefix() {
+ return QUEUE_QUALIFIED_PREFIX;
+ }
+
+}
|