activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [03/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ
Date Tue, 11 Nov 2014 18:41:33 GMT
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/sasl/HornetQPlainSASL.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/sasl/HornetQPlainSASL.java
b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/sasl/HornetQPlainSASL.java
new file mode 100644
index 0000000..b92b7ee
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/sasl/HornetQPlainSASL.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton.sasl;
+
+import org.apache.activemq6.core.security.SecurityStore;
+import org.apache.activemq6.spi.core.security.HornetQSecurityManager;
+import org.proton.plug.sasl.ServerSASLPlain;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class HornetQPlainSASL extends ServerSASLPlain
+{
+
+   private final HornetQSecurityManager securityManager;
+
+   private final SecurityStore securityStore;
+
+
+   public HornetQPlainSASL(SecurityStore securityStore, HornetQSecurityManager securityManager)
+   {
+      this.securityManager = securityManager;
+      this.securityStore = securityStore;
+   }
+
+   @Override
+   protected boolean authenticate(String user, String password)
+   {
+      if (securityStore.isSecurityEnabled())
+      {
+         return securityManager.validateUser(user, password);
+      }
+      else
+      {
+         return true;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory
b/activemq6-protocols/activemq6-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory
new file mode 100644
index 0000000..ac99052
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/resources/META-INF/services/org.hornetq.spi.core.protocol.ProtocolManagerFactory
@@ -0,0 +1 @@
+org.hornetq.core.protocol.proton.ProtonProtocolManagerFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/test/java/org/apache/activemq6/core/protocol/proton/TestConversions.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/test/java/org/apache/activemq6/core/protocol/proton/TestConversions.java
b/activemq6-protocols/activemq6-amqp-protocol/src/test/java/org/apache/activemq6/core/protocol/proton/TestConversions.java
new file mode 100644
index 0000000..c884809
--- /dev/null
+++ b/activemq6-protocols/activemq6-amqp-protocol/src/test/java/org/apache/activemq6/core/protocol/proton/TestConversions.java
@@ -0,0 +1,841 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq6.core.protocol.proton;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.jms.EncodedMessage;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.ProtonJMessage;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.api.core.SimpleString;
+import org.apache.activemq6.core.journal.EncodingSupport;
+import org.apache.activemq6.core.protocol.proton.converter.ProtonMessageConverter;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq6.core.server.ServerMessage;
+import org.apache.activemq6.utils.SimpleIDGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.proton.plug.util.NettyWritable;
+
+/**
+ * @author Clebert Suconic
+ */
+
+public class TestConversions extends Assert
+{
+
+   /**
+    * Converts an AMQP message whose body is a four-byte {@code Data} section into a JMS
+    * bytes message and checks that the application properties and the body survive.
+    * The outbound conversion is only printed, not asserted.
+    */
+   @Test
+   public void testSimpleConversionBytes() throws Exception
+   {
+      // Four bytes, all set to 0xff.
+      final byte[] expectedBody = {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff};
+
+      MessageImpl amqpMessage = (MessageImpl) Message.Factory.create();
+      ApplicationProperties appProperties = new ApplicationProperties(createPropertiesMap());
+      amqpMessage.setApplicationProperties(appProperties);
+      amqpMessage.setBody(new Data(new Binary(expectedBody)));
+
+      EncodedMessage encoded = encodeMessage(amqpMessage);
+
+      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
+      ServerJMSBytesMessage jmsMessage = (ServerJMSBytesMessage)converter.inboundJMSType(encoded);
+
+      verifyProperties(jmsMessage);
+
+      assertEquals(expectedBody.length, jmsMessage.getBodyLength());
+
+      byte[] actualBody = new byte[4];
+      jmsMessage.readBytes(actualBody);
+      assertArrayEquals(expectedBody, actualBody);
+
+      ServerMessage innerMessage = (ServerMessage)jmsMessage.getInnerMessage();
+      Object outbound = converter.outbound(innerMessage, 0);
+
+      System.out.println("output = " + outbound);
+   }
+
+   /**
+    * Asserts that {@code message} carries the boolean properties "true" and "false" with
+    * matching values and the string property "foo" set to "bar".
+    */
+   private void verifyProperties(javax.jms.Message message) throws Exception
+   {
+      Assert.assertEquals(Boolean.TRUE, message.getBooleanProperty("true"));
+      Assert.assertEquals(Boolean.FALSE, message.getBooleanProperty("false"));
+      Assert.assertEquals("bar", message.getStringProperty("foo"));
+   }
+
+   private Map<String, Object> createPropertiesMap()
+   {
+      Map<String, Object> mapprop = new HashMap<>();
+
+      mapprop.put("true", Boolean.TRUE);
+      mapprop.put("false", Boolean.FALSE);
+      mapprop.put("foo", "bar");
+      return mapprop;
+   }
+
+   /**
+    * Converts an AMQP message whose body is an {@code AmqpValue} wrapping a map into a JMS map
+    * message, checks the properties and both entries, then converts it back and checks the
+    * {@code someint} entry of the outbound body.
+    */
+   @Test
+   public void testSimpleConversionMap() throws Exception
+   {
+      Map<String, Object> mapprop = createPropertiesMap();
+      ApplicationProperties properties = new ApplicationProperties(mapprop);
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      message.setApplicationProperties(properties);
+
+      Map<String, Object> mapValues = new HashMap<>();
+      mapValues.put("somestr", "value");
+      mapValues.put("someint", Integer.valueOf(1));
+
+      message.setBody(new AmqpValue(mapValues));
+
+      EncodedMessage encodedMessage = encodeMessage(message);
+
+      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
+      ServerJMSMapMessage serverMessage = (ServerJMSMapMessage)converter.inboundJMSType(encodedMessage);
+
+      verifyProperties(serverMessage);
+
+      Assert.assertEquals(1, serverMessage.getInt("someint"));
+      Assert.assertEquals("value", serverMessage.getString("somestr"));
+
+      Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0);
+
+      // The return value is not needed; this only checks the outbound message can be encoded.
+      reEncodeMsg(obj);
+
+      MessageImpl outMessage = (MessageImpl) obj;
+      AmqpValue value = (AmqpValue)outMessage.getBody();
+      // Wildcard instead of a raw Map: the entry types are not known statically here.
+      Map<?, ?> mapoutput = (Map<?, ?>)value.getValue();
+
+      // No cast on the actual value: a wrong type now fails the assertion with a readable
+      // message instead of throwing ClassCastException.
+      assertEquals(Integer.valueOf(1), mapoutput.get("someint"));
+
+      System.out.println("output = " + obj);
+   }
+
+
+   @Test
+   public void testSimpleConversionStream() throws Exception
+   {
+      Map<String, Object> mapprop = createPropertiesMap();
+      ApplicationProperties properties = new ApplicationProperties(mapprop);
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      message.setApplicationProperties(properties);
+
+      List<Object> objects = new LinkedList<>();
+      objects.add(new Integer(10));
+      objects.add("10");
+
+      message.setBody(new AmqpSequence(objects));
+
+      EncodedMessage encodedMessage = encodeMessage(message);
+
+      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
+      ServerJMSStreamMessage serverMessage = (ServerJMSStreamMessage)converter.inboundJMSType(encodedMessage);
+
+      simulatePersistence(serverMessage);
+
+      verifyProperties(serverMessage);
+
+      serverMessage.reset();
+
+      assertEquals(10, serverMessage.readInt());
+      assertEquals("10", serverMessage.readString());
+
+      Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0);
+
+      reEncodeMsg(obj);
+
+      MessageImpl outMessage = (MessageImpl)obj;
+      List list = ((AmqpSequence)outMessage.getBody()).getValue();
+      Assert.assertEquals(Integer.valueOf(10), list.get(0));
+      Assert.assertEquals("10", list.get(1));
+
+   }
+
+   /**
+    * Converts an AMQP message whose body is an {@code AmqpValue} wrapping a string into a JMS
+    * text message, checks the properties and text after a simulated persistence step, then
+    * converts it back and checks that the outbound body holds the same text.
+    */
+   @Test
+   public void testSimpleConversionText() throws Exception
+   {
+      final String expectedText = "someText";
+
+      Map<String, Object> appProps = createPropertiesMap();
+      ApplicationProperties appProperties = new ApplicationProperties(appProps);
+      MessageImpl amqpMessage = (MessageImpl) Message.Factory.create();
+      amqpMessage.setApplicationProperties(appProperties);
+      amqpMessage.setBody(new AmqpValue(expectedText));
+
+      EncodedMessage encoded = encodeMessage(amqpMessage);
+
+      ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0));
+      ServerJMSTextMessage jmsMessage = (ServerJMSTextMessage)converter.inboundJMSType(encoded);
+
+      simulatePersistence(jmsMessage);
+
+      verifyProperties(jmsMessage);
+      Assert.assertEquals(expectedText, jmsMessage.getText());
+
+      Object outbound = converter.outbound((ServerMessage)jmsMessage.getInnerMessage(), 0);
+
+      reEncodeMsg(outbound);
+
+      MessageImpl outMessage = (MessageImpl) outbound;
+      AmqpValue outBody = (AmqpValue)outMessage.getBody();
+      String outText = (String)outBody.getValue();
+      Assert.assertEquals(expectedText, outText);
+
+      System.out.println("output = " + outbound);
+   }
+
+   /**
+    * Mimics what would happen while the message is persisted: an address is set on the inner
+    * message and the inner message is encoded into an {@link EmptyBuffer}. The message must
+    * still be recoverable when it is read back afterwards.
+    */
+   private void simulatePersistence(ServerJMSMessage serverMessage)
+   {
+      SimpleString address = new SimpleString("jms.queue.SomeAddress");
+      serverMessage.getInnerMessage().setAddress(address);
+
+      EncodingSupport encodable = (EncodingSupport)serverMessage.getInnerMessage();
+      encodable.encode(new EmptyBuffer());
+   }
+
+   /**
+    * Encodes {@code obj} into a scratch buffer to check that it can be serialized.
+    *
+    * @param obj the outbound message; must be a {@link ProtonJMessage}
+    * @return {@code obj} cast to {@link ProtonJMessage}
+    */
+   private ProtonJMessage reEncodeMsg(Object obj)
+   {
+      ProtonJMessage objOut = (ProtonJMessage)obj;
+
+      ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+      try
+      {
+         objOut.encode(new NettyWritable(nettyBuffer));
+      }
+      finally
+      {
+         // Pooled buffers are reference counted; hand the scratch buffer back to the pool.
+         nettyBuffer.release();
+      }
+      return objOut;
+   }
+
+
+   /**
+    * Encodes {@code message} and wraps the resulting bytes in an {@link EncodedMessage}.
+    *
+    * @param message the AMQP message to encode
+    * @return an {@code EncodedMessage} with message format 0 covering the whole encoded array
+    */
+   private EncodedMessage encodeMessage(MessageImpl message)
+   {
+      ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024);
+      try
+      {
+         message.encode(new NettyWritable(buf));
+         // Copy out exactly the bytes that were written.
+         byte[] bytesConvert = new byte[buf.writerIndex()];
+         buf.readBytes(bytesConvert);
+         return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length);
+      }
+      finally
+      {
+         // The bytes live on in bytesConvert, so the pooled buffer can go back to the pool.
+         buf.release();
+      }
+   }
+
+
+   /**
+    * No-op {@link HornetQBuffer}: every accessor returns {@code 0}, {@code false} or
+    * {@code null} and every mutator does nothing. It is passed to {@code encode(...)} by
+    * {@code simulatePersistence} so that encoding runs without storing anything.
+    * <p>
+    * Declared {@code static} because it uses no state of the enclosing test instance.
+    */
+   static class EmptyBuffer implements HornetQBuffer
+   {
+      @Override
+      public ByteBuf byteBuf()
+      {
+         return null;
+      }
+
+      @Override
+      public int capacity()
+      {
+         return 0;
+      }
+
+      @Override
+      public int readerIndex()
+      {
+         return 0;
+      }
+
+      @Override
+      public void readerIndex(int readerIndex)
+      {
+
+      }
+
+      @Override
+      public int writerIndex()
+      {
+         return 0;
+      }
+
+      @Override
+      public void writerIndex(int writerIndex)
+      {
+
+      }
+
+      @Override
+      public void setIndex(int readerIndex, int writerIndex)
+      {
+
+      }
+
+      @Override
+      public int readableBytes()
+      {
+         return 0;
+      }
+
+      @Override
+      public int writableBytes()
+      {
+         return 0;
+      }
+
+      @Override
+      public boolean readable()
+      {
+         return false;
+      }
+
+      @Override
+      public boolean writable()
+      {
+         return false;
+      }
+
+      @Override
+      public void clear()
+      {
+
+      }
+
+      @Override
+      public void markReaderIndex()
+      {
+
+      }
+
+      @Override
+      public void resetReaderIndex()
+      {
+
+      }
+
+      @Override
+      public void markWriterIndex()
+      {
+
+      }
+
+      @Override
+      public void resetWriterIndex()
+      {
+
+      }
+
+      @Override
+      public void discardReadBytes()
+      {
+
+      }
+
+      @Override
+      public byte getByte(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public short getUnsignedByte(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public short getShort(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public int getUnsignedShort(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public int getInt(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public long getUnsignedInt(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public long getLong(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public void getBytes(int index, HornetQBuffer dst)
+      {
+
+      }
+
+      @Override
+      public void getBytes(int index, HornetQBuffer dst, int length)
+      {
+
+      }
+
+      @Override
+      public void getBytes(int index, HornetQBuffer dst, int dstIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void getBytes(int index, byte[] dst)
+      {
+
+      }
+
+      @Override
+      public void getBytes(int index, byte[] dst, int dstIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void getBytes(int index, ByteBuffer dst)
+      {
+
+      }
+
+      @Override
+      public char getChar(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public float getFloat(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public double getDouble(int index)
+      {
+         return 0;
+      }
+
+      @Override
+      public void setByte(int index, byte value)
+      {
+
+      }
+
+      @Override
+      public void setShort(int index, short value)
+      {
+
+      }
+
+      @Override
+      public void setInt(int index, int value)
+      {
+
+      }
+
+      @Override
+      public void setLong(int index, long value)
+      {
+
+      }
+
+      @Override
+      public void setBytes(int index, HornetQBuffer src)
+      {
+
+      }
+
+      @Override
+      public void setBytes(int index, HornetQBuffer src, int length)
+      {
+
+      }
+
+      @Override
+      public void setBytes(int index, HornetQBuffer src, int srcIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void setBytes(int index, byte[] src)
+      {
+
+      }
+
+      @Override
+      public void setBytes(int index, byte[] src, int srcIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void setBytes(int index, ByteBuffer src)
+      {
+
+      }
+
+      @Override
+      public void setChar(int index, char value)
+      {
+
+      }
+
+      @Override
+      public void setFloat(int index, float value)
+      {
+
+      }
+
+      @Override
+      public void setDouble(int index, double value)
+      {
+
+      }
+
+      @Override
+      public byte readByte()
+      {
+         return 0;
+      }
+
+      @Override
+      public short readUnsignedByte()
+      {
+         return 0;
+      }
+
+      @Override
+      public short readShort()
+      {
+         return 0;
+      }
+
+      @Override
+      public int readUnsignedShort()
+      {
+         return 0;
+      }
+
+      @Override
+      public int readInt()
+      {
+         return 0;
+      }
+
+      @Override
+      public long readUnsignedInt()
+      {
+         return 0;
+      }
+
+      @Override
+      public long readLong()
+      {
+         return 0;
+      }
+
+      @Override
+      public char readChar()
+      {
+         return 0;
+      }
+
+      @Override
+      public float readFloat()
+      {
+         return 0;
+      }
+
+      @Override
+      public double readDouble()
+      {
+         return 0;
+      }
+
+      @Override
+      public boolean readBoolean()
+      {
+         return false;
+      }
+
+      @Override
+      public SimpleString readNullableSimpleString()
+      {
+         return null;
+      }
+
+      @Override
+      public String readNullableString()
+      {
+         return null;
+      }
+
+      @Override
+      public SimpleString readSimpleString()
+      {
+         return null;
+      }
+
+      @Override
+      public String readString()
+      {
+         return null;
+      }
+
+      @Override
+      public String readUTF()
+      {
+         return null;
+      }
+
+      @Override
+      public HornetQBuffer readBytes(int length)
+      {
+         return null;
+      }
+
+      @Override
+      public HornetQBuffer readSlice(int length)
+      {
+         return null;
+      }
+
+      @Override
+      public void readBytes(HornetQBuffer dst)
+      {
+
+      }
+
+      @Override
+      public void readBytes(HornetQBuffer dst, int length)
+      {
+
+      }
+
+      @Override
+      public void readBytes(HornetQBuffer dst, int dstIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void readBytes(byte[] dst)
+      {
+
+      }
+
+      @Override
+      public void readBytes(byte[] dst, int dstIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void readBytes(ByteBuffer dst)
+      {
+
+      }
+
+      @Override
+      public void skipBytes(int length)
+      {
+
+      }
+
+      @Override
+      public void writeByte(byte value)
+      {
+
+      }
+
+      @Override
+      public void writeShort(short value)
+      {
+
+      }
+
+      @Override
+      public void writeInt(int value)
+      {
+
+      }
+
+      @Override
+      public void writeLong(long value)
+      {
+
+      }
+
+      @Override
+      public void writeChar(char chr)
+      {
+
+      }
+
+      @Override
+      public void writeFloat(float value)
+      {
+
+      }
+
+      @Override
+      public void writeDouble(double value)
+      {
+
+      }
+
+      @Override
+      public void writeBoolean(boolean val)
+      {
+
+      }
+
+      @Override
+      public void writeNullableSimpleString(SimpleString val)
+      {
+
+      }
+
+      @Override
+      public void writeNullableString(String val)
+      {
+
+      }
+
+      @Override
+      public void writeSimpleString(SimpleString val)
+      {
+
+      }
+
+      @Override
+      public void writeString(String val)
+      {
+
+      }
+
+      @Override
+      public void writeUTF(String utf)
+      {
+
+      }
+
+      @Override
+      public void writeBytes(HornetQBuffer src, int length)
+      {
+
+      }
+
+      @Override
+      public void writeBytes(HornetQBuffer src, int srcIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void writeBytes(byte[] src)
+      {
+
+      }
+
+      @Override
+      public void writeBytes(byte[] src, int srcIndex, int length)
+      {
+
+      }
+
+      @Override
+      public void writeBytes(ByteBuffer src)
+      {
+
+      }
+
+      @Override
+      public HornetQBuffer copy()
+      {
+         return null;
+      }
+
+      @Override
+      public HornetQBuffer copy(int index, int length)
+      {
+         return null;
+      }
+
+      @Override
+      public HornetQBuffer slice()
+      {
+         return null;
+      }
+
+      @Override
+      public HornetQBuffer slice(int index, int length)
+      {
+         return null;
+      }
+
+      @Override
+      public HornetQBuffer duplicate()
+      {
+         return null;
+      }
+
+      @Override
+      public ByteBuffer toByteBuffer()
+      {
+         return null;
+      }
+
+      @Override
+      public ByteBuffer toByteBuffer(int index, int length)
+      {
+         return null;
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-openwire-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-openwire-protocol/pom.xml b/activemq6-protocols/activemq6-openwire-protocol/pom.xml
new file mode 100644
index 0000000..e3d858b
--- /dev/null
+++ b/activemq6-protocols/activemq6-openwire-protocol/pom.xml
@@ -0,0 +1,49 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   <parent>
+      <artifactId>activemq6-protocols</artifactId>
+      <groupId>org.apache.activemq6</groupId>
+      <version>6.0.0-SNAPSHOT</version>
+   </parent>
+   <modelVersion>4.0.0</modelVersion>
+
+   <artifactId>activemq6-openwire-protocol</artifactId>
+
+   <properties>
+      <hornetq.basedir>${project.parent.parent.basedir}</hornetq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging-processor</artifactId>
+      </dependency>
+
+      <!--
+          JBoss Logging
+      -->
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-client</artifactId>
+      </dependency>
+      <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-api</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-server</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq6</groupId>
+         <artifactId>activemq6-jms-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+   </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQConnectorImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQConnectorImpl.java
b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQConnectorImpl.java
new file mode 100644
index 0000000..792aa0d
--- /dev/null
+++ b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQConnectorImpl.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.openwire;
+
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq6.core.protocol.openwire.amq.AMQConnector;
+import org.apache.activemq6.core.protocol.openwire.amq.AMQConnectorStatistics;
+import org.apache.activemq6.spi.core.remoting.Acceptor;
+
+/**
+ * {@link AMQConnector} implementation created for a given {@link Acceptor}.
+ * <p>
+ * Most operations are placeholders that return fixed values or do nothing; each method
+ * documents what it currently returns.
+ */
+public class AMQConnectorImpl implements AMQConnector
+{
+   /** Limit reported for consumers and producers; this belongs to configuration, now hardcoded. */
+   private static final int MAX_PER_CONNECTION = 1000000;
+
+   /** Acceptor passed at construction time; not read anywhere in this class yet. */
+   private final Acceptor acceptor;
+
+   /**
+    * @param acceptorUsed the acceptor this connector is created for
+    */
+   public AMQConnectorImpl(Acceptor acceptorUsed)
+   {
+      this.acceptor = acceptorUsed;
+   }
+
+   /** Not implemented yet: always returns {@code null}. */
+   @Override
+   public BrokerInfo getBrokerInfo()
+   {
+      // TODO provide real broker information
+      return null;
+   }
+
+   /** Not implemented yet: always returns {@code null}. */
+   @Override
+   public AMQConnectorStatistics getStatistics()
+   {
+      // TODO provide real statistics
+      return null;
+   }
+
+   /** Not implemented yet: always returns {@code false}. */
+   @Override
+   public boolean isUpdateClusterClients()
+   {
+      return false;
+   }
+
+   /** Not implemented yet: always returns {@code false}. */
+   @Override
+   public boolean isRebalanceClusterClients()
+   {
+      return false;
+   }
+
+   /** Not implemented yet: does nothing. */
+   @Override
+   public void updateClientClusterInfo()
+   {
+      // TODO implement
+   }
+
+   /** Not implemented yet: always returns {@code false}. */
+   @Override
+   public boolean isUpdateClusterClientsOnRemove()
+   {
+      return false;
+   }
+
+   /** Not implemented yet: always returns {@code 0}. */
+   @Override
+   public int connectionCount()
+   {
+      // TODO report the real number of connections
+      return 0;
+   }
+
+   /** Not implemented yet: always returns {@code false}. */
+   @Override
+   public boolean isAllowLinkStealing()
+   {
+      return false;
+   }
+
+   /** Returns a new, default-constructed {@link ConnectionControl} on every call. */
+   @Override
+   public ConnectionControl getConnectionControl()
+   {
+      return new ConnectionControl();
+   }
+
+   /** Not implemented yet: does nothing. */
+   @Override
+   public void onStarted(OpenWireConnection connection)
+   {
+      // TODO implement
+   }
+
+   /** Not implemented yet: does nothing. */
+   @Override
+   public void onStopped(OpenWireConnection connection)
+   {
+      // TODO implement
+   }
+
+   /** @return the hardcoded per-connection consumer limit (1000000) */
+   public int getMaximumConsumersAllowedPerConnection()
+   {
+      return MAX_PER_CONNECTION;
+   }
+
+   /** @return the hardcoded per-connection producer limit (1000000) */
+   public int getMaximumProducersAllowedPerConnection()
+   {
+      return MAX_PER_CONNECTION;
+   }
+
+   /** Always returns {@code false}. */
+   public boolean isAuditNetworkProducers()
+   {
+      return false;
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQTransactionImpl.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQTransactionImpl.java
b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQTransactionImpl.java
new file mode 100644
index 0000000..6b796d1
--- /dev/null
+++ b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/AMQTransactionImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.openwire;
+
+import org.apache.activemq6.core.persistence.StorageManager;
+import org.apache.activemq6.core.server.Queue;
+import org.apache.activemq6.core.server.impl.RefsOperation;
+import org.apache.activemq6.core.transaction.Transaction;
+import org.apache.activemq6.core.transaction.impl.TransactionImpl;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * {@link TransactionImpl} variant whose {@link RefsOperation}s skip their after-rollback
+ * handling unless {@link #setRollbackForClose()} has been called on the transaction.
+ */
+public class AMQTransactionImpl extends TransactionImpl
+{
+   /** Set by {@link #setRollbackForClose()}; read by {@link AMQrefsOperation#afterRollback}. */
+   private boolean rollbackForClose;
+
+   public AMQTransactionImpl(StorageManager storageManager, int timeoutSeconds)
+   {
+      super(storageManager, timeoutSeconds);
+   }
+
+   public AMQTransactionImpl(StorageManager storageManager)
+   {
+      super(storageManager);
+   }
+
+   public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds)
+   {
+      super(xid, storageManager, timeoutSeconds);
+   }
+
+   public AMQTransactionImpl(long id, Xid xid, StorageManager storageManager)
+   {
+      super(id, xid, storageManager);
+   }
+
+   /** Marks this transaction so that after-rollback handling is performed. */
+   public void setRollbackForClose()
+   {
+      rollbackForClose = true;
+   }
+
+   /** Creates the {@link AMQrefsOperation} used for {@code queue}. */
+   @Override
+   public RefsOperation createRefsOperation(Queue queue)
+   {
+      return new AMQrefsOperation(queue, storageManager);
+   }
+
+   /**
+    * {@link RefsOperation} that delegates {@code afterRollback} to its superclass only when
+    * the enclosing transaction has been marked via {@link #setRollbackForClose()}.
+    */
+   public class AMQrefsOperation extends RefsOperation
+   {
+      public AMQrefsOperation(Queue queue, StorageManager storageManager)
+      {
+         super(queue, storageManager);
+      }
+
+      @Override
+      public void afterRollback(Transaction tx)
+      {
+         if (!rollbackForClose)
+         {
+            return;
+         }
+         super.afterRollback(tx);
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/BrokerState.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/BrokerState.java
b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/BrokerState.java
new file mode 100644
index 0000000..4c2149a
--- /dev/null
+++ b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/BrokerState.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.openwire;
+
+/**
+ * Holder for the state associated with an ActiveMQ broker.
+ * <p>
+ * Currently an empty placeholder: it declares no fields or methods.
+ *
+ * @author howard
+ */
+public class BrokerState
+{
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/DataInputWrapper.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/DataInputWrapper.java b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/DataInputWrapper.java
new file mode 100644
index 0000000..10e077d
--- /dev/null
+++ b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/DataInputWrapper.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.openwire;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq6.api.core.HornetQBuffer;
+import org.apache.activemq6.utils.UTF8Util;
+import org.apache.activemq6.utils.UTF8Util.StringUtilBuffer;
+
+/**
+ * A {@link DataInput} over a direct {@link ByteBuffer} that is filled
+ * incrementally through the {@code receiveData} methods.
+ * <p>
+ * Every read method first checks that enough bytes are buffered and throws
+ * {@link NotEnoughBytesException} otherwise. Each {@code receiveData} call
+ * rewinds the read position to the last {@link #mark()}, so the intended usage
+ * is: call {@link #mark()} after each successful unmarshall, and retry the
+ * unmarshall from the mark once more data has been received.
+ */
+public class DataInputWrapper implements DataInput
+{
+   private static final int DEFAULT_CAPACITY = 1024 * 1024;
+   // Single shared instance thrown by every failed size check
+   // (presumably to avoid allocating an exception per short read -- its stack trace is therefore not meaningful).
+   private static final NotEnoughBytesException exception = new NotEnoughBytesException();
+   // Invariant between calls: mark <= position <= limit, where [mark, limit) holds the
+   // bytes received since the last successful unmarshall.
+   private ByteBuffer internalBuffer;
+
+   /** Creates a wrapper with the default initial capacity of 1 MiB. */
+   public DataInputWrapper()
+   {
+      this(DEFAULT_CAPACITY);
+   }
+
+   /**
+    * @param capacity initial capacity, in bytes, of the internal direct buffer
+    */
+   public DataInputWrapper(int capacity)
+   {
+      this.internalBuffer = ByteBuffer.allocateDirect(capacity);
+      // start empty: mark and position at 0, nothing readable yet
+      this.internalBuffer.mark();
+      this.internalBuffer.limit(0);
+   }
+
+   /**
+    * Appends {@code data} after the bytes already buffered and rewinds the read
+    * position to the last mark.
+    * <p>
+    * When the tail of the buffer is too small, bytes before the mark are
+    * discarded by compaction; if that still does not free enough room the
+    * buffer is replaced by a larger one.
+    *
+    * @param data the bytes to append
+    */
+   public void receiveData(byte[] data)
+   {
+      int newSize = data.length;
+      int freeSpace = internalBuffer.capacity() - internalBuffer.limit();
+      if (freeSpace < newSize)
+      {
+         // drop everything before the mark; afterwards position == number of pending bytes
+         internalBuffer.reset();
+         internalBuffer.compact();
+         if (internalBuffer.remaining() < newSize)
+         {
+            enlarge(newSize);
+         }
+         //make sure mark is at zero and position is at effective limit
+         int pos = internalBuffer.position();
+         internalBuffer.position(0);
+         internalBuffer.mark();
+         internalBuffer.position(pos);
+      }
+      else
+      {
+         // enough room at the tail: switch to write mode right after the buffered bytes
+         internalBuffer.position(internalBuffer.limit());
+         internalBuffer.limit(internalBuffer.capacity());
+      }
+      internalBuffer.put(data);
+      // back to read mode, starting again from the mark
+      internalBuffer.limit(internalBuffer.position());
+      internalBuffer.reset();
+   }
+
+   /**
+    * Replaces the internal buffer with a larger direct buffer that can hold
+    * the pending bytes plus {@code newSize} more, preserving the pending bytes.
+    * Expects the buffer in the state left by {@code compact()}: pending bytes
+    * in {@code [0, position)}.
+    */
+   private void enlarge(int newSize)
+   {
+      final int pending = internalBuffer.position();
+      final long required = (long)pending + newSize;
+      // at least double so that repeated small appends do not reallocate every time
+      final long doubled = 2L * internalBuffer.capacity();
+      final int newCapacity = (int)Math.min(Integer.MAX_VALUE, Math.max(required, doubled));
+      final ByteBuffer enlarged = ByteBuffer.allocateDirect(newCapacity);
+      enlarged.order(internalBuffer.order());
+      internalBuffer.flip();
+      enlarged.put(internalBuffer);
+      // enlarged.position() == pending and its limit is the new capacity
+      internalBuffer = enlarged;
+   }
+
+   /**
+    * Drains all readable bytes of {@code buffer} and appends them via
+    * {@link #receiveData(byte[])}.
+    *
+    * @param buffer the source buffer
+    */
+   public void receiveData(HornetQBuffer buffer)
+   {
+      int newSize = buffer.readableBytes();
+      byte[] newData = new byte[newSize];
+      buffer.readBytes(newData);
+      this.receiveData(newData);
+   }
+
+   //invoke after each successful unmarshall
+   public void mark()
+   {
+      this.internalBuffer.mark();
+   }
+
+   @Override
+   public void readFully(byte[] b) throws IOException
+   {
+      readFully(b, 0, b.length);
+   }
+
+   /**
+    * @throws NotEnoughBytesException if fewer than {@code n} bytes are readable
+    */
+   private void checkSize(int n) throws NotEnoughBytesException
+   {
+      if (internalBuffer.remaining() < n)
+      {
+         throw exception;
+      }
+   }
+
+   @Override
+   public void readFully(byte[] b, int off, int len) throws IOException
+   {
+      checkSize(len);
+      internalBuffer.get(b, off, len);
+   }
+
+   /**
+    * Skips exactly {@code n} bytes, or none when {@code n} is not positive.
+    *
+    * @return the number of bytes skipped
+    * @throws NotEnoughBytesException if fewer than {@code n} bytes are readable
+    */
+   @Override
+   public int skipBytes(int n) throws IOException
+   {
+      // a negative count must not move the position backwards (possibly before the mark)
+      if (n <= 0)
+      {
+         return 0;
+      }
+      checkSize(n);
+      int pos = internalBuffer.position();
+      internalBuffer.position(pos + n);
+      return n;
+   }
+
+   @Override
+   public boolean readBoolean() throws IOException
+   {
+      checkSize(1);
+      byte b = internalBuffer.get();
+      return b != 0;
+   }
+
+   @Override
+   public byte readByte() throws IOException
+   {
+      checkSize(1);
+      return this.internalBuffer.get();
+   }
+
+   @Override
+   public int readUnsignedByte() throws IOException
+   {
+      checkSize(1);
+      return 0xFF & this.internalBuffer.get();
+   }
+
+   @Override
+   public short readShort() throws IOException
+   {
+      checkSize(2);
+      return this.internalBuffer.getShort();
+   }
+
+   @Override
+   public int readUnsignedShort() throws IOException
+   {
+      checkSize(2);
+      return 0xFFFF & this.internalBuffer.getShort();
+   }
+
+   @Override
+   public char readChar() throws IOException
+   {
+      checkSize(2);
+      return this.internalBuffer.getChar();
+   }
+
+   @Override
+   public int readInt() throws IOException
+   {
+      checkSize(4);
+      return this.internalBuffer.getInt();
+   }
+
+   @Override
+   public long readLong() throws IOException
+   {
+      checkSize(8);
+      return this.internalBuffer.getLong();
+   }
+
+   @Override
+   public float readFloat() throws IOException
+   {
+      checkSize(4);
+      return this.internalBuffer.getFloat();
+   }
+
+   @Override
+   public double readDouble() throws IOException
+   {
+      checkSize(8);
+      return this.internalBuffer.getDouble();
+   }
+
+   /**
+    * Reads two-byte chars via {@link #readChar()} until a {@code '\n'} is
+    * found; the terminator is consumed but not part of the result.
+    * <p>
+    * NOTE(review): this differs from the {@link DataInput#readLine()} contract
+    * (byte-wise reading, {@code '\r'} handling, {@code null} at end of input) --
+    * confirm no caller relies on the standard behaviour.
+    */
+   @Override
+   public String readLine() throws IOException
+   {
+      StringBuilder sb = new StringBuilder("");
+      char c = this.readChar();
+      while (c != '\n')
+      {
+         sb.append(c);
+         c = this.readChar();
+      }
+      return sb.toString();
+   }
+
+   /**
+    * Reads an unsigned-short byte length followed by that many bytes and
+    * decodes one-, two- and three-byte sequences into chars, using the
+    * thread-local scratch buffers obtained from {@link UTF8Util}.
+    *
+    * @throws NotEnoughBytesException if the length or the payload is not fully buffered
+    * @throws InternalError if a lead byte is none of the handled forms
+    */
+   @Override
+   public String readUTF() throws IOException
+   {
+      StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer();
+
+      final int size = this.readUnsignedShort();
+
+      if (size > buffer.byteBuffer.length)
+      {
+         buffer.resizeByteBuffer(size);
+      }
+
+      // the decoded text never has more chars than encoded bytes
+      if (size > buffer.charBuffer.length)
+      {
+         buffer.resizeCharBuffer(size);
+      }
+
+      int count = 0;
+      int byte1, byte2, byte3;
+      int charCount = 0;
+
+      this.readFully(buffer.byteBuffer, 0, size);
+
+      while (count < size)
+      {
+         byte1 = buffer.byteBuffer[count++];
+
+         if (byte1 > 0 && byte1 <= 0x7F)
+         {
+            // single-byte sequence
+            buffer.charBuffer[charCount++] = (char)byte1;
+         }
+         else
+         {
+            int c = byte1 & 0xff;
+            switch (c >> 4)
+            {
+               case 0xc:
+               case 0xd:
+                  // two-byte sequence: 110xxxxx 10xxxxxx
+                  byte2 = buffer.byteBuffer[count++];
+                  buffer.charBuffer[charCount++] = (char)((c & 0x1F) << 6 | byte2 & 0x3F);
+                  break;
+               case 0xe:
+                  // three-byte sequence: 1110xxxx 10xxxxxx 10xxxxxx
+                  byte2 = buffer.byteBuffer[count++];
+                  byte3 = buffer.byteBuffer[count++];
+                  buffer.charBuffer[charCount++] = (char)((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0);
+                  break;
+               default:
+                  throw new InternalError("unhandled utf8 byte " + c);
+            }
+         }
+      }
+
+      return new String(buffer.charBuffer, 0, charCount);
+   }
+
+   /**
+    * @return {@code true} if at least one byte can be read from the current position
+    */
+   public boolean readable()
+   {
+      return this.internalBuffer.hasRemaining();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/NotEnoughBytesException.java
----------------------------------------------------------------------
diff --git a/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/NotEnoughBytesException.java b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/NotEnoughBytesException.java
new file mode 100644
index 0000000..ce423b0
--- /dev/null
+++ b/activemq6-protocols/activemq6-openwire-protocol/src/main/java/org/apache/activemq6/core/protocol/openwire/NotEnoughBytesException.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.activemq6.core.protocol.openwire;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} used to report that fewer bytes are available than a
+ * read requires. It carries no message and no cause.
+ */
+public class NotEnoughBytesException extends IOException
+{
+   /** Serialization version identifier. */
+   private static final long serialVersionUID = 3752739907942923658L;
+}


Mime
View raw message