activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [36/51] [partial] activemq-6 git commit: ACTIVEMQ6-2 Update to HQ master
Date Tue, 11 Nov 2014 11:01:06 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java
deleted file mode 100644
index c6b60f3..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java
+++ /dev/null
@@ -1,633 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton;
-
-import java.nio.ByteBuffer;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedByte;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.Data;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Footer;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.CompositeWritableBuffer;
-import org.apache.qpid.proton.codec.DroppingWritableBuffer;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.jms.EncodedMessage;
-import org.apache.qpid.proton.message.Message;
-import org.apache.qpid.proton.message.MessageFormat;
-import org.apache.qpid.proton.message.impl.MessageImpl;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.utils.TypedProperties;
-
-import static org.hornetq.api.core.Message.TEXT_TYPE;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         4/11/13
- */
-public class ProtonUtils
-{
-   private static final String PREFIX = "HORNETQ_PROTON_";
-   private static final String MESSAGE_ANNOTATIONS = PREFIX + "MESSAGE_ANNOTATIONS_";
-   private static final String DELIVERY_ANNOTATIONS = PREFIX + "DELIVERY_ANNOTATIONS_";
-   private static final String FOOTER_VALUES = PREFIX + "FOOTER_VALUES";
-   private static final String MESSAGE_FORMAT = PREFIX + "MESSAGE_FORMAT";
-   private static final String PROTON_MESSAGE_FORMAT = PREFIX + "FORMAT";
-   private static final String PROTON_MESSAGE_SIZE = "PROTON_MESSAGE_SIZE";
-   private static final String MESSAGE_TYPE = PREFIX + "MESSAGE_TYPE";
-   private static final String FIRST_ACQUIRER = PREFIX + "FIRST_ACQUIRER";
-   private static final String USER_ID = PREFIX + "USER_ID";
-   private static final String SUBJECT = PREFIX + "SUBJECT";
-   private static final String REPLY_TO = PREFIX + "REPLY_TO";
-   private static final String CORRELATION_ID = PREFIX + "CORRELATION_ID";
-   private static final String CONTENT_TYPE = PREFIX + "CONTENT_TYPE";
-   private static final String CONTENT_ENCODING = PREFIX + "CONTENT_TYPE";
-   private static final String ABSOLUTE_EXPIRY_TIME = PREFIX + "ABSOLUTE_EXPIRY_TIME";
-   private static final String CREATION_TIME = PREFIX + "CREATION_TIME";
-   private static final String GROUP_ID = PREFIX + "GROUP_ID";
-   private static final String GROUP_SEQUENCE = PREFIX + "GROUP_SEQUENCE";
-   private static final String REPLY_TO_GROUP_ID = PREFIX + "REPLY_TO_GROUP_ID";
-
-   private static final SimpleString USER_ID_SS = new SimpleString(USER_ID);
-   private static final SimpleString SUBJECT_SS = new SimpleString(SUBJECT);
-   private static final SimpleString REPLY_TO_SS = new SimpleString(REPLY_TO);
-   private static final SimpleString CORRELATION_ID_SS = new SimpleString(CORRELATION_ID);
-   private static final SimpleString CONTENT_TYPE_SS = new SimpleString(CONTENT_TYPE);
-   private static final SimpleString CONTENT_ENCODING_SS = new SimpleString(CONTENT_ENCODING);
-   private static final SimpleString ABSOLUTE_EXPIRY_TIME_SS = new SimpleString(ABSOLUTE_EXPIRY_TIME);
-   private static final SimpleString CREATION_TIME_SS = new SimpleString(CREATION_TIME);
-   private static final SimpleString GROUP_ID_SS = new SimpleString(GROUP_ID);
-   private static final SimpleString GROUP_SEQUENCE_SS = new SimpleString(GROUP_SEQUENCE);
-   private static final SimpleString REPLY_TO_GROUP_ID_SS = new SimpleString(REPLY_TO_GROUP_ID);
-   private static final SimpleString PROTON_MESSAGE_SIZE_SS = new SimpleString(PROTON_MESSAGE_SIZE);
-
-   private static Set<String> SPECIAL_PROPS = new HashSet<String>();
-
-   static
-   {
-      SPECIAL_PROPS.add(MESSAGE_FORMAT);
-      SPECIAL_PROPS.add(MESSAGE_TYPE);
-      SPECIAL_PROPS.add(FIRST_ACQUIRER);
-      SPECIAL_PROPS.add(PROTON_MESSAGE_FORMAT);
-      SPECIAL_PROPS.add(PROTON_MESSAGE_SIZE);
-      SPECIAL_PROPS.add(MESSAGE_TYPE);
-      SPECIAL_PROPS.add(USER_ID);
-      SPECIAL_PROPS.add(SUBJECT);
-      SPECIAL_PROPS.add(REPLY_TO);
-      SPECIAL_PROPS.add(CORRELATION_ID);
-      SPECIAL_PROPS.add(CONTENT_TYPE);
-      SPECIAL_PROPS.add(CONTENT_ENCODING);
-      SPECIAL_PROPS.add(ABSOLUTE_EXPIRY_TIME);
-      SPECIAL_PROPS.add(CREATION_TIME);
-      SPECIAL_PROPS.add(GROUP_ID);
-      SPECIAL_PROPS.add(GROUP_SEQUENCE);
-      SPECIAL_PROPS.add(REPLY_TO_GROUP_ID);
-   }
-
-   public static class INBOUND
-   {
-      public static ServerMessageImpl transform(ProtonRemotingConnection connection, EncodedMessage encodedMessage) throws Exception
-      {
-         org.apache.qpid.proton.message.Message protonMessage = encodedMessage.decode();
-
-         Header header = protonMessage.getHeader();
-         if (header == null)
-         {
-            header = new Header();
-         }
-
-         ServerMessageImpl message = connection.createServerMessage();
-         TypedProperties properties = message.getProperties();
-
-         properties.putLongProperty(new SimpleString(MESSAGE_FORMAT), encodedMessage.getMessageFormat());
-         properties.putLongProperty(new SimpleString(PROTON_MESSAGE_FORMAT), getMessageFormat(protonMessage.getMessageFormat()));
-         properties.putIntProperty(new SimpleString(PROTON_MESSAGE_SIZE), encodedMessage.getLength());
-
-         populateSpecialProps(header, protonMessage, message, properties);
-         populateHeaderProperties(header, properties, message);
-         populateDeliveryAnnotations(protonMessage.getDeliveryAnnotations(), properties);
-         populateMessageAnnotations(protonMessage.getMessageAnnotations(), properties);
-         populateApplicationProperties(protonMessage.getApplicationProperties(), properties);
-         populateProperties(protonMessage.getProperties(), properties, message);
-         populateFooterProperties(protonMessage.getFooter(), properties);
-         message.setTimestamp(System.currentTimeMillis());
-
-         Section section = protonMessage.getBody();
-         if (section instanceof AmqpValue)
-         {
-            AmqpValue amqpValue = (AmqpValue) section;
-            Object value = amqpValue.getValue();
-            if (value instanceof String)
-            {
-               message.getBodyBuffer().writeNullableString((String) value);
-            }
-            else if (value instanceof Binary)
-            {
-               Binary binary = (Binary) value;
-               message.getBodyBuffer().writeBytes(binary.getArray());
-            }
-         }
-         else if (section instanceof Data)
-         {
-            message.getBodyBuffer().writeBytes(((Data) section).getValue().getArray());
-         }
-
-         return message;
-      }
-
-      private static void populateSpecialProps(Header header, Message protonMessage, ServerMessageImpl message, TypedProperties properties)
-      {
-         if (header.getFirstAcquirer() != null)
-         {
-            properties.putBooleanProperty(new SimpleString(FIRST_ACQUIRER), header.getFirstAcquirer());
-         }
-         properties.putIntProperty(new SimpleString(MESSAGE_TYPE), getMessageType(protonMessage));
-      }
-
-      private static void populateHeaderProperties(Header header, TypedProperties properties, ServerMessageImpl message)
-      {
-         if (header.getDurable() != null)
-         {
-            message.setDurable(header.getDurable());
-         }
-
-         if (header.getPriority() != null)
-         {
-            message.setPriority((byte) header.getPriority().intValue());
-         }
-
-         if (header.getTtl() != null)
-         {
-            message.setExpiration(header.getTtl().longValue());
-         }
-      }
-
-      private static void populateDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations, TypedProperties properties)
-      {
-         if (deliveryAnnotations != null)
-         {
-            Map values = deliveryAnnotations.getValue();
-            Set keySet = values.keySet();
-            for (Object key : keySet)
-            {
-               Symbol symbol = (Symbol) key;
-               Object value = values.get(key);
-               properties.putSimpleStringProperty(new SimpleString(DELIVERY_ANNOTATIONS + symbol.toString()), new SimpleString(value.toString()));
-            }
-         }
-      }
-
-      private static void populateFooterProperties(Footer footer, TypedProperties properties)
-      {
-         if (footer != null)
-         {
-            Map values = footer.getValue();
-            Set keySet = values.keySet();
-            for (Object key : keySet)
-            {
-               Symbol symbol = (Symbol) key;
-               Object value = values.get(key);
-               properties.putSimpleStringProperty(new SimpleString(FOOTER_VALUES + symbol.toString()), new SimpleString(value.toString()));
-            }
-         }
-      }
-
-      private static void populateProperties(Properties amqpProperties, TypedProperties properties, ServerMessageImpl message)
-      {
-         if (amqpProperties == null)
-         {
-            return;
-         }
-         if (amqpProperties.getTo() != null)
-         {
-            message.setAddress(new SimpleString(amqpProperties.getTo()));
-         }
-         if (amqpProperties.getUserId() != null)
-         {
-            properties.putBytesProperty(USER_ID_SS, amqpProperties.getUserId().getArray());
-         }
-         if (amqpProperties.getSubject() != null)
-         {
-            properties.putSimpleStringProperty(SUBJECT_SS, new SimpleString(amqpProperties.getSubject()));
-         }
-         if (amqpProperties.getReplyTo() != null)
-         {
-            properties.putSimpleStringProperty(REPLY_TO_SS, new SimpleString(amqpProperties.getReplyTo()));
-         }
-         if (amqpProperties.getCorrelationId() != null)
-         {
-            properties.putSimpleStringProperty(CORRELATION_ID_SS, new SimpleString(amqpProperties.getCorrelationId().toString()));
-         }
-         if (amqpProperties.getContentType() != null)
-         {
-            properties.putSimpleStringProperty(CONTENT_TYPE_SS, new SimpleString(amqpProperties.getContentType().toString()));
-         }
-         if (amqpProperties.getContentEncoding() != null)
-         {
-            properties.putSimpleStringProperty(CONTENT_ENCODING_SS, new SimpleString(amqpProperties.getContentEncoding().toString()));
-         }
-         if (amqpProperties.getAbsoluteExpiryTime() != null)
-         {
-            properties.putLongProperty(ABSOLUTE_EXPIRY_TIME_SS, amqpProperties.getAbsoluteExpiryTime().getTime());
-         }
-         if (amqpProperties.getCreationTime() != null)
-         {
-            properties.putLongProperty(CREATION_TIME_SS, amqpProperties.getCreationTime().getTime());
-         }
-         if (amqpProperties.getGroupId() != null)
-         {
-            properties.putSimpleStringProperty(GROUP_ID_SS, new SimpleString(amqpProperties.getGroupId()));
-         }
-         if (amqpProperties.getGroupSequence() != null)
-         {
-            properties.putIntProperty(GROUP_SEQUENCE_SS, amqpProperties.getGroupSequence().intValue());
-         }
-         if (amqpProperties.getReplyToGroupId() != null)
-         {
-            message.getProperties().putSimpleStringProperty(REPLY_TO_GROUP_ID_SS, new SimpleString(amqpProperties.getReplyToGroupId()));
-         }
-      }
-
-      private static void populateApplicationProperties(ApplicationProperties applicationProperties, TypedProperties properties)
-      {
-         if (applicationProperties != null)
-         {
-            Map props = applicationProperties.getValue();
-            for (Object key : props.keySet())
-            {
-               Object val = props.get(key);
-               setProperty(key, val, properties);
-            }
-         }
-      }
-
-      private static void setProperty(Object key, Object val, TypedProperties properties)
-      {
-         if (val instanceof String)
-         {
-            properties.putSimpleStringProperty(new SimpleString((String) key), new SimpleString((String) val));
-         }
-         else if (val instanceof Boolean)
-         {
-            properties.putBooleanProperty(new SimpleString((String) key), (Boolean) val);
-         }
-         else if (val instanceof Double)
-         {
-            properties.putDoubleProperty(new SimpleString((String) key), (Double) val);
-         }
-         else if (val instanceof Float)
-         {
-            properties.putFloatProperty(new SimpleString((String) key), (Float) val);
-         }
-         else if (val instanceof Integer)
-         {
-            properties.putIntProperty(new SimpleString((String) key), (Integer) val);
-         }
-         else if (val instanceof Byte)
-         {
-            properties.putByteProperty(new SimpleString((String) key), (Byte) val);
-         }
-      }
-
-      public static void populateMessageAnnotations(MessageAnnotations messageAnnotations, TypedProperties properties)
-      {
-         if (messageAnnotations != null)
-         {
-            Map values = messageAnnotations.getValue();
-            Set keySet = values.keySet();
-            for (Object key : keySet)
-            {
-               Symbol symbol = (Symbol) key;
-               Object value = values.get(key);
-               properties.putSimpleStringProperty(new SimpleString(MESSAGE_ANNOTATIONS + symbol.toString()), new SimpleString(value.toString()));
-            }
-         }
-      }
-
-   }
-
-   public static class OUTBOUND
-   {
-      public static EncodedMessage transform(ServerMessage message, int deliveryCount)
-      {
-         long messageFormat = message.getLongProperty(MESSAGE_FORMAT);
-         Integer size = message.getIntProperty(PROTON_MESSAGE_SIZE_SS);
-
-         Header header = populateHeader(message, deliveryCount);
-         DeliveryAnnotations deliveryAnnotations = populateDeliveryAnnotations(message);
-         MessageAnnotations messageAnnotations = populateMessageAnnotations(message);
-         Properties props = populateProperties(message);
-         ApplicationProperties applicationProperties = populateApplicationProperties(message);
-         Section section = populateBody(message);
-         Footer footer = populateFooter(message);
-         Set<SimpleString> propertyNames = message.getPropertyNames();
-         for (SimpleString propertyName : propertyNames)
-         {
-            TypedProperties typedProperties = message.getTypedProperties();
-            String realName = propertyName.toString();
-            if (realName.startsWith(MESSAGE_ANNOTATIONS))
-            {
-
-               SimpleString value = (SimpleString) typedProperties.getProperty(propertyName);
-               Symbol symbol = Symbol.getSymbol(realName.replace(MESSAGE_ANNOTATIONS, ""));
-               messageAnnotations.getValue().put(symbol, value.toString());
-            }
-         }
-         MessageImpl protonMessage = new MessageImpl(header, deliveryAnnotations, messageAnnotations, props, applicationProperties, section, footer);
-         protonMessage.setMessageFormat(getMessageFormat(message.getLongProperty(new SimpleString(PROTON_MESSAGE_FORMAT))));
-         ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
-         final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
-         int c = protonMessage.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-         if (overflow.position() > 0)
-         {
-            buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
-            c = protonMessage.encode(new WritableBuffer.ByteBufferWrapper(buffer));
-         }
-
-         return new EncodedMessage(messageFormat, buffer.array(), 0, c);
-      }
-
-      private static Header populateHeader(ServerMessage message, int deliveryCount)
-      {
-         Header header = new Header();
-         header.setDurable(message.isDurable());
-         header.setPriority(new UnsignedByte(message.getPriority()));
-         header.setDeliveryCount(new UnsignedInteger(deliveryCount));
-         header.setTtl(new UnsignedInteger((int) message.getExpiration()));
-         return header;
-      }
-
-      private static DeliveryAnnotations populateDeliveryAnnotations(ServerMessage message)
-      {
-         HashMap actualValues = new HashMap();
-         DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(actualValues);
-         for (SimpleString name : message.getPropertyNames())
-         {
-            String sName = name.toString();
-            if (sName.startsWith(DELIVERY_ANNOTATIONS))
-            {
-               Object val = message.getTypedProperties().getProperty(name);
-               if (val instanceof SimpleString)
-               {
-                  actualValues.put(sName.subSequence(sName.indexOf(DELIVERY_ANNOTATIONS), sName.length()), val.toString());
-               }
-               else
-               {
-                  actualValues.put(sName.subSequence(sName.indexOf(DELIVERY_ANNOTATIONS), sName.length()), val);
-               }
-            }
-         }
-         //this is a proton jms thing, if not null it creates wrong type of message
-         return actualValues.size() > 0 ? deliveryAnnotations : null;
-      }
-
-      private static MessageAnnotations populateMessageAnnotations(ServerMessage message)
-      {
-         HashMap actualValues = new HashMap();
-         MessageAnnotations messageAnnotations = new MessageAnnotations(actualValues);
-         for (SimpleString name : message.getPropertyNames())
-         {
-            String sName = name.toString();
-            if (sName.startsWith(MESSAGE_ANNOTATIONS))
-            {
-               Object val = message.getTypedProperties().getProperty(name);
-               if (val instanceof SimpleString)
-               {
-                  actualValues.put(sName.subSequence(sName.indexOf(MESSAGE_ANNOTATIONS), sName.length()), val.toString());
-               }
-               else
-               {
-                  actualValues.put(sName.subSequence(sName.indexOf(MESSAGE_ANNOTATIONS), sName.length()), val);
-               }
-            }
-         }
-         return messageAnnotations;
-      }
-
-      private static Properties populateProperties(ServerMessage message)
-      {
-         Calendar calendar = Calendar.getInstance();
-         Properties properties = new Properties();
-         TypedProperties typedProperties = message.getTypedProperties();
-         properties.setMessageId(message.getMessageID());
-         if (message.getAddress() != null)
-         {
-            properties.setTo(message.getAddress().toString());
-         }
-         if (typedProperties.containsProperty(USER_ID_SS))
-         {
-            properties.setUserId(new Binary(typedProperties.getBytesProperty(USER_ID_SS)));
-         }
-         if (typedProperties.containsProperty(SUBJECT_SS))
-         {
-            properties.setSubject(typedProperties.getSimpleStringProperty(SUBJECT_SS).toString());
-         }
-         if (typedProperties.containsProperty(REPLY_TO_SS))
-         {
-            properties.setReplyTo(typedProperties.getSimpleStringProperty(REPLY_TO_SS).toString());
-         }
-         if (typedProperties.containsProperty(CORRELATION_ID_SS))
-         {
-            properties.setCorrelationId(typedProperties.getSimpleStringProperty(CORRELATION_ID_SS).toString());
-         }
-         if (typedProperties.containsProperty(CONTENT_TYPE_SS))
-         {
-            properties.setContentType(Symbol.getSymbol(typedProperties.getSimpleStringProperty(CONTENT_TYPE_SS).toString()));
-         }
-         if (typedProperties.containsProperty(CONTENT_ENCODING_SS))
-         {
-            properties.setContentEncoding(Symbol.getSymbol(typedProperties.getSimpleStringProperty(CONTENT_ENCODING_SS).toString()));
-         }
-         if (typedProperties.containsProperty(ABSOLUTE_EXPIRY_TIME_SS))
-         {
-            calendar.setTimeInMillis(typedProperties.getLongProperty(ABSOLUTE_EXPIRY_TIME_SS));
-            properties.setAbsoluteExpiryTime(calendar.getTime());
-         }
-         if (typedProperties.containsProperty(CREATION_TIME_SS))
-         {
-            calendar.setTimeInMillis(typedProperties.getLongProperty(CREATION_TIME_SS));
-            properties.setCreationTime(calendar.getTime());
-         }
-         if (typedProperties.containsProperty(GROUP_ID_SS))
-         {
-            properties.setGroupId(typedProperties.getSimpleStringProperty(GROUP_ID_SS).toString());
-         }
-         if (typedProperties.containsProperty(GROUP_SEQUENCE_SS))
-         {
-            properties.setGroupSequence(new UnsignedInteger(typedProperties.getIntProperty(GROUP_SEQUENCE_SS)));
-         }
-         if (typedProperties.containsProperty(REPLY_TO_GROUP_ID_SS))
-         {
-            properties.setReplyToGroupId(typedProperties.getSimpleStringProperty(REPLY_TO_GROUP_ID_SS).toString());
-         }
-         return properties;
-      }
-
-      private static ApplicationProperties populateApplicationProperties(ServerMessage message)
-      {
-         HashMap<String, Object> values = new HashMap<String, Object>();
-         for (SimpleString name : message.getPropertyNames())
-         {
-            setProperty(name, message.getTypedProperties().getProperty(name), values);
-         }
-         return new ApplicationProperties(values);
-      }
-
-      private static void setProperty(SimpleString name, Object property, HashMap<String, Object> values)
-      {
-         String s = name.toString();
-         if (SPECIAL_PROPS.contains(s) ||
-            s.startsWith(MESSAGE_ANNOTATIONS) ||
-            s.startsWith(DELIVERY_ANNOTATIONS) ||
-            s.startsWith(FOOTER_VALUES))
-         {
-            return;
-         }
-         if (property instanceof SimpleString)
-         {
-            values.put(s, property.toString());
-         }
-         else
-         {
-            values.put(s, property);
-         }
-      }
-
-      private static Footer populateFooter(ServerMessage message)
-      {
-         HashMap actualValues = new HashMap();
-         Footer footer = new Footer(actualValues);
-         for (SimpleString name : message.getPropertyNames())
-         {
-            String sName = name.toString();
-            if (sName.startsWith(FOOTER_VALUES))
-            {
-               Object val = message.getTypedProperties().getProperty(name);
-               if (val instanceof SimpleString)
-               {
-                  actualValues.put(sName.subSequence(sName.indexOf(FOOTER_VALUES), sName.length()), val.toString());
-               }
-               else
-               {
-                  actualValues.put(sName.subSequence(sName.indexOf(FOOTER_VALUES), sName.length()), val);
-               }
-            }
-         }
-         return footer;
-      }
-
-      private static Section populateBody(ServerMessage message)
-      {
-         // TODO: Depend on array() is most likely not a very good idea
-         Integer type = message.getIntProperty(MESSAGE_TYPE);
-         switch (type)
-         {
-            case 0:
-            case 1:
-               return new Data(new Binary(message.getBodyBuffer().copy().byteBuf().array()));
-            case 2:
-               return new AmqpValue(new Binary(message.getBodyBuffer().copy().byteBuf().array()));
-            case 3:
-               return new AmqpValue(message.getBodyBuffer().copy().readNullableString());
-            default:
-               return new Data(new Binary(message.getBodyBuffer().copy().byteBuf().array()));
-         }
-      }
-   }
-
-   private static long getMessageFormat(MessageFormat messageFormat)
-   {
-      switch (messageFormat)
-      {
-         case AMQP:
-            return 0;
-         case DATA:
-            return 1;
-         case JSON:
-            return 2;
-         case TEXT:
-            return 3;
-         default:
-            return 0;
-
-      }
-   }
-
-   private static MessageFormat getMessageFormat(long messageFormat)
-   {
-      switch ((int) messageFormat)
-      {
-         case 0:
-            return MessageFormat.AMQP;
-         case 1:
-            return MessageFormat.DATA;
-         case 2:
-            return MessageFormat.JSON;
-         case 3:
-            return MessageFormat.TEXT;
-         default:
-            return MessageFormat.AMQP;
-
-      }
-   }
-
-   private static int getMessageType(Message protonMessage)
-   {
-      Section section = protonMessage.getBody();
-      if (section instanceof AmqpValue)
-      {
-         AmqpValue amqpValue = (AmqpValue) section;
-         Object value = amqpValue.getValue();
-         if (value instanceof String)
-         {
-            return TEXT_TYPE;
-         }
-         else if (value instanceof byte[])
-         {
-            return org.hornetq.api.core.Message.BYTES_TYPE;
-         }
-         else if (value instanceof Map)
-         {
-            return org.hornetq.api.core.Message.MAP_TYPE;
-         }
-         else if (value instanceof Object)
-         {
-            return org.hornetq.api.core.Message.OBJECT_TYPE;
-         }
-         else
-         {
-            return org.hornetq.api.core.Message.DEFAULT_TYPE;
-         }
-      }
-      else
-      {
-         return org.hornetq.api.core.Message.DEFAULT_TYPE;
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java
deleted file mode 100644
index 32d5a7a..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.proton;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.transaction.Coordinator;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Receiver;
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException;
-
-/**
- * handles an amqp Coordinator to deal with transaction boundaries etc
- */
-public class TransactionHandler implements ProtonDeliveryHandler
-{
-   private final ProtonRemotingConnection connection;
-   private final Coordinator coordinator;
-   private final ProtonProtocolManager protonProtocolManager;
-   private final ProtonSession protonSession;
-   private final HornetQBuffer buffer;
-
-   public TransactionHandler(ProtonRemotingConnection connection, Coordinator coordinator, ProtonProtocolManager protonProtocolManager, ProtonSession protonSession)
-   {
-      this.connection = connection;
-      this.coordinator = coordinator;
-      this.protonProtocolManager = protonProtocolManager;
-      this.protonSession = protonSession;
-      buffer = connection.createBuffer(1024);
-   }
-
-   @Override
-   public void onMessage(Delivery delivery) throws HornetQAMQPException
-   {
-      Receiver receiver = null;
-      try
-      {
-         receiver = ((Receiver) delivery.getLink());
-
-         if (!delivery.isReadable())
-         {
-            return;
-         }
-
-         protonProtocolManager.handleTransaction(receiver, buffer, delivery, protonSession);
-
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         Rejected rejected = new Rejected();
-         ErrorCondition condition = new ErrorCondition();
-         condition.setCondition(Symbol.valueOf("failed"));
-         condition.setDescription(e.getMessage());
-         rejected.setError(condition);
-         delivery.disposition(rejected);
-      }
-   }
-
-   @Override
-   public void checkState()
-   {
-      //noop
-   }
-
-   @Override
-   public void close() throws HornetQAMQPException
-   {
-      //noop
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java
new file mode 100644
index 0000000..3126b28
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.proton.converter;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.proton.jms.JMSVendor;
+import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.hornetq.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
+import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMapMessage;
+import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMessage;
+import org.hornetq.core.protocol.proton.converter.jms.ServerJMSStreamMessage;
+import org.hornetq.core.protocol.proton.converter.jms.ServerJMSTextMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.IDGenerator;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQJMSVendor extends JMSVendor
+{
+
+   private final IDGenerator serverGenerator;
+
+   HornetQJMSVendor(IDGenerator idGenerator)
+   {
+      this.serverGenerator = idGenerator;
+   }
+
+   @Override
+   public BytesMessage createBytesMessage()
+   {
+      return new ServerJMSBytesMessage(newMessage(org.hornetq.api.core.Message.BYTES_TYPE), 0);
+   }
+
+   @Override
+   public StreamMessage createStreamMessage()
+   {
+      return new ServerJMSStreamMessage(newMessage(org.hornetq.api.core.Message.STREAM_TYPE), 0);
+   }
+
+   @Override
+   public Message createMessage()
+   {
+      return new ServerJMSMessage(newMessage(org.hornetq.api.core.Message.DEFAULT_TYPE), 0 );
+   }
+
+   @Override
+   public TextMessage createTextMessage()
+   {
+      return new ServerJMSTextMessage(newMessage(org.hornetq.api.core.Message.TEXT_TYPE), 0);
+   }
+
+   @Override
+   public ObjectMessage createObjectMessage()
+   {
+      return null;
+   }
+
+   @Override
+   public MapMessage createMapMessage()
+   {
+      return new ServerJMSMapMessage(newMessage(org.hornetq.api.core.Message.MAP_TYPE), 0);
+   }
+
+   @Override
+   public void setJMSXUserID(Message message, String s)
+   {
+   }
+
+   @Override
+   public Destination createDestination(String name)
+   {
+      return super.createDestination(name);
+   }
+
+   @Override
+   public <T extends Destination> T createDestination(String name, Class<T> kind)
+   {
+      return super.createDestination(name, kind);
+   }
+
+   @Override
+   public void setJMSXGroupID(Message message, String s)
+   {
+
+   }
+
+   @Override
+   public void setJMSXGroupSequence(Message message, int i)
+   {
+
+   }
+
+   @Override
+   public void setJMSXDeliveryCount(Message message, long l)
+   {
+
+   }
+
+
+   public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount)
+   {
+      switch (messageType)
+      {
+         case org.hornetq.api.core.Message.STREAM_TYPE:
+            return new ServerJMSStreamMessage(wrapped, deliveryCount);
+         case org.hornetq.api.core.Message.BYTES_TYPE:
+            return new ServerJMSBytesMessage(wrapped, deliveryCount);
+         case org.hornetq.api.core.Message.MAP_TYPE:
+            return new ServerJMSMapMessage(wrapped, deliveryCount);
+         case org.hornetq.api.core.Message.TEXT_TYPE:
+            return new ServerJMSTextMessage(wrapped, deliveryCount);
+         default:
+            return new ServerJMSMessage(wrapped, deliveryCount);
+      }
+
+   }
+
+
+   @Override
+   public String toAddress(Destination destination)
+   {
+      return null;
+   }
+
+
+   private ServerMessageImpl newMessage(byte messageType)
+   {
+      ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512);
+      message.setType(messageType);
+      ((ResetLimitWrappedHornetQBuffer)message.getBodyBuffer()).setMessage(null);
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java
new file mode 100644
index 0000000..495c76a
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.proton.converter;
+
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.jms.InboundTransformer;
+import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
+import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer;
+import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.MessageConverter;
+import org.hornetq.utils.IDGenerator;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonMessageConverter implements MessageConverter
+{
+
+
+   HornetQJMSVendor hornetQJMSVendor;
+
+   public ProtonMessageConverter(IDGenerator idGenerator)
+   {
+      hornetQJMSVendor = new HornetQJMSVendor(idGenerator);
+      inboundTransformer = new JMSMappingInboundTransformer(hornetQJMSVendor);
+      outboundTransformer = new JMSMappingOutboundTransformer(hornetQJMSVendor);
+   }
+
+   private final InboundTransformer inboundTransformer;
+   private final JMSMappingOutboundTransformer outboundTransformer;
+
+   @Override
+   public ServerMessage inbound(Object messageSource) throws Exception
+   {
+      ServerJMSMessage jmsMessage = inboundJMSType((EncodedMessage) messageSource);
+
+      return (ServerMessage)jmsMessage.getInnerMessage();
+   }
+
+   /**
+    * Just create the JMS Part of the inbound (for testing)
+    * @param messageSource
+    * @return
+    * @throws Exception
+    */
+   public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception
+   {
+      EncodedMessage encodedMessageSource = messageSource;
+      ServerJMSMessage transformedMessage = (ServerJMSMessage)inboundTransformer.transform(encodedMessageSource);
+
+      transformedMessage.encode();
+
+      return transformedMessage;
+   }
+
+
+   @Override
+   public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception
+   {
+      ServerJMSMessage jmsMessage = hornetQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
+      jmsMessage.decode();
+
+      return outboundTransformer.convert(jmsMessage);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
new file mode 100644
index 0000000..840fb35
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.proton.converter.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.message.impl.MessageInternal;
+
+import static org.hornetq.reader.BytesMessageUtil.bytesMessageReset;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadBoolean;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadByte;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadBytes;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadChar;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadDouble;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadFloat;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadInt;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadLong;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadShort;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadUTF;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadUnsignedByte;
+import static org.hornetq.reader.BytesMessageUtil.bytesReadUnsignedShort;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteBoolean;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteByte;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteBytes;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteChar;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteDouble;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteFloat;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteInt;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteLong;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteObject;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteShort;
+import static org.hornetq.reader.BytesMessageUtil.bytesWriteUTF;
+
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage
+{
+   public ServerJMSBytesMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+   }
+
+   @Override
+   public long getBodyLength() throws JMSException
+   {
+      return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET;
+   }
+
+   @Override
+   public boolean readBoolean() throws JMSException
+   {
+      return bytesReadBoolean(message);
+   }
+
+   @Override
+   public byte readByte() throws JMSException
+   {
+      return bytesReadByte(message);
+   }
+
+   @Override
+   public int readUnsignedByte() throws JMSException
+   {
+      return bytesReadUnsignedByte(message);
+   }
+
+   @Override
+   public short readShort() throws JMSException
+   {
+      return bytesReadShort(message);
+   }
+
+   @Override
+   public int readUnsignedShort() throws JMSException
+   {
+      return bytesReadUnsignedShort(message);
+   }
+
+   @Override
+   public char readChar() throws JMSException
+   {
+      return bytesReadChar(message);
+   }
+
+   @Override
+   public int readInt() throws JMSException
+   {
+      return bytesReadInt(message);
+   }
+
+   @Override
+   public long readLong() throws JMSException
+   {
+      return bytesReadLong(message);
+   }
+
+   @Override
+   public float readFloat() throws JMSException
+   {
+      return bytesReadFloat(message);
+   }
+
+   @Override
+   public double readDouble() throws JMSException
+   {
+      return bytesReadDouble(message);
+   }
+
+   @Override
+   public String readUTF() throws JMSException
+   {
+      return bytesReadUTF(message);
+   }
+
+   @Override
+   public int readBytes(byte[] value) throws JMSException
+   {
+      return bytesReadBytes(message, value);
+   }
+
+   @Override
+   public int readBytes(byte[] value, int length) throws JMSException
+   {
+      return bytesReadBytes(message, value, length);
+   }
+
+   @Override
+   public void writeBoolean(boolean value) throws JMSException
+   {
+      bytesWriteBoolean(message, value);
+
+   }
+
+   @Override
+   public void writeByte(byte value) throws JMSException
+   {
+      bytesWriteByte(message, value);
+   }
+
+   @Override
+   public void writeShort(short value) throws JMSException
+   {
+      bytesWriteShort(message, value);
+   }
+
+   @Override
+   public void writeChar(char value) throws JMSException
+   {
+      bytesWriteChar(message, value);
+   }
+
+   @Override
+   public void writeInt(int value) throws JMSException
+   {
+      bytesWriteInt(message, value);
+   }
+
+   @Override
+   public void writeLong(long value) throws JMSException
+   {
+      bytesWriteLong(message, value);
+   }
+
+   @Override
+   public void writeFloat(float value) throws JMSException
+   {
+      bytesWriteFloat(message, value);
+   }
+
+   @Override
+   public void writeDouble(double value) throws JMSException
+   {
+      bytesWriteDouble(message, value);
+   }
+
+   @Override
+   public void writeUTF(String value) throws JMSException
+   {
+      bytesWriteUTF(message, value);
+   }
+
+   @Override
+   public void writeBytes(byte[] value) throws JMSException
+   {
+      bytesWriteBytes(message, value);
+   }
+
+   @Override
+   public void writeBytes(byte[] value, int offset, int length) throws JMSException
+   {
+      bytesWriteBytes(message, value, offset, length);
+   }
+
+   @Override
+   public void writeObject(Object value) throws JMSException
+   {
+      if (!bytesWriteObject(message, value))
+      {
+         throw new JMSException("Can't make conversion of " + value + " to any known type");
+      }
+   }
+
+   public void encode() throws Exception
+   {
+      super.encode();
+      // this is to make sure we encode the body-length before it's persisted
+      getBodyLength();
+   }
+
+
+   public void decode() throws Exception
+   {
+      super.decode();
+
+   }
+
+   @Override
+   public void reset() throws JMSException
+   {
+      bytesMessageReset(message);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
new file mode 100644
index 0000000..4373098
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.proton.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.hornetq.api.core.HornetQPropertyConversionException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.utils.TypedProperties;
+
+import static org.hornetq.reader.MapMessageUtil.readBodyMap;
+import static org.hornetq.reader.MapMessageUtil.writeBodyMap;
+
+/**
+ * HornetQ implementation of a JMS MapMessage.
+ *
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version $Revision: 3412 $
+ */
+public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage
+{
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.MAP_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   private final TypedProperties map = new TypedProperties();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /*
+    * This constructor is used to construct messages prior to sending
+    */
+   public ServerJMSMapMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+
+   }
+
+   // MapMessage implementation -------------------------------------
+
+   public void setBoolean(final String name, final boolean value) throws JMSException
+   {
+      map.putBooleanProperty(new SimpleString(name), value);
+   }
+
+   public void setByte(final String name, final byte value) throws JMSException
+   {
+      map.putByteProperty(new SimpleString(name), value);
+   }
+
+   public void setShort(final String name, final short value) throws JMSException
+   {
+      map.putShortProperty(new SimpleString(name), value);
+   }
+
+   public void setChar(final String name, final char value) throws JMSException
+   {
+      map.putCharProperty(new SimpleString(name), value);
+   }
+
+   public void setInt(final String name, final int value) throws JMSException
+   {
+      map.putIntProperty(new SimpleString(name), value);
+   }
+
+   public void setLong(final String name, final long value) throws JMSException
+   {
+      map.putLongProperty(new SimpleString(name), value);
+   }
+
+   public void setFloat(final String name, final float value) throws JMSException
+   {
+      map.putFloatProperty(new SimpleString(name), value);
+   }
+
+   public void setDouble(final String name, final double value) throws JMSException
+   {
+      map.putDoubleProperty(new SimpleString(name), value);
+   }
+
+   public void setString(final String name, final String value) throws JMSException
+   {
+      map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
+   }
+
+   public void setBytes(final String name, final byte[] value) throws JMSException
+   {
+      map.putBytesProperty(new SimpleString(name), value);
+   }
+
+   public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException
+   {
+      if (offset + length > value.length)
+      {
+         throw new JMSException("Invalid offset/length");
+      }
+      byte[] newBytes = new byte[length];
+      System.arraycopy(value, offset, newBytes, 0, length);
+      map.putBytesProperty(new SimpleString(name), newBytes);
+   }
+
+   public void setObject(final String name, final Object value) throws JMSException
+   {
+      try
+      {
+         TypedProperties.setObjectProperty(new SimpleString(name), value, map);
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public boolean getBoolean(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getBooleanProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public byte getByte(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getByteProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public short getShort(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getShortProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public char getChar(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getCharProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public int getInt(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getIntProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public long getLong(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getLongProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public float getFloat(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getFloatProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public double getDouble(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getDoubleProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public String getString(final String name) throws JMSException
+   {
+      try
+      {
+         SimpleString str = map.getSimpleStringProperty(new SimpleString(name));
+         if (str == null)
+         {
+            return null;
+         }
+         else
+         {
+            return str.toString();
+         }
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public byte[] getBytes(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getBytesProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public Object getObject(final String name) throws JMSException
+   {
+      Object val = map.getProperty(new SimpleString(name));
+
+      if (val instanceof SimpleString)
+      {
+         val = ((SimpleString) val).toString();
+      }
+
+      return val;
+   }
+
+   public Enumeration getMapNames() throws JMSException
+   {
+      Set<SimpleString> simplePropNames = map.getPropertyNames();
+      Set<String> propNames = new HashSet<String>(simplePropNames.size());
+
+      for (SimpleString str : simplePropNames)
+      {
+         propNames.add(str.toString());
+      }
+
+      return Collections.enumeration(propNames);
+   }
+
+   public boolean itemExists(final String name) throws JMSException
+   {
+      return map.containsProperty(new SimpleString(name));
+   }
+
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      map.clear();
+   }
+
+
+   public void encode() throws Exception
+   {
+      super.encode();
+      writeBodyMap(message, map);
+   }
+
+   public void decode() throws Exception
+   {
+      super.decode();
+      readBodyMap(message, map);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java
new file mode 100644
index 0000000..2c5e6e2
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java
@@ -0,0 +1,435 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.proton.converter.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.Collections;
+import java.util.Enumeration;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.reader.MessageUtil;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ServerJMSMessage implements Message
+{
+   protected final MessageInternal message;
+
+   protected int deliveryCount;
+
+   public MessageInternal getInnerMessage()
+   {
+      return message;
+   }
+
+
+   public ServerJMSMessage(MessageInternal message, int deliveryCount)
+   {
+      this.message = message;
+      this.deliveryCount = deliveryCount;
+   }
+
+
+   @Override
+   public final String getJMSMessageID() throws JMSException
+   {
+      return null;
+   }
+
+   @Override
+   public final void setJMSMessageID(String id) throws JMSException
+   {
+   }
+
+   @Override
+   public final long getJMSTimestamp() throws JMSException
+   {
+      return message.getTimestamp();
+   }
+
+   @Override
+   public final void setJMSTimestamp(long timestamp) throws JMSException
+   {
+      message.setTimestamp(timestamp);
+   }
+
+
+   @Override
+   public final byte[] getJMSCorrelationIDAsBytes() throws JMSException
+   {
+      return MessageUtil.getJMSCorrelationIDAsBytes(message);
+   }
+
+   @Override
+   public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
+   {
+      try
+      {
+         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
+      }
+      catch (HornetQException e)
+      {
+         throw new JMSException(e.getMessage());
+      }
+   }
+
+   @Override
+   public final void setJMSCorrelationID(String correlationID) throws JMSException
+   {
+      MessageUtil.setJMSCorrelationID(message, correlationID);
+   }
+
+   @Override
+   public final String getJMSCorrelationID() throws JMSException
+   {
+      return MessageUtil.getJMSCorrelationID(message);
+   }
+
+   @Override
+   public final Destination getJMSReplyTo() throws JMSException
+   {
+      SimpleString reply = MessageUtil.getJMSReplyTo(message);
+      if (reply != null)
+      {
+         return HornetQDestination.fromAddress(reply.toString());
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   @Override
+   public final void setJMSReplyTo(Destination replyTo) throws JMSException
+   {
+      MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((HornetQDestination) replyTo).getSimpleAddress());
+
+   }
+
+   public final Destination getJMSDestination() throws JMSException
+   {
+      SimpleString sdest = message.getAddress();
+
+      if (sdest == null)
+      {
+         return null;
+      }
+      else
+      {
+         if (!sdest.toString().startsWith("jms."))
+         {
+            return new HornetQQueue(sdest.toString(), sdest.toString());
+         }
+         else
+         {
+            return HornetQDestination.fromAddress(sdest.toString());
+         }
+      }
+   }
+
+   @Override
+   public final void setJMSDestination(Destination destination) throws JMSException
+   {
+      if (destination == null)
+      {
+         message.setAddress(null);
+      }
+      else
+      {
+         message.setAddress(((HornetQDestination) destination).getSimpleAddress());
+      }
+
+   }
+
+   @Override
+   public final int getJMSDeliveryMode() throws JMSException
+   {
+      return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+   }
+
+   @Override
+   public final void setJMSDeliveryMode(int deliveryMode) throws JMSException
+   {
+      if (deliveryMode == DeliveryMode.PERSISTENT)
+      {
+         message.setDurable(true);
+      }
+      else if (deliveryMode == DeliveryMode.NON_PERSISTENT)
+      {
+         message.setDurable(false);
+      }
+      else
+      {
+         throw new JMSException("Invalid mode " + deliveryMode);
+      }
+   }
+
+   @Override
+   public final boolean getJMSRedelivered() throws JMSException
+   {
+      return false;
+   }
+
+   @Override
+   public final void setJMSRedelivered(boolean redelivered) throws JMSException
+   {
+      // no op
+   }
+
+   @Override
+   public final String getJMSType() throws JMSException
+   {
+      return MessageUtil.getJMSType(message);
+   }
+
+   @Override
+   public final void setJMSType(String type) throws JMSException
+   {
+      MessageUtil.setJMSType(message, type);
+   }
+
+   @Override
+   public final long getJMSExpiration() throws JMSException
+   {
+      return message.getExpiration();
+   }
+
+   @Override
+   public final void setJMSExpiration(long expiration) throws JMSException
+   {
+      message.setExpiration(expiration);
+   }
+
+   @Override
+   public final long getJMSDeliveryTime() throws JMSException
+   {
+      // no op
+      return 0;
+   }
+
+   @Override
+   public final void setJMSDeliveryTime(long deliveryTime) throws JMSException
+   {
+      // no op
+   }
+
+   @Override
+   public final int getJMSPriority() throws JMSException
+   {
+      return message.getPriority();
+   }
+
+   @Override
+   public final void setJMSPriority(int priority) throws JMSException
+   {
+      message.setPriority((byte) priority);
+   }
+
+   @Override
+   public final void clearProperties() throws JMSException
+   {
+      MessageUtil.clearProperties(message);
+
+   }
+
+   @Override
+   public final boolean propertyExists(String name) throws JMSException
+   {
+      return MessageUtil.propertyExists(message, name);
+   }
+
+   @Override
+   public final boolean getBooleanProperty(String name) throws JMSException
+   {
+      return message.getBooleanProperty(name);
+   }
+
+   @Override
+   public final byte getByteProperty(String name) throws JMSException
+   {
+      return message.getByteProperty(name);
+   }
+
+   @Override
+   public final short getShortProperty(String name) throws JMSException
+   {
+      return message.getShortProperty(name);
+   }
+
+   @Override
+   public final int getIntProperty(String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return deliveryCount;
+      }
+
+      return message.getIntProperty(name);
+   }
+
+   @Override
+   public final long getLongProperty(String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return deliveryCount;
+      }
+
+      return message.getLongProperty(name);
+   }
+
+   @Override
+   public final float getFloatProperty(String name) throws JMSException
+   {
+      return message.getFloatProperty(name);
+   }
+
+   @Override
+   public final double getDoubleProperty(String name) throws JMSException
+   {
+      return message.getDoubleProperty(name);
+   }
+
+   @Override
+   public final String getStringProperty(String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return String.valueOf(deliveryCount);
+      }
+
+
+      return message.getStringProperty(name);
+   }
+
+   @Override
+   public final Object getObjectProperty(String name) throws JMSException
+   {
+      Object val = message.getObjectProperty(name);
+      if (val instanceof SimpleString)
+      {
+         val = ((SimpleString)val).toString();
+      }
+      return val;
+   }
+
+   @Override
+   public final Enumeration getPropertyNames() throws JMSException
+   {
+      return Collections.enumeration(MessageUtil.getPropertyNames(message));
+   }
+
+   @Override
+   public final void setBooleanProperty(String name, boolean value) throws JMSException
+   {
+      message.putBooleanProperty(name, value);
+   }
+
+   @Override
+   public final void setByteProperty(String name, byte value) throws JMSException
+   {
+      message.putByteProperty(name, value);
+   }
+
+   @Override
+   public final void setShortProperty(String name, short value) throws JMSException
+   {
+      message.putShortProperty(name, value);
+   }
+
+   @Override
+   public final void setIntProperty(String name, int value) throws JMSException
+   {
+      message.putIntProperty(name, value);
+   }
+
+   @Override
+   public final void setLongProperty(String name, long value) throws JMSException
+   {
+      message.putLongProperty(name, value);
+   }
+
+   @Override
+   public final void setFloatProperty(String name, float value) throws JMSException
+   {
+      message.putFloatProperty(name, value);
+   }
+
+   @Override
+   public final void setDoubleProperty(String name, double value) throws JMSException
+   {
+      message.putDoubleProperty(name, value);
+   }
+
+   @Override
+   public final void setStringProperty(String name, String value) throws JMSException
+   {
+      message.putStringProperty(name, value);
+   }
+
+   @Override
+   public final void setObjectProperty(String name, Object value) throws JMSException
+   {
+      message.putObjectProperty(name, value);
+   }
+
+   @Override
+   public final void acknowledge() throws JMSException
+   {
+      // no op
+   }
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      message.getBodyBuffer().clear();
+   }
+
+   @Override
+   public final <T> T getBody(Class<T> c) throws JMSException
+   {
+      // no op.. jms2 not used on the conversion
+      return null;
+   }
+
+   /**
+    * Encode the body into the internal message
+    */
+   public void encode() throws Exception
+   {
+      message.getBodyBuffer().resetReaderIndex();
+   }
+
+
+   public void decode() throws Exception
+   {
+      message.getBodyBuffer().resetReaderIndex();
+   }
+
+   @Override
+   public final boolean isBodyAssignableTo(Class c) throws JMSException
+   {
+      // no op.. jms2 not used on the conversion
+      return false;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
new file mode 100644
index 0000000..14a5954
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.proton.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
+import org.hornetq.core.message.impl.MessageInternal;
+import org.hornetq.utils.DataConstants;
+
+import static org.hornetq.reader.MessageUtil.getBodyBuffer;
+import static org.hornetq.reader.StreamMessageUtil.streamReadBoolean;
+import static org.hornetq.reader.StreamMessageUtil.streamReadByte;
+import static org.hornetq.reader.StreamMessageUtil.streamReadBytes;
+import static org.hornetq.reader.StreamMessageUtil.streamReadChar;
+import static org.hornetq.reader.StreamMessageUtil.streamReadDouble;
+import static org.hornetq.reader.StreamMessageUtil.streamReadFloat;
+import static org.hornetq.reader.StreamMessageUtil.streamReadInteger;
+import static org.hornetq.reader.StreamMessageUtil.streamReadLong;
+import static org.hornetq.reader.StreamMessageUtil.streamReadObject;
+import static org.hornetq.reader.StreamMessageUtil.streamReadShort;
+import static org.hornetq.reader.StreamMessageUtil.streamReadString;
+
+public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage
+{
+   public static final byte TYPE = Message.STREAM_TYPE;
+
+   private int bodyLength = 0;
+
+
+   public ServerJMSStreamMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+
+   }
+
+   // StreamMessage implementation ----------------------------------
+
+   public boolean readBoolean() throws JMSException
+   {
+      try
+      {
+         return streamReadBoolean(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public byte readByte() throws JMSException
+   {
+      try
+      {
+         return streamReadByte(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public short readShort() throws JMSException
+   {
+
+      try
+      {
+         return streamReadShort(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public char readChar() throws JMSException
+   {
+
+      try
+      {
+         return streamReadChar(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public int readInt() throws JMSException
+   {
+
+      try
+      {
+         return streamReadInteger(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public long readLong() throws JMSException
+   {
+
+      try
+      {
+         return streamReadLong(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public float readFloat() throws JMSException
+   {
+
+      try
+      {
+         return streamReadFloat(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public double readDouble() throws JMSException
+   {
+
+      try
+      {
+         return streamReadDouble(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public String readString() throws JMSException
+   {
+
+      try
+      {
+         return streamReadString(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   /**
+    * len here is used to control how many more bytes to read
+    */
+   private int len = 0;
+
+   public int readBytes(final byte[] value) throws JMSException
+   {
+
+      try
+      {
+         Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
+
+         len = pairRead.getA();
+         return pairRead.getB();
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public Object readObject() throws JMSException
+   {
+
+      if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition())
+      {
+         throw new MessageEOFException("");
+      }
+      try
+      {
+         return streamReadObject(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public void writeBoolean(final boolean value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BOOLEAN);
+      getBuffer().writeBoolean(value);
+   }
+
+   public void writeByte(final byte value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BYTE);
+      getBuffer().writeByte(value);
+   }
+
+   public void writeShort(final short value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.SHORT);
+      getBuffer().writeShort(value);
+   }
+
+   public void writeChar(final char value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.CHAR);
+      getBuffer().writeShort((short) value);
+   }
+
+   public void writeInt(final int value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.INT);
+      getBuffer().writeInt(value);
+   }
+
+   public void writeLong(final long value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.LONG);
+      getBuffer().writeLong(value);
+   }
+
+   public void writeFloat(final float value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.FLOAT);
+      getBuffer().writeInt(Float.floatToIntBits(value));
+   }
+
+   public void writeDouble(final double value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.DOUBLE);
+      getBuffer().writeLong(Double.doubleToLongBits(value));
+   }
+
+   public void writeString(final String value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.STRING);
+      getBuffer().writeNullableString(value);
+   }
+
+   public void writeBytes(final byte[] value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BYTES);
+      getBuffer().writeInt(value.length);
+      getBuffer().writeBytes(value);
+   }
+
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BYTES);
+      getBuffer().writeInt(length);
+      getBuffer().writeBytes(value, offset, length);
+   }
+
+   public void writeObject(final Object value) throws JMSException
+   {
+      if (value instanceof String)
+      {
+         writeString((String) value);
+      }
+      else if (value instanceof Boolean)
+      {
+         writeBoolean((Boolean) value);
+      }
+      else if (value instanceof Byte)
+      {
+         writeByte((Byte) value);
+      }
+      else if (value instanceof Short)
+      {
+         writeShort((Short) value);
+      }
+      else if (value instanceof Integer)
+      {
+         writeInt((Integer) value);
+      }
+      else if (value instanceof Long)
+      {
+         writeLong((Long) value);
+      }
+      else if (value instanceof Float)
+      {
+         writeFloat((Float) value);
+      }
+      else if (value instanceof Double)
+      {
+         writeDouble((Double) value);
+      }
+      else if (value instanceof byte[])
+      {
+         writeBytes((byte[]) value);
+      }
+      else if (value instanceof Character)
+      {
+         writeChar((Character) value);
+      }
+      else if (value == null)
+      {
+         writeString(null);
+      }
+      else
+      {
+         throw new MessageFormatException("Invalid object type: " + value.getClass());
+      }
+   }
+
+   public void reset() throws JMSException
+   {
+      getBuffer().resetReaderIndex();
+   }
+
+   // HornetQRAMessage overrides ----------------------------------------
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      getBuffer().clear();
+   }
+
+   private HornetQBuffer getBuffer()
+   {
+      return message.getBodyBuffer();
+   }
+
+
+   public void decode() throws Exception
+   {
+      super.decode();
+   }
+
+   /**
+    * Encode the body into the internal message
+    */
+   public void encode() throws Exception
+   {
+      super.encode();
+      bodyLength = message.getEndOfBodyPosition();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
new file mode 100644
index 0000000..5455e6e
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.proton.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.message.impl.MessageInternal;
+
+import static org.hornetq.reader.TextMessageUtil.readBodyText;
+import static org.hornetq.reader.TextMessageUtil.writeBodyText;
+
+
+/**
+ * HornetQ implementation of a JMS TextMessage.
+ * <br>
+ * This class was ported from SpyTextMessage in JBossMQ.
+ *
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version $Revision: 3412 $
+ */
+public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage
+{
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.TEXT_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write
+   // methods are more efficient for a SimpleString
+   private SimpleString text;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /*
+    * This constructor is used to construct messages prior to sending
+    */
+   public ServerJMSTextMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+
+   }
+   // TextMessage implementation ------------------------------------
+
+   public void setText(final String text) throws JMSException
+   {
+      if (text != null)
+      {
+         this.text = new SimpleString(text);
+      }
+      else
+      {
+         this.text = null;
+      }
+
+      writeBodyText(message, this.text);
+   }
+
+   public String getText()
+   {
+      if (text != null)
+      {
+         return text.toString();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      text = null;
+   }
+
+
+   public void encode() throws Exception
+   {
+      super.encode();
+      writeBodyText(message, text);
+   }
+
+   public void decode() throws Exception
+   {
+      super.decode();
+      text = readBodyText(message);
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java
new file mode 100644
index 0000000..acc5a0d
--- /dev/null
+++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+/**
+ * This package contains incomplete JMS implementations just to be used with converting amqp to hornetq and
+ * vice versa
+ * @author Clebert Suconic
+ */
+
+package org.hornetq.core.protocol.proton.converter.jms;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java
deleted file mode 100644
index 03f797d..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton.exceptions;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.hornetq.api.core.HornetQException;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         6/6/13
- */
-public class HornetQAMQPException extends HornetQException
-{
-
-   private static final String ERROR_PREFIX = "amqp:";
-
-   public Symbol getAmqpError()
-   {
-      return amqpError;
-   }
-
-   private final Symbol amqpError;
-
-   public HornetQAMQPException(Symbol amqpError, String message)
-   {
-      super(message);
-      this.amqpError = amqpError;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java
----------------------------------------------------------------------
diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java
deleted file mode 100644
index 0792f88..0000000
--- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2005-2014 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.proton.exceptions;
-
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- *         6/6/13
- */
-public class HornetQAMQPIllegalStateException extends HornetQAMQPException
-{
-   public HornetQAMQPIllegalStateException(String message)
-   {
-      super(AmqpError.ILLEGAL_STATE, message);
-   }
-}


Mime
View raw message