activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [04/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:34 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/pom.xml b/activemq6-protocols/activemq6-amqp-protocol/pom.xml
new file mode 100644
index 0000000..b066415
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/pom.xml
@@ -0,0 +1,76 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>activemq6-protocols</artifactId>
+      <groupId>org.apache.activemq6</groupId>
+      <version>6.0.0-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>activemq6-amqp-protocol</artifactId>
+
+   <properties>
+      <hornetq.basedir>${project.basedir}/../..</hornetq.basedir>
+   </properties>
+
+   <dependencies>
+      <!-- JMS Client because of some Convertions that are done -->
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-jms-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-core-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging-processor</artifactId>
+      </dependency>
+
+      <!--
+          JBoss Logging
+      -->
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-server</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-proton-plug</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>proton-j</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>proton-jms</artifactId>
+      </dependency>
+
+
+
+      <dependency>
+         <groupId>org.jboss.spec.javax.jms</groupId>
+         <artifactId>jboss-jms-api_2.0_spec</artifactId>
+         <scope>provided</scope>
+      </dependency>
+      <dependency>
+         <groupId>junit</groupId>
+         <artifactId>junit</artifactId>
+         <scope>test</scope>
+      </dependency>
+
+
+   </dependencies>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java
new file mode 100644
index 0000000..e526d52
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton;
+
+import java.util.concurrent.Executor;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.core.client.HornetQClientLogger;
+import org.apache.activemq6.spi.core.protocol.AbstractRemotingConnection;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.proton.plug.AMQPConnectionContext;
+
+/**
+ *
+ * This is a Server's Connection representation used by HornetQ.
+ * @author Clebert Suconic
+ */
+
+public class HornetQProtonRemotingConnection extends AbstractRemotingConnection
+{
+   private final AMQPConnectionContext amqpConnection;
+
+   private boolean destroyed = false;
+
+   private final ProtonProtocolManager manager;
+
+
+   public HornetQProtonRemotingConnection(ProtonProtocolManager manager, AMQPConnectionContext amqpConnection, Connection transportConnection, Executor executor)
+   {
+      super(transportConnection, executor);
+      this.manager = manager;
+      this.amqpConnection = amqpConnection;
+   }
+
+   public ProtonProtocolManager getManager()
+   {
+      return manager;
+   }
+
+   /*
+    * This can be called concurrently by more than one thread so needs to be locked
+    */
+   public void fail(final HornetQException me, String scaleDownTargetNodeID)
+   {
+      if (destroyed)
+      {
+         return;
+      }
+
+      destroyed = true;
+
+      HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+
+      // Then call the listeners
+      callFailureListeners(me, scaleDownTargetNodeID);
+
+      callClosingListeners();
+
+      internalClose();
+   }
+
+
+   @Override
+   public void destroy()
+   {
+      synchronized (this)
+      {
+         if (destroyed)
+         {
+            return;
+         }
+
+         destroyed = true;
+      }
+
+
+      callClosingListeners();
+
+      internalClose();
+
+   }
+
+   @Override
+   public boolean isClient()
+   {
+      return false;
+   }
+
+   @Override
+   public boolean isDestroyed()
+   {
+      return destroyed;
+   }
+
+   @Override
+   public void disconnect(boolean criticalError)
+   {
+      getTransportConnection().close();
+   }
+
+   /**
+    * Disconnect the connection, closing all channels
+    */
+   @Override
+   public void disconnect(String scaleDownNodeID, boolean criticalError)
+   {
+      getTransportConnection().close();
+   }
+
+   @Override
+   public boolean checkDataReceived()
+   {
+      return amqpConnection.checkDataReceived();
+   }
+
+   @Override
+   public void flush()
+   {
+      amqpConnection.flush();
+   }
+
+   @Override
+   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   {
+      amqpConnection.inputBuffer(buffer.byteBuf());
+      super.bufferReceived(connectionID, buffer);
+   }
+
+   private void internalClose()
+   {
+      // We close the underlying transport connection
+      getTransportConnection().close();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java
new file mode 100644
index 0000000..8398866
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton;
+
+import java.util.concurrent.Executor;
+
+import io.netty.channel.ChannelPipeline;
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.client.HornetQClient;
+import org.apache.activemq6.core.protocol.proton.converter.ProtonMessageConverter;
+import org.apache.activemq6.core.protocol.proton.plug.HornetQProtonConnectionCallback;
+import org.apache.activemq6.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq6.core.server.HornetQServer;
+import org.apache.activemq6.core.server.management.Notification;
+import org.apache.activemq6.core.server.management.NotificationListener;
+import org.apache.activemq6.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq6.spi.core.protocol.MessageConverter;
+import org.apache.activemq6.spi.core.protocol.ProtocolManager;
+import org.apache.activemq6.spi.core.protocol.RemotingConnection;
+import org.apache.activemq6.spi.core.remoting.Acceptor;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.proton.plug.AMQPServerConnectionContext;
+import org.proton.plug.context.server.ProtonServerConnectionContextFactory;
+
+/**
+ * A proton protocol manager, basically reads the Proton Input and maps proton resources to HornetQ resources
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ProtonProtocolManager implements ProtocolManager, NotificationListener
+{
+   private final HornetQServer server;
+
+   private MessageConverter protonConverter;
+
+   public ProtonProtocolManager(HornetQServer server)
+   {
+      this.server = server;
+      this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
+   }
+
+   public HornetQServer getServer()
+   {
+      return server;
+   }
+
+
+   @Override
+   public MessageConverter getConverter()
+   {
+      return protonConverter;
+   }
+
+   @Override
+   public void onNotification(Notification notification)
+   {
+
+   }
+
+   @Override
+   public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
+   {
+      HornetQProtonConnectionCallback connectionCallback = new HornetQProtonConnectionCallback(this, remotingConnection);
+
+      AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback);
+
+      Executor executor = server.getExecutorFactory().getExecutor();
+
+      HornetQProtonRemotingConnection delegate = new HornetQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor);
+
+      connectionCallback.setProtonConnectionDelegate(delegate);
+
+      ConnectionEntry entry = new ConnectionEntry(delegate, executor,
+                                                  System.currentTimeMillis(), HornetQClient.DEFAULT_CONNECTION_TTL);
+
+      return entry;
+   }
+
+   @Override
+   public void removeHandler(String name)
+   {
+
+   }
+
+   @Override
+   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+   {
+      HornetQProtonRemotingConnection protonConnection = (HornetQProtonRemotingConnection)connection;
+
+      protonConnection.bufferReceived(protonConnection.getID(), buffer);
+   }
+
+   @Override
+   public void addChannelHandlers(ChannelPipeline pipeline)
+   {
+
+   }
+
+   @Override
+   public boolean isProtocol(byte[] array)
+   {
+      return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P';
+   }
+
+   @Override
+   public void handshake(NettyServerConnection connection, HornetQBuffer buffer)
+   {
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java
new file mode 100644
index 0000000..68cc9a5
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton;
+
+import org.apache.activemq6.api.core.Interceptor;
+import org.apache.activemq6.core.server.HornetQServer;
+import org.apache.activemq6.spi.core.protocol.ProtocolManager;
+import org.apache.activemq6.spi.core.protocol.ProtocolManagerFactory;
+
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ProtonProtocolManagerFactory implements ProtocolManagerFactory
+{
+   private static final String AMQP_PROTOCOL_NAME = "AMQP";
+
+   private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
+
+   @Override
+   public ProtocolManager createProtocolManager(HornetQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
+   {
+      return new ProtonProtocolManager(server);
+   }
+
+   @Override
+   public String[] getProtocols()
+   {
+      return SUPPORTED_PROTOCOLS;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java
new file mode 100644
index 0000000..725ecfc
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton.converter;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.proton.jms.JMSVendor;
+import org.apache.activemq6.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq6.core.server.ServerMessage;
+import org.apache.activemq6.core.server.impl.ServerMessageImpl;
+import org.apache.activemq6.utils.IDGenerator;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQJMSVendor extends JMSVendor
+{
+
+   private final IDGenerator serverGenerator;
+
+   HornetQJMSVendor(IDGenerator idGenerator)
+   {
+      this.serverGenerator = idGenerator;
+   }
+
+   @Override
+   public BytesMessage createBytesMessage()
+   {
+      return new ServerJMSBytesMessage(newMessage(org.apache.activemq6.api.core.Message.BYTES_TYPE), 0);
+   }
+
+   @Override
+   public StreamMessage createStreamMessage()
+   {
+      return new ServerJMSStreamMessage(newMessage(org.apache.activemq6.api.core.Message.STREAM_TYPE), 0);
+   }
+
+   @Override
+   public Message createMessage()
+   {
+      return new ServerJMSMessage(newMessage(org.apache.activemq6.api.core.Message.DEFAULT_TYPE), 0 );
+   }
+
+   @Override
+   public TextMessage createTextMessage()
+   {
+      return new ServerJMSTextMessage(newMessage(org.apache.activemq6.api.core.Message.TEXT_TYPE), 0);
+   }
+
+   @Override
+   public ObjectMessage createObjectMessage()
+   {
+      return null;
+   }
+
+   @Override
+   public MapMessage createMapMessage()
+   {
+      return new ServerJMSMapMessage(newMessage(org.apache.activemq6.api.core.Message.MAP_TYPE), 0);
+   }
+
+   @Override
+   public void setJMSXUserID(Message message, String s)
+   {
+   }
+
+   @Override
+   public Destination createDestination(String name)
+   {
+      return super.createDestination(name);
+   }
+
+   @Override
+   public <T extends Destination> T createDestination(String name, Class<T> kind)
+   {
+      return super.createDestination(name, kind);
+   }
+
+   @Override
+   public void setJMSXGroupID(Message message, String s)
+   {
+
+   }
+
+   @Override
+   public void setJMSXGroupSequence(Message message, int i)
+   {
+
+   }
+
+   @Override
+   public void setJMSXDeliveryCount(Message message, long l)
+   {
+
+   }
+
+
+   public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount)
+   {
+      switch (messageType)
+      {
+         case org.apache.activemq6.api.core.Message.STREAM_TYPE:
+            return new ServerJMSStreamMessage(wrapped, deliveryCount);
+         case org.apache.activemq6.api.core.Message.BYTES_TYPE:
+            return new ServerJMSBytesMessage(wrapped, deliveryCount);
+         case org.apache.activemq6.api.core.Message.MAP_TYPE:
+            return new ServerJMSMapMessage(wrapped, deliveryCount);
+         case org.apache.activemq6.api.core.Message.TEXT_TYPE:
+            return new ServerJMSTextMessage(wrapped, deliveryCount);
+         default:
+            return new ServerJMSMessage(wrapped, deliveryCount);
+      }
+
+   }
+
+
+   @Override
+   public String toAddress(Destination destination)
+   {
+      return null;
+   }
+
+
+   private ServerMessageImpl newMessage(byte messageType)
+   {
+      ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512);
+      message.setType(messageType);
+      ((ResetLimitWrappedHornetQBuffer)message.getBodyBuffer()).setMessage(null);
+      return message;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java
new file mode 100644
index 0000000..8081f6c
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton.converter;
+
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.jms.InboundTransformer;
+import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
+import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMessage;
+import org.apache.activemq6.core.server.ServerMessage;
+import org.apache.activemq6.spi.core.protocol.MessageConverter;
+import org.apache.activemq6.utils.IDGenerator;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonMessageConverter implements MessageConverter
+{
+
+
+   HornetQJMSVendor hornetQJMSVendor;
+
+   public ProtonMessageConverter(IDGenerator idGenerator)
+   {
+      hornetQJMSVendor = new HornetQJMSVendor(idGenerator);
+      inboundTransformer = new JMSMappingInboundTransformer(hornetQJMSVendor);
+      outboundTransformer = new JMSMappingOutboundTransformer(hornetQJMSVendor);
+   }
+
+   private final InboundTransformer inboundTransformer;
+   private final JMSMappingOutboundTransformer outboundTransformer;
+
+   @Override
+   public ServerMessage inbound(Object messageSource) throws Exception
+   {
+      ServerJMSMessage jmsMessage = inboundJMSType((EncodedMessage) messageSource);
+
+      return (ServerMessage)jmsMessage.getInnerMessage();
+   }
+
+   /**
+    * Just create the JMS Part of the inbound (for testing)
+    * @param messageSource
+    * @return
+    * @throws Exception
+    */
+   public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception
+   {
+      EncodedMessage encodedMessageSource = messageSource;
+      ServerJMSMessage transformedMessage = (ServerJMSMessage)inboundTransformer.transform(encodedMessageSource);
+
+      transformedMessage.encode();
+
+      return transformedMessage;
+   }
+
+
+   @Override
+   public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception
+   {
+      ServerJMSMessage jmsMessage = hornetQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
+      jmsMessage.decode();
+
+      return outboundTransformer.convert(jmsMessage);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
new file mode 100644
index 0000000..532f721
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton.converter.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.apache.activemq6.core.message.impl.MessageImpl;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesMessageReset;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBoolean;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadByte;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBytes;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadChar;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadDouble;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadFloat;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadInt;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadLong;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadShort;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUTF;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedByte;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedShort;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBoolean;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteByte;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBytes;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteChar;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteDouble;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteFloat;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteInt;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteLong;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteObject;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteShort;
+import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteUTF;
+
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage
+{
+   public ServerJMSBytesMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+   }
+
+   @Override
+   public long getBodyLength() throws JMSException
+   {
+      return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET;
+   }
+
+   @Override
+   public boolean readBoolean() throws JMSException
+   {
+      return bytesReadBoolean(message);
+   }
+
+   @Override
+   public byte readByte() throws JMSException
+   {
+      return bytesReadByte(message);
+   }
+
+   @Override
+   public int readUnsignedByte() throws JMSException
+   {
+      return bytesReadUnsignedByte(message);
+   }
+
+   @Override
+   public short readShort() throws JMSException
+   {
+      return bytesReadShort(message);
+   }
+
+   @Override
+   public int readUnsignedShort() throws JMSException
+   {
+      return bytesReadUnsignedShort(message);
+   }
+
+   @Override
+   public char readChar() throws JMSException
+   {
+      return bytesReadChar(message);
+   }
+
+   @Override
+   public int readInt() throws JMSException
+   {
+      return bytesReadInt(message);
+   }
+
+   @Override
+   public long readLong() throws JMSException
+   {
+      return bytesReadLong(message);
+   }
+
+   @Override
+   public float readFloat() throws JMSException
+   {
+      return bytesReadFloat(message);
+   }
+
+   @Override
+   public double readDouble() throws JMSException
+   {
+      return bytesReadDouble(message);
+   }
+
+   @Override
+   public String readUTF() throws JMSException
+   {
+      return bytesReadUTF(message);
+   }
+
+   @Override
+   public int readBytes(byte[] value) throws JMSException
+   {
+      return bytesReadBytes(message, value);
+   }
+
+   @Override
+   public int readBytes(byte[] value, int length) throws JMSException
+   {
+      return bytesReadBytes(message, value, length);
+   }
+
+   @Override
+   public void writeBoolean(boolean value) throws JMSException
+   {
+      bytesWriteBoolean(message, value);
+
+   }
+
+   @Override
+   public void writeByte(byte value) throws JMSException
+   {
+      bytesWriteByte(message, value);
+   }
+
+   @Override
+   public void writeShort(short value) throws JMSException
+   {
+      bytesWriteShort(message, value);
+   }
+
+   @Override
+   public void writeChar(char value) throws JMSException
+   {
+      bytesWriteChar(message, value);
+   }
+
+   @Override
+   public void writeInt(int value) throws JMSException
+   {
+      bytesWriteInt(message, value);
+   }
+
+   @Override
+   public void writeLong(long value) throws JMSException
+   {
+      bytesWriteLong(message, value);
+   }
+
+   @Override
+   public void writeFloat(float value) throws JMSException
+   {
+      bytesWriteFloat(message, value);
+   }
+
+   @Override
+   public void writeDouble(double value) throws JMSException
+   {
+      bytesWriteDouble(message, value);
+   }
+
+   @Override
+   public void writeUTF(String value) throws JMSException
+   {
+      bytesWriteUTF(message, value);
+   }
+
+   @Override
+   public void writeBytes(byte[] value) throws JMSException
+   {
+      bytesWriteBytes(message, value);
+   }
+
+   @Override
+   public void writeBytes(byte[] value, int offset, int length) throws JMSException
+   {
+      bytesWriteBytes(message, value, offset, length);
+   }
+
+   @Override
+   public void writeObject(Object value) throws JMSException
+   {
+      if (!bytesWriteObject(message, value))
+      {
+         throw new JMSException("Can't make conversion of " + value + " to any known type");
+      }
+   }
+
+   public void encode() throws Exception
+   {
+      super.encode();
+      // this is to make sure we encode the body-length before it's persisted
+      getBodyLength();
+   }
+
+
+   public void decode() throws Exception
+   {
+      super.decode();
+
+   }
+
+   @Override
+   public void reset() throws JMSException
+   {
+      bytesMessageReset(message);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMapMessage.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
new file mode 100644
index 0000000..ad80904
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMapMessage.java
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.proton.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq6.api.core.HornetQPropertyConversionException;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+import org.apache.activemq6.utils.TypedProperties;
+
+import static org.apache.activemq6.reader.MapMessageUtil.readBodyMap;
+import static org.apache.activemq6.reader.MapMessageUtil.writeBodyMap;
+
+/**
+ * HornetQ implementation of a JMS MapMessage.
+ *
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version $Revision: 3412 $
+ */
+public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage
+{
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.MAP_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   private final TypedProperties map = new TypedProperties();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /*
+    * This constructor is used to construct messages prior to sending
+    */
+   public ServerJMSMapMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+
+   }
+
+   // MapMessage implementation -------------------------------------
+
+   public void setBoolean(final String name, final boolean value) throws JMSException
+   {
+      map.putBooleanProperty(new SimpleString(name), value);
+   }
+
+   public void setByte(final String name, final byte value) throws JMSException
+   {
+      map.putByteProperty(new SimpleString(name), value);
+   }
+
+   public void setShort(final String name, final short value) throws JMSException
+   {
+      map.putShortProperty(new SimpleString(name), value);
+   }
+
+   public void setChar(final String name, final char value) throws JMSException
+   {
+      map.putCharProperty(new SimpleString(name), value);
+   }
+
+   public void setInt(final String name, final int value) throws JMSException
+   {
+      map.putIntProperty(new SimpleString(name), value);
+   }
+
+   public void setLong(final String name, final long value) throws JMSException
+   {
+      map.putLongProperty(new SimpleString(name), value);
+   }
+
+   public void setFloat(final String name, final float value) throws JMSException
+   {
+      map.putFloatProperty(new SimpleString(name), value);
+   }
+
+   public void setDouble(final String name, final double value) throws JMSException
+   {
+      map.putDoubleProperty(new SimpleString(name), value);
+   }
+
+   public void setString(final String name, final String value) throws JMSException
+   {
+      map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
+   }
+
+   public void setBytes(final String name, final byte[] value) throws JMSException
+   {
+      map.putBytesProperty(new SimpleString(name), value);
+   }
+
+   public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException
+   {
+      if (offset + length > value.length)
+      {
+         throw new JMSException("Invalid offset/length");
+      }
+      byte[] newBytes = new byte[length];
+      System.arraycopy(value, offset, newBytes, 0, length);
+      map.putBytesProperty(new SimpleString(name), newBytes);
+   }
+
+   public void setObject(final String name, final Object value) throws JMSException
+   {
+      try
+      {
+         TypedProperties.setObjectProperty(new SimpleString(name), value, map);
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public boolean getBoolean(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getBooleanProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public byte getByte(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getByteProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public short getShort(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getShortProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public char getChar(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getCharProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public int getInt(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getIntProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public long getLong(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getLongProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public float getFloat(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getFloatProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public double getDouble(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getDoubleProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public String getString(final String name) throws JMSException
+   {
+      try
+      {
+         SimpleString str = map.getSimpleStringProperty(new SimpleString(name));
+         if (str == null)
+         {
+            return null;
+         }
+         else
+         {
+            return str.toString();
+         }
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public byte[] getBytes(final String name) throws JMSException
+   {
+      try
+      {
+         return map.getBytesProperty(new SimpleString(name));
+      }
+      catch (HornetQPropertyConversionException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+   }
+
+   public Object getObject(final String name) throws JMSException
+   {
+      Object val = map.getProperty(new SimpleString(name));
+
+      if (val instanceof SimpleString)
+      {
+         val = ((SimpleString) val).toString();
+      }
+
+      return val;
+   }
+
+   public Enumeration getMapNames() throws JMSException
+   {
+      Set<SimpleString> simplePropNames = map.getPropertyNames();
+      Set<String> propNames = new HashSet<String>(simplePropNames.size());
+
+      for (SimpleString str : simplePropNames)
+      {
+         propNames.add(str.toString());
+      }
+
+      return Collections.enumeration(propNames);
+   }
+
+   public boolean itemExists(final String name) throws JMSException
+   {
+      return map.containsProperty(new SimpleString(name));
+   }
+
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      map.clear();
+   }
+
+
+   public void encode() throws Exception
+   {
+      super.encode();
+      writeBodyMap(message, map);
+   }
+
+   public void decode() throws Exception
+   {
+      super.decode();
+      readBodyMap(message, map);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMessage.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMessage.java
new file mode 100644
index 0000000..e6f0d8f
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMessage.java
@@ -0,0 +1,435 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton.converter.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.Collections;
+import java.util.Enumeration;
+
+import org.apache.activemq6.api.core.HornetQException;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+import org.apache.activemq6.jms.client.HornetQDestination;
+import org.apache.activemq6.jms.client.HornetQQueue;
+import org.apache.activemq6.reader.MessageUtil;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ServerJMSMessage implements Message
+{
+   protected final MessageInternal message;
+
+   protected int deliveryCount;
+
+   public MessageInternal getInnerMessage()
+   {
+      return message;
+   }
+
+
+   public ServerJMSMessage(MessageInternal message, int deliveryCount)
+   {
+      this.message = message;
+      this.deliveryCount = deliveryCount;
+   }
+
+
+   @Override
+   public final String getJMSMessageID() throws JMSException
+   {
+      return null;
+   }
+
+   @Override
+   public final void setJMSMessageID(String id) throws JMSException
+   {
+   }
+
+   @Override
+   public final long getJMSTimestamp() throws JMSException
+   {
+      return message.getTimestamp();
+   }
+
+   @Override
+   public final void setJMSTimestamp(long timestamp) throws JMSException
+   {
+      message.setTimestamp(timestamp);
+   }
+
+
+   @Override
+   public final byte[] getJMSCorrelationIDAsBytes() throws JMSException
+   {
+      return MessageUtil.getJMSCorrelationIDAsBytes(message);
+   }
+
+   @Override
+   public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
+   {
+      try
+      {
+         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
+      }
+      catch (HornetQException e)
+      {
+         throw new JMSException(e.getMessage());
+      }
+   }
+
+   @Override
+   public final void setJMSCorrelationID(String correlationID) throws JMSException
+   {
+      MessageUtil.setJMSCorrelationID(message, correlationID);
+   }
+
+   @Override
+   public final String getJMSCorrelationID() throws JMSException
+   {
+      return MessageUtil.getJMSCorrelationID(message);
+   }
+
+   @Override
+   public final Destination getJMSReplyTo() throws JMSException
+   {
+      SimpleString reply = MessageUtil.getJMSReplyTo(message);
+      if (reply != null)
+      {
+         return HornetQDestination.fromAddress(reply.toString());
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   @Override
+   public final void setJMSReplyTo(Destination replyTo) throws JMSException
+   {
+      MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((HornetQDestination) replyTo).getSimpleAddress());
+
+   }
+
+   public final Destination getJMSDestination() throws JMSException
+   {
+      SimpleString sdest = message.getAddress();
+
+      if (sdest == null)
+      {
+         return null;
+      }
+      else
+      {
+         if (!sdest.toString().startsWith("jms."))
+         {
+            return new HornetQQueue(sdest.toString(), sdest.toString());
+         }
+         else
+         {
+            return HornetQDestination.fromAddress(sdest.toString());
+         }
+      }
+   }
+
+   @Override
+   public final void setJMSDestination(Destination destination) throws JMSException
+   {
+      if (destination == null)
+      {
+         message.setAddress(null);
+      }
+      else
+      {
+         message.setAddress(((HornetQDestination) destination).getSimpleAddress());
+      }
+
+   }
+
+   @Override
+   public final int getJMSDeliveryMode() throws JMSException
+   {
+      return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+   }
+
+   @Override
+   public final void setJMSDeliveryMode(int deliveryMode) throws JMSException
+   {
+      if (deliveryMode == DeliveryMode.PERSISTENT)
+      {
+         message.setDurable(true);
+      }
+      else if (deliveryMode == DeliveryMode.NON_PERSISTENT)
+      {
+         message.setDurable(false);
+      }
+      else
+      {
+         throw new JMSException("Invalid mode " + deliveryMode);
+      }
+   }
+
+   @Override
+   public final boolean getJMSRedelivered() throws JMSException
+   {
+      return false;
+   }
+
+   @Override
+   public final void setJMSRedelivered(boolean redelivered) throws JMSException
+   {
+      // no op
+   }
+
+   @Override
+   public final String getJMSType() throws JMSException
+   {
+      return MessageUtil.getJMSType(message);
+   }
+
+   @Override
+   public final void setJMSType(String type) throws JMSException
+   {
+      MessageUtil.setJMSType(message, type);
+   }
+
+   @Override
+   public final long getJMSExpiration() throws JMSException
+   {
+      return message.getExpiration();
+   }
+
+   @Override
+   public final void setJMSExpiration(long expiration) throws JMSException
+   {
+      message.setExpiration(expiration);
+   }
+
+   @Override
+   public final long getJMSDeliveryTime() throws JMSException
+   {
+      // no op
+      return 0;
+   }
+
+   @Override
+   public final void setJMSDeliveryTime(long deliveryTime) throws JMSException
+   {
+      // no op
+   }
+
+   @Override
+   public final int getJMSPriority() throws JMSException
+   {
+      return message.getPriority();
+   }
+
+   @Override
+   public final void setJMSPriority(int priority) throws JMSException
+   {
+      message.setPriority((byte) priority);
+   }
+
+   @Override
+   public final void clearProperties() throws JMSException
+   {
+      MessageUtil.clearProperties(message);
+
+   }
+
+   @Override
+   public final boolean propertyExists(String name) throws JMSException
+   {
+      return MessageUtil.propertyExists(message, name);
+   }
+
+   @Override
+   public final boolean getBooleanProperty(String name) throws JMSException
+   {
+      return message.getBooleanProperty(name);
+   }
+
+   @Override
+   public final byte getByteProperty(String name) throws JMSException
+   {
+      return message.getByteProperty(name);
+   }
+
+   @Override
+   public final short getShortProperty(String name) throws JMSException
+   {
+      return message.getShortProperty(name);
+   }
+
+   @Override
+   public final int getIntProperty(String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return deliveryCount;
+      }
+
+      return message.getIntProperty(name);
+   }
+
+   @Override
+   public final long getLongProperty(String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return deliveryCount;
+      }
+
+      return message.getLongProperty(name);
+   }
+
+   @Override
+   public final float getFloatProperty(String name) throws JMSException
+   {
+      return message.getFloatProperty(name);
+   }
+
+   @Override
+   public final double getDoubleProperty(String name) throws JMSException
+   {
+      return message.getDoubleProperty(name);
+   }
+
+   @Override
+   public final String getStringProperty(String name) throws JMSException
+   {
+      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
+      {
+         return String.valueOf(deliveryCount);
+      }
+
+
+      return message.getStringProperty(name);
+   }
+
+   @Override
+   public final Object getObjectProperty(String name) throws JMSException
+   {
+      Object val = message.getObjectProperty(name);
+      if (val instanceof SimpleString)
+      {
+         val = ((SimpleString)val).toString();
+      }
+      return val;
+   }
+
+   @Override
+   public final Enumeration getPropertyNames() throws JMSException
+   {
+      return Collections.enumeration(MessageUtil.getPropertyNames(message));
+   }
+
+   @Override
+   public final void setBooleanProperty(String name, boolean value) throws JMSException
+   {
+      message.putBooleanProperty(name, value);
+   }
+
+   @Override
+   public final void setByteProperty(String name, byte value) throws JMSException
+   {
+      message.putByteProperty(name, value);
+   }
+
+   @Override
+   public final void setShortProperty(String name, short value) throws JMSException
+   {
+      message.putShortProperty(name, value);
+   }
+
+   @Override
+   public final void setIntProperty(String name, int value) throws JMSException
+   {
+      message.putIntProperty(name, value);
+   }
+
+   @Override
+   public final void setLongProperty(String name, long value) throws JMSException
+   {
+      message.putLongProperty(name, value);
+   }
+
+   @Override
+   public final void setFloatProperty(String name, float value) throws JMSException
+   {
+      message.putFloatProperty(name, value);
+   }
+
+   @Override
+   public final void setDoubleProperty(String name, double value) throws JMSException
+   {
+      message.putDoubleProperty(name, value);
+   }
+
+   @Override
+   public final void setStringProperty(String name, String value) throws JMSException
+   {
+      message.putStringProperty(name, value);
+   }
+
+   @Override
+   public final void setObjectProperty(String name, Object value) throws JMSException
+   {
+      message.putObjectProperty(name, value);
+   }
+
+   @Override
+   public final void acknowledge() throws JMSException
+   {
+      // no op
+   }
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      message.getBodyBuffer().clear();
+   }
+
+   @Override
+   public final <T> T getBody(Class<T> c) throws JMSException
+   {
+      // no op.. jms2 not used on the conversion
+      return null;
+   }
+
+   /**
+    * Encode the body into the internal message
+    */
+   public void encode() throws Exception
+   {
+      message.getBodyBuffer().resetReaderIndex();
+   }
+
+
+   public void decode() throws Exception
+   {
+      message.getBodyBuffer().resetReaderIndex();
+   }
+
+   @Override
+   public final boolean isBodyAssignableTo(Class c) throws JMSException
+   {
+      // no op.. jms2 not used on the conversion
+      return false;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
new file mode 100644
index 0000000..d190d45
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.proton.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.Pair;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+import org.apache.activemq6.utils.DataConstants;
+
+import static org.apache.activemq6.reader.MessageUtil.getBodyBuffer;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBoolean;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadByte;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBytes;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadChar;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadDouble;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadFloat;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadInteger;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadLong;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadObject;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadShort;
+import static org.apache.activemq6.reader.StreamMessageUtil.streamReadString;
+
+public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage
+{
+   public static final byte TYPE = Message.STREAM_TYPE;
+
+   private int bodyLength = 0;
+
+
+   public ServerJMSStreamMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+
+   }
+
+   // StreamMessage implementation ----------------------------------
+
+   public boolean readBoolean() throws JMSException
+   {
+      try
+      {
+         return streamReadBoolean(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public byte readByte() throws JMSException
+   {
+      try
+      {
+         return streamReadByte(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public short readShort() throws JMSException
+   {
+
+      try
+      {
+         return streamReadShort(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public char readChar() throws JMSException
+   {
+
+      try
+      {
+         return streamReadChar(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public int readInt() throws JMSException
+   {
+
+      try
+      {
+         return streamReadInteger(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public long readLong() throws JMSException
+   {
+
+      try
+      {
+         return streamReadLong(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public float readFloat() throws JMSException
+   {
+
+      try
+      {
+         return streamReadFloat(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public double readDouble() throws JMSException
+   {
+
+      try
+      {
+         return streamReadDouble(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public String readString() throws JMSException
+   {
+
+      try
+      {
+         return streamReadString(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   /**
+    * len here is used to control how many more bytes to read
+    */
+   private int len = 0;
+
+   public int readBytes(final byte[] value) throws JMSException
+   {
+
+      try
+      {
+         Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value);
+
+         len = pairRead.getA();
+         return pairRead.getB();
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public Object readObject() throws JMSException
+   {
+
+      if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition())
+      {
+         throw new MessageEOFException("");
+      }
+      try
+      {
+         return streamReadObject(message);
+      }
+      catch (IllegalStateException e)
+      {
+         throw new MessageFormatException(e.getMessage());
+      }
+      catch (IndexOutOfBoundsException e)
+      {
+         throw new MessageEOFException("");
+      }
+   }
+
+   public void writeBoolean(final boolean value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BOOLEAN);
+      getBuffer().writeBoolean(value);
+   }
+
+   public void writeByte(final byte value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BYTE);
+      getBuffer().writeByte(value);
+   }
+
+   public void writeShort(final short value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.SHORT);
+      getBuffer().writeShort(value);
+   }
+
+   public void writeChar(final char value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.CHAR);
+      getBuffer().writeShort((short) value);
+   }
+
+   public void writeInt(final int value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.INT);
+      getBuffer().writeInt(value);
+   }
+
+   public void writeLong(final long value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.LONG);
+      getBuffer().writeLong(value);
+   }
+
+   public void writeFloat(final float value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.FLOAT);
+      getBuffer().writeInt(Float.floatToIntBits(value));
+   }
+
+   public void writeDouble(final double value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.DOUBLE);
+      getBuffer().writeLong(Double.doubleToLongBits(value));
+   }
+
+   public void writeString(final String value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.STRING);
+      getBuffer().writeNullableString(value);
+   }
+
+   public void writeBytes(final byte[] value) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BYTES);
+      getBuffer().writeInt(value.length);
+      getBuffer().writeBytes(value);
+   }
+
+   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
+   {
+
+      getBuffer().writeByte(DataConstants.BYTES);
+      getBuffer().writeInt(length);
+      getBuffer().writeBytes(value, offset, length);
+   }
+
+   public void writeObject(final Object value) throws JMSException
+   {
+      if (value instanceof String)
+      {
+         writeString((String) value);
+      }
+      else if (value instanceof Boolean)
+      {
+         writeBoolean((Boolean) value);
+      }
+      else if (value instanceof Byte)
+      {
+         writeByte((Byte) value);
+      }
+      else if (value instanceof Short)
+      {
+         writeShort((Short) value);
+      }
+      else if (value instanceof Integer)
+      {
+         writeInt((Integer) value);
+      }
+      else if (value instanceof Long)
+      {
+         writeLong((Long) value);
+      }
+      else if (value instanceof Float)
+      {
+         writeFloat((Float) value);
+      }
+      else if (value instanceof Double)
+      {
+         writeDouble((Double) value);
+      }
+      else if (value instanceof byte[])
+      {
+         writeBytes((byte[]) value);
+      }
+      else if (value instanceof Character)
+      {
+         writeChar((Character) value);
+      }
+      else if (value == null)
+      {
+         writeString(null);
+      }
+      else
+      {
+         throw new MessageFormatException("Invalid object type: " + value.getClass());
+      }
+   }
+
+   public void reset() throws JMSException
+   {
+      getBuffer().resetReaderIndex();
+   }
+
+   // HornetQRAMessage overrides ----------------------------------------
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      getBuffer().clear();
+   }
+
+   private HornetQBuffer getBuffer()
+   {
+      return message.getBodyBuffer();
+   }
+
+
+   public void decode() throws Exception
+   {
+      super.decode();
+   }
+
+   /**
+    * Encode the body into the internal message
+    */
+   public void encode() throws Exception
+   {
+      super.encode();
+      bodyLength = message.getEndOfBodyPosition();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
new file mode 100644
index 0000000..6d48427
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.proton.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.apache.activemq6.api.core.Message;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.message.impl.MessageInternal;
+
+import static org.apache.activemq6.reader.TextMessageUtil.readBodyText;
+import static org.apache.activemq6.reader.TextMessageUtil.writeBodyText;
+
+
+/**
+ * HornetQ implementation of a JMS TextMessage.
+ * <br>
+ * This class was ported from SpyTextMessage in JBossMQ.
+ *
+ * @author Norbert Lataille (Norbert.Lataille@m4x.org)
+ * @author <a href="mailto:jason@planet57.com">Jason Dillon</a>
+ * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
+ * @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
+ * @version $Revision: 3412 $
+ */
+public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage
+{
+   // Constants -----------------------------------------------------
+
+   public static final byte TYPE = Message.TEXT_TYPE;
+
+   // Attributes ----------------------------------------------------
+
+   // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write
+   // methods are more efficient for a SimpleString
+   private SimpleString text;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /*
+    * This constructor is used to construct messages prior to sending
+    */
+   public ServerJMSTextMessage(MessageInternal message, int deliveryCount)
+   {
+      super(message, deliveryCount);
+
+   }
+   // TextMessage implementation ------------------------------------
+
+   public void setText(final String text) throws JMSException
+   {
+      if (text != null)
+      {
+         this.text = new SimpleString(text);
+      }
+      else
+      {
+         this.text = null;
+      }
+
+      writeBodyText(message, this.text);
+   }
+
+   public String getText()
+   {
+      if (text != null)
+      {
+         return text.toString();
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   @Override
+   public void clearBody() throws JMSException
+   {
+      super.clearBody();
+
+      text = null;
+   }
+
+
+   public void encode() throws Exception
+   {
+      super.encode();
+      writeBodyText(message, text);
+   }
+
+   public void decode() throws Exception
+   {
+      super.decode();
+      text = readBodyText(message);
+   }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java
new file mode 100644
index 0000000..d3ea9b2
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+/**
+ * This package contains incomplete JMS implementations just to be used with converting amqp to hornetq and
+ * vice versa
+ * @author Clebert Suconic
+ */
+
+package org.apache.activemq6.core.protocol.proton.converter.jms;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java
new file mode 100644
index 0000000..55fb67e
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton.plug;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.activemq6.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq6.core.protocol.proton.HornetQProtonRemotingConnection;
+import org.apache.activemq6.core.protocol.proton.ProtonProtocolManager;
+import org.apache.activemq6.core.protocol.proton.sasl.HornetQPlainSASL;
+import org.apache.activemq6.spi.core.remoting.Connection;
+import org.apache.activemq6.utils.ReusableLatch;
+import org.proton.plug.AMQPConnectionCallback;
+import org.proton.plug.AMQPConnectionContext;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.ServerSASL;
+import org.proton.plug.sasl.AnonymousServerSASL;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQProtonConnectionCallback implements AMQPConnectionCallback
+{
+   private final ProtonProtocolManager manager;
+
+   private final Connection connection;
+
+   protected HornetQProtonRemotingConnection protonConnectionDelegate;
+
+   protected AMQPConnectionContext amqpConnection;
+
+   private final ReusableLatch latch = new ReusableLatch(0);
+
+   public HornetQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection)
+   {
+      this.manager = manager;
+      this.connection = connection;
+   }
+
+   @Override
+   public ServerSASL[] getSASLMechnisms()
+   {
+      return new ServerSASL[]{new AnonymousServerSASL(), new HornetQPlainSASL(manager.getServer().getSecurityStore(), manager.getServer().getSecurityManager())};
+   }
+
+   @Override
+   public void close()
+   {
+
+   }
+
+   @Override
+   public void setConnection(AMQPConnectionContext connection)
+   {
+      this.amqpConnection = connection;
+   }
+
+   @Override
+   public AMQPConnectionContext getConnection()
+   {
+      return amqpConnection;
+   }
+
+   public HornetQProtonRemotingConnection getProtonConnectionDelegate()
+   {
+      return protonConnectionDelegate;
+   }
+
+   public void setProtonConnectionDelegate(HornetQProtonRemotingConnection protonConnectionDelegate)
+   {
+      this.protonConnectionDelegate = protonConnectionDelegate;
+   }
+
+   public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection)
+   {
+      final int size = byteBuf.writerIndex();
+
+      latch.countUp();
+      connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener()
+      {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception
+         {
+            latch.countDown();
+         }
+      });
+
+      if (amqpConnection.isSyncOnFlush())
+      {
+         try
+         {
+            latch.await(5, TimeUnit.SECONDS);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+
+      amqpConnection.outputDone(size);
+   }
+
+
+   @Override
+   public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection)
+   {
+      return new ProtonSessionIntegrationCallback(this, manager, connection);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
new file mode 100644
index 0000000..85e3670
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton.plug;
+
+
+import io.netty.buffer.ByteBuf;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.api.core.client.HornetQClient;
+import org.apache.activemq6.core.journal.IOAsyncTask;
+import org.apache.activemq6.core.protocol.proton.ProtonProtocolManager;
+import org.apache.activemq6.core.server.QueueQueryResult;
+import org.apache.activemq6.core.server.ServerConsumer;
+import org.apache.activemq6.core.server.ServerMessage;
+import org.apache.activemq6.core.server.ServerSession;
+import org.apache.activemq6.spi.core.protocol.SessionCallback;
+import org.apache.activemq6.spi.core.remoting.ReadyListener;
+import org.apache.activemq6.utils.ByteUtil;
+import org.apache.activemq6.utils.IDGenerator;
+import org.apache.activemq6.utils.SimpleIDGenerator;
+import org.apache.activemq6.utils.UUIDGenerator;
+import org.proton.plug.AMQPConnectionContext;
+import org.proton.plug.AMQPSessionCallback;
+import org.proton.plug.AMQPSessionContext;
+import org.proton.plug.SASLResult;
+import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.sasl.PlainSASLResult;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback
+{
+   protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
+
+   private final HornetQProtonConnectionCallback protonSPI;
+
+   private final ProtonProtocolManager manager;
+
+   private final AMQPConnectionContext connection;
+
+   private ServerSession serverSession;
+
+   private AMQPSessionContext protonSession;
+
+   public ProtonSessionIntegrationCallback(HornetQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection)
+   {
+      this.protonSPI = protonSPI;
+      this.manager = manager;
+      this.connection = connection;
+   }
+
+   @Override
+   public void onFlowConsumer(Object consumer, int credits)
+   {
+      // We have our own flow control on AMQP, so we set hornetq's flow control to 0
+      ((ServerConsumer) consumer).receiveCredits(-1);
+   }
+
+   @Override
+   public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception
+   {
+
+      this.protonSession = protonSession;
+
+      String name = UUIDGenerator.getInstance().generateStringUUID();
+
+      String user = null;
+      String passcode = null;
+      if (saslResult != null)
+      {
+         user = saslResult.getUser();
+         if (saslResult instanceof PlainSASLResult)
+         {
+            passcode = ((PlainSASLResult)saslResult).getPassword();
+         }
+      }
+
+      serverSession = manager.getServer().createSession(name,
+                                                        user,
+                                                        passcode,
+                                                        HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                        protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
+                                                        false, // boolean autoCommitSends
+                                                        false, // boolean autoCommitAcks,
+                                                        false, // boolean preAcknowledge,
+                                                        true, //boolean xa,
+                                                        (String) null,
+                                                        this,
+                                                        null);
+   }
+
+   @Override
+   public void start()
+   {
+
+   }
+
+   @Override
+   public Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception
+   {
+      long consumerID = consumerIDGenerator.generateID();
+
+      ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filer), browserOnly);
+
+      // AMQP handles its own flow control for when it's started
+      consumer.setStarted(true);
+
+      consumer.setProtocolContext(protonSender);
+
+      return consumer;
+   }
+
+   @Override
+   public void startSender(Object brokerConsumer) throws Exception
+   {
+      ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
+      // flow control is done at proton
+      serverConsumer.receiveCredits(-1);
+   }
+
+   @Override
+   public void createTemporaryQueue(String queueName) throws Exception
+   {
+      serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false);
+   }
+
+   @Override
+   public boolean queueQuery(String queueName) throws Exception
+   {
+      QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName));
+      return queueQuery.isExists();
+   }
+
+   @Override
+   public void closeSender(Object brokerConsumer) throws Exception
+   {
+      ((ServerConsumer) brokerConsumer).close(false);
+   }
+
+   @Override
+   public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception
+   {
+      return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount);
+   }
+
+   @Override
+   public Binary getCurrentTXID()
+   {
+      return new Binary(ByteUtil.longToBytes(serverSession.getCurrentTransaction().getID()));
+   }
+
+   @Override
+   public String tempQueueName()
+   {
+      return UUIDGenerator.getInstance().generateStringUUID();
+   }
+
+   @Override
+   public void commitCurrentTX() throws Exception
+   {
+      serverSession.commit();
+   }
+
+   @Override
+   public void rollbackCurrentTX() throws Exception
+   {
+      serverSession.rollback(false);
+   }
+
+   @Override
+   public void close() throws Exception
+   {
+      serverSession.close(false);
+   }
+
+   @Override
+   public void ack(Object brokerConsumer, Object message) throws Exception
+   {
+      ((ServerConsumer)brokerConsumer).individualAcknowledge(null, ((ServerMessage)message).getMessageID());
+   }
+
+   @Override
+   public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception
+   {
+      ((ServerConsumer)brokerConsumer).individualCancel(((ServerMessage)message).getMessageID(), updateCounts);
+   }
+
+   @Override
+   public void resumeDelivery(Object consumer)
+   {
+      ((ServerConsumer) consumer).receiveCredits(-1);
+   }
+
+   @Override
+   public void serverSend(final Receiver receiver, final Delivery delivery, String address, int messageFormat, ByteBuf messageEncoded) throws Exception
+   {
+      EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex());
+
+      ServerMessage message = manager.getConverter().inbound(encodedMessage);
+      //use the address on the receiver if not null, if null let's hope it was set correctly on the message
+      if (address != null)
+      {
+         message.setAddress(new SimpleString(address));
+      }
+
+      serverSession.send(message, false);
+
+      manager.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask()
+      {
+         @Override
+         public void done()
+         {
+            synchronized (connection.getLock())
+            {
+               delivery.settle();
+               connection.flush();
+            }
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage)
+         {
+            synchronized (connection.getLock())
+            {
+               receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage));
+               connection.flush();
+            }
+         }
+      });
+   }
+
+
+   @Override
+   public void sendProducerCreditsMessage(int credits, SimpleString address)
+   {
+   }
+
+   @Override
+   public void sendProducerCreditsFailMessage(int credits, SimpleString address)
+   {
+   }
+
+   @Override
+   public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount)
+   {
+
+      ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
+
+      try
+      {
+         return plugSender.deliverMessage(message, deliveryCount);
+      }
+      catch (Exception e)
+      {
+         synchronized (connection.getLock())
+         {
+            plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()));
+            connection.flush();
+         }
+         throw new IllegalStateException("Can't deliver message " + e, e);
+      }
+
+   }
+
+   @Override
+   public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount)
+   {
+      return 0;
+   }
+
+   @Override
+   public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      return 0;
+   }
+
+   @Override
+   public void closed()
+   {
+   }
+
+   @Override
+   public void addReadyListener(ReadyListener listener)
+   {
+
+   }
+
+   @Override
+   public void removeReadyListener(ReadyListener listener)
+   {
+
+   }
+
+   @Override
+   public void disconnect(ServerConsumer consumer, String queueName)
+   {
+      synchronized (connection.getLock())
+      {
+         ((Link) consumer.getProtocolContext()).close();
+         connection.flush();
+      }
+   }
+
+
+   @Override
+   public boolean hasCredits(ServerConsumer consumer)
+   {
+      ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext();
+
+      if (plugSender != null && plugSender.getSender().getCredit() > 0)
+      {
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java
new file mode 100644
index 0000000..9bdf305
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+/**
+ * This package contains classes for integration with the ProtonPlug
+ * @author Clebert Suconic
+ */
+package org.apache.activemq6.core.protocol.proton.plug;
\ No newline at end of file


Mime
View raw message