activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1352265 [1/2] - in /activemq/activemq-apollo/trunk: ./ apollo-amqp/ apollo-amqp/src/ apollo-amqp/src/main/ apollo-amqp/src/main/resources/ apollo-amqp/src/main/resources/META-INF/ apollo-amqp/src/main/resources/META-INF/services/ apollo-am...
Date Wed, 20 Jun 2012 18:57:03 GMT
Author: chirino
Date: Wed Jun 20 18:57:01 2012
New Revision: 1352265

URL: http://svn.apache.org/viewvc?rev=1352265&view=rev
Log:
Initial spike of an AMQP 1.0 protocol implementation still a work in progress.

Added:
    activemq/activemq-apollo/trunk/apollo-amqp/
    activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/amqp/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/amqp/dto/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/amqp/dto/jaxb.index
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/Module.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/package-info.java
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/package.html
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/amqp/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/amqp/dto/
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/ide-resources/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/ide-resources/log4j.properties   (with props)
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-secure.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl-secure.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo.ks   (with props)
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/client.ks   (with props)
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/log4j.properties
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/login.config
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/org/apache/activemq/apollo/amqp/dto/simple.xml
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/users.properties
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/AmqpTest.scala
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/
    activemq/activemq-apollo/trunk/apollo-amqp/src/test/scala/org/apache/activemq/apollo/amqp/dto/XmlCodecTest.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/pom.xml

Added: activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/pom.xml?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/pom.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/pom.xml Wed Jun 20 18:57:01 2012
@@ -0,0 +1,245 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.activemq</groupId>
+    <artifactId>apollo-scala</artifactId>
+    <version>99-trunk-SNAPSHOT</version>
+    <relativePath>../apollo-scala</relativePath>
+  </parent>
+
+  <groupId>org.apache.activemq</groupId>
+  <artifactId>apollo-amqp</artifactId>
+  <version>99-trunk-SNAPSHOT</version>
+  <!-- <packaging>bundle</packaging> -->
+
+  <name>${project.artifactId}</name>
+  <description>AMQP messaging protocol</description>
+  
+  <properties>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-broker</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.fusesource.fuse-extra</groupId>
+      <artifactId>fusesource-amqp</artifactId>
+      <version>99-master-SNAPSHOT</version>
+    </dependency>
+
+    <!-- Scala Support -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>compile</scope>
+      <version>${scala-version}</version>
+    </dependency>
+
+    <!-- Testing Dependencies -->
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-broker</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-util</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-bdb</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-leveldb</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty.aggregate</groupId>
+      <artifactId>jetty-all-server</artifactId>
+      <version>${jetty-version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>apollo-web</artifactId>
+      <version>99-trunk-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+      <version>${junit-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_2.9.1</artifactId>
+      <version>${scalatest-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j-version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-java</artifactId>
+      <version>2.21.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-chrome-driver</artifactId>
+      <version>2.21.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.seleniumhq.selenium</groupId>
+      <artifactId>selenium-firefox-driver</artifactId>
+      <version>2.21.0</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+
+      <!-- <plugin>
+        <groupId>org.fusesource.mvnplugins</groupId>
+        <artifactId>maven-fab-plugin</artifactId>
+        <version>1.26</version>
+        <configuration>
+          <descriptor>
+            <Name>amqp</Name>
+            <Long-Description></Long-Description>
+            <Extends>${project.groupId}:apollo-cli:${project.version}</Extends>
+          </descriptor>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>generate</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin> -->
+
+      <!-- Generate a test jar for the test cases in this package -->
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.fusesource.scalate</groupId>
+        <artifactId>maven-scalate-plugin</artifactId>
+        <version>${scalate-version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>precompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <!-- profile which is activated is the swiftmq-client-home prop is defined.
+           Tt tests the amqp broker impl using the swiftmq client libs -->
+      <id>swiftmq-client</id>
+      <activation>
+        <property><name>swiftmq-client-home</name></property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>com.swiftmq</groupId>
+          <artifactId>swiftmq</artifactId>
+          <version>9.2.0</version>
+          <scope>system</scope>
+          <systemPath>${swiftmq-client-home}/jars/swiftmq.jar</systemPath>
+        </dependency>
+        <dependency>
+          <groupId>com.swiftmq</groupId>
+          <artifactId>swiftmq-amqp</artifactId>
+          <version>9.2.0</version>
+          <scope>system</scope>
+          <systemPath>${swiftmq-client-home}/jars/amqp.jar</systemPath>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>1.7</version>
+            <executions>
+              <execution>
+                <id>add-test-source</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${basedir}/src/test-swiftmq/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/dto-module.index Wed Jun 20 18:57:01 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.amqp.dto.Module
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-codec-factory.index Wed Jun 20 18:57:01 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.amqp.AmqpProtocolCodecFactory
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/META-INF/services/org.apache.activemq.apollo/protocol-factory.index Wed Jun 20 18:57:01 2012
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.amqp.AmqpProtocolFactory
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/amqp/dto/jaxb.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/amqp/dto/jaxb.index?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/amqp/dto/jaxb.index (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/resources/org/apache/activemq/apollo/amqp/dto/jaxb.index Wed Jun 20 18:57:01 2012
@@ -0,0 +1,18 @@
+# ------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------------------
+AmqpDTO
+AmqpConnectionStatusDTO

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpCodec.scala Wed Jun 20 18:57:01 2012
@@ -0,0 +1,50 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp
+
+
+import _root_.org.fusesource.hawtbuf._
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker.store.MessageRecord
+import org.apache.activemq.apollo.broker.Message
+
+object AmqpCodec extends Log {
+
+  val PROTOCOL = "amqp"
+  val PROTOCOL_ID = Buffer.ascii(PROTOCOL)
+  val PROTOCOL_MAGIC = new Buffer(Array[Byte]('A', 'M', 'Q', 'P'))
+
+  val EMPTY_BUFFER = new Buffer(0)
+  var max_command_length = 20
+
+
+  def encode(message: Message):MessageRecord = {
+    message match {
+      case message:AMQPMessage =>
+        val rc = new MessageRecord
+        rc.protocol = PROTOCOL_ID
+        rc.buffer = message.payload
+        rc
+      case _ => throw new RuntimeException("Invalid message type");
+    }
+  }
+
+  def decode(message: MessageRecord) = {
+    AMQPMessage(message.buffer)
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocol.scala Wed Jun 20 18:57:01 2012
@@ -0,0 +1,77 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp
+
+import _root_.org.fusesource.hawtbuf._
+import org.apache.activemq.apollo.broker._
+import java.lang.String
+import protocol.{ProtocolCodecFactory, ProtocolFactory, Protocol}
+import org.apache.activemq.apollo.broker.store._
+import AmqpCodec._
+import org.fusesource.amqp.codec.AMQPProtocolCodec
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+/**
+ * Creates AmqpCodec objects that encode/decode the
+ * <a href="http://activemq.apache.org/amqp/">Amqp</a> protocol.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class AmqpProtocolCodecFactory extends ProtocolCodecFactory.Provider {
+  def id = PROTOCOL
+
+  def createProtocolCodec() = new AMQPProtocolCodec();
+
+  def isIdentifiable() = true
+
+  def maxIdentificaionLength() = PROTOCOL_MAGIC.length;
+
+  def matchesIdentification(header: Buffer):Boolean = {
+    if (header.length < PROTOCOL_MAGIC.length) {
+      false
+    } else {
+      header.startsWith(PROTOCOL_MAGIC)
+    }
+  }
+}
+
+class AmqpProtocolFactory extends ProtocolFactory {
+
+  def create() = AmqpProtocol
+
+  def create(config: String) = if(config == PROTOCOL) {
+    AmqpProtocol
+  } else {
+    null
+  }
+
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object AmqpProtocol extends AmqpProtocolCodecFactory with Protocol {
+
+  def createProtocolHandler = new AmqpProtocolHandler
+  def encode(message: Message) = AmqpCodec.encode(message)
+  def decode(message: MessageRecord) = AmqpCodec.decode(message)
+
+}
+
+

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Wed Jun 20 18:57:01 2012
@@ -0,0 +1,699 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp
+
+import java.util.concurrent.TimeUnit
+import java.util.Date
+import scala.collection.mutable.{ListBuffer, HashMap}
+
+import org.fusesource.hawtdispatch._
+import org.fusesource.hawtbuf._
+import org.fusesource.hawtbuf.Buffer._
+
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.dto._
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.util.path.{PathParser, Path, LiteralPart}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.apollo.broker.protocol.ProtocolHandler
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import org.apache.activemq.apollo.amqp.dto._
+
+import org.fusesource.amqp._
+import org.fusesource.amqp.Callback
+import org.fusesource.amqp.codec.api.AnnotatedMessage
+import org.fusesource.amqp.codec.marshaller.MessageSupport
+import org.fusesource.amqp.codec.types._
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object AMQPMessage {
+
+  def apply(annotated:AnnotatedMessage):AMQPMessage = {
+    val payload = MessageSupport.toBuffer(annotated)
+    val rc = AMQPMessage(payload)
+    rc._annotated = annotated
+    rc
+  }
+}
+
+case class AMQPMessage(payload:Buffer) extends Message {
+  import AmqpProtocolHandler._
+  def protocol = AmqpProtocol
+
+  var _annotated:AnnotatedMessage = _
+  def annotated = {
+    if ( _annotated ==null ) {
+      _annotated = MessageSupport.decodeAnnotatedMessage(payload)
+    }
+    _annotated
+  }
+  
+  def getBodyAs[T](toType : Class[T]): T = {
+    if (toType == classOf[Buffer]) {
+      payload
+    } else if( toType == classOf[String] ) {
+      payload.utf8
+    } else if (toType == classOf[AsciiBuffer]) {
+      payload.ascii
+    } else if (toType == classOf[UTF8Buffer]) {
+      payload.utf8
+    } else {
+      null
+    }
+  }.asInstanceOf[T]
+
+  def getLocalConnectionId: AnyRef = annotated.getDeliveryAnnotations.getValue.get(SENDER_CONTAINER_KEY) match {
+    case x:AMQPString => x.getValue
+    case _ => null
+  }
+
+  def getProperty(name: String)= annotated match {
+    case null => null
+    case annotated =>
+      annotated.getMessage.getApplicationProperties match {
+        case null => null
+        case props =>
+          props.getValue.get(new AMQPString(name)).asInstanceOf[Object]
+      }
+  }
+
+  def release() {}
+  def retain() {}
+  def retained(): Int = 0
+}
+
+
+object AmqpProtocolHandler extends Log {
+  val SENDER_CONTAINER_KEY = new AMQPString("sender-container")
+
+  // How long we hold a failed connection open so that the remote end
+  // can get the resulting error message.
+  val DEFAULT_DIE_DELAY = 5*1000L
+  val WAITING_ON_CLIENT_REQUEST = ()=> "client request"
+
+  val DEFAULT_DETINATION_PARSER = new DestinationParser
+  DEFAULT_DETINATION_PARSER.queue_prefix = "/queue/"
+  DEFAULT_DETINATION_PARSER.topic_prefix = "/topic/"
+  DEFAULT_DETINATION_PARSER.dsub_prefix = "/dsub/"
+  DEFAULT_DETINATION_PARSER.temp_queue_prefix = "/temp-queue/"
+  DEFAULT_DETINATION_PARSER.temp_topic_prefix = "/temp-topic/"
+  DEFAULT_DETINATION_PARSER.destination_separator = ","
+  DEFAULT_DETINATION_PARSER.path_separator = "."
+  DEFAULT_DETINATION_PARSER.any_child_wildcard = "*"
+  DEFAULT_DETINATION_PARSER.any_descendant_wildcard = "**"
+
+}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class AmqpProtocolHandler extends ProtocolHandler {
+  import AmqpProtocolHandler._
+
+  val security_context = new SecurityContext
+
+  var connection_log:Log = AmqpProtocolHandler
+  var host:VirtualHost = null
+  var waiting_on = WAITING_ON_CLIENT_REQUEST
+  var config:AmqpDTO = _
+  var dead = false
+
+  def session_id = security_context.session_id
+  def protocol = AmqpCodec.PROTOCOL
+  def broker = connection.connector.broker
+  def queue = connection.dispatch_queue
+
+  def die_delay = {
+    OptionSupport(config.die_delay).getOrElse(DEFAULT_DIE_DELAY)
+  }
+
+  lazy val buffer_size = MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
+  var amqp_connection:AMQPConnection = _
+  var messages_sent = 0L
+  var messages_received = 0L
+
+  override def create_connection_status = {
+    var rc = new AmqpConnectionStatusDTO
+    rc.protocol_version = "1.0.0"
+    rc.user = security_context.user
+//    rc.subscription_count = consumers.size
+    rc.waiting_on = waiting_on()
+    rc.messages_sent = messages_sent
+    rc.messages_received = messages_received
+    rc
+  }
+
+  class ProtocolException(msg:String) extends RuntimeException(msg)
+  class Break extends RuntimeException
+
+  private def async_die(msg:String, e:Throwable=null) = try {
+    die(msg, e)
+  } catch {
+    case x:Break=>
+  }
+
+  private def die[T](msg:String, e:Throwable=null):T = {
+    if( e!=null) {
+      connection_log.info(e, "AMQP connection '%s' error: %s", security_context.remote_address, msg, e)
+    } else {
+      connection_log.info("AMQP connection '%s' error: %s", security_context.remote_address, msg)
+    }
+    if( !dead ) {
+      dead = true
+      waiting_on = ()=>"shutdown"
+      connection.transport.resumeRead
+
+      // TODO: if there are too many open connections we should just close the connection
+      // without waiting for the error to get sent to the client.
+      queue.after(die_delay, TimeUnit.MILLISECONDS) {
+        connection.stop(NOOP)
+      }
+    }
+    throw new Break()
+  }
+
+  override def set_connection(connection: BrokerConnection) = {
+    super.set_connection(connection)
+    import collection.JavaConversions._
+
+    val connector_config = connection.connector.config.asInstanceOf[AcceptingConnectorDTO]
+    config = connector_config.protocols.find( _.isInstanceOf[AmqpDTO]).map(_.asInstanceOf[AmqpDTO]).getOrElse(new AmqpDTO)
+
+    val options = new AMQPConnectionOptions
+    options.setServer(true);
+    options.setTransport(connection.transport);
+    options.setMaxFrameSize(1024*4)
+    options.setIdleTimeout(-1);
+    options.setLogger(new AMQPConnectionOptions.Logger {
+      override def _debug(format: String, args: Array[AnyRef]) {
+        println(System.currentTimeMillis()+": "+format.format(args: _*))
+        // connection_log.debug(format, args:_*)
+      }
+      override def _trace(format: String, args: Array[AnyRef]) {
+        println(System.currentTimeMillis()+": "+format.format(args: _*))
+        // connection_log.trace(format, args:_*)
+      }
+    })
+    options.setListener(new AMQPConnection.Listener(){
+
+      override def onBegin(begin: Begin) = new AMQPSessionOptions(100, 100, session_listener)
+
+      override def onAccepted(session: AMQPSession) {
+        connection_log.info("accepted: "+session)
+      }
+      override def onException(error: Throwable) {
+        error.printStackTrace();
+      }
+
+      override def onOpen(request: Open, response: Open, callback: Callback[Open]) {
+        handle_open(request, response, callback)
+      }
+    })
+    amqp_connection = AMQP.open(options, new Callback[AMQPConnection] {
+      override def onSuccess(value: AMQPConnection) {
+        println("AMQP connection is open.")
+      }
+      override def onFailure(value: Throwable) {
+        println("Failed to open AMQP connection: "+value)
+      }
+    })
+
+  }
+  override def on_transport_connected() = sys.error("should not get called")
+  override def on_transport_disconnected() = sys.error("should not get called")
+  override def on_transport_command(command:AnyRef):Unit = sys.error("should not get called")
+
+  def suspend_read(reason: =>String) = {
+    waiting_on = reason _
+    connection.transport.suspendRead
+    // heart_beat_monitor.suspendRead
+  }
+  def resume_read() = {
+    waiting_on = WAITING_ON_CLIENT_REQUEST
+    connection.transport.resumeRead
+    // heart_beat_monitor.resumeRead
+  }
+
+  def handle_open(request: Open, response: Open, callback: Callback[Open]) = {
+    broker.dispatch_queue {
+      suspend_read("host lookup")
+      val host = request.getHostname match {
+        case null => broker.default_virtual_host
+        case host=> broker.get_virtual_host(ascii(host))
+      }
+      queue {
+        resume_read
+        if(host==null) {
+          callback.onFailure(new AMQPException("Invalid virtual host: "+host));
+        } else if(!host.service_state.is_started) {
+          callback.onFailure(new AMQPException("virtual host not ready"));
+        } else {
+          response.setContainerID(host.id)
+          this.host=host
+          callback.onSuccess(response)
+//          security_context.session_id = Some("%s-%x".format(this.host.config.id, this.host.session_counter.incrementAndGet))
+//          connection_log = host.connection_log
+//          if( host.authenticator!=null &&  host.authorizer!=null ) {
+//            suspend_read("authenticating and authorizing connect")
+//            host.authenticator.authenticate(security_context) { auth_failure=>
+//              dispatchQueue {
+//                if( auth_failure!=null ) {
+//                  async_die("%s. Credentials=%s".format(auth_failure, security_context.credential_dump))
+//                } else if( !host.authorizer.can(security_context, "connect", connection.connector) ) {
+//                  async_die("Not authorized to connect to connector '%s'. Principals=%s".format(connection.connector.id, security_context.principal_dump))
+//                } else if( !host.authorizer.can(security_context, "connect", this.host) ) {
+//                  async_die("Not authorized to connect to virtual host '%s'. Principals=%s".format(this.host.id, security_context.principal_dump))
+//                } else {
+//                  resume_read
+//                  send_connected
+//                }
+//              }
+//            }
+//          } else {
+//            send_connected
+//          }
+        }
+      }
+    }
+  }
+
+  var destination_parser = DEFAULT_DETINATION_PARSER
+  var temp_destination_map = HashMap[SimpleAddress, SimpleAddress]()
+
+  def decode_addresses(value:String):Array[SimpleAddress] = {
+    val rc = destination_parser.decode_multi_destination(value)
+    if( rc==null ) {
+      return null
+    }
+    rc.map { dest =>
+      if( dest.domain.startsWith("temp-") ) {
+        temp_destination_map.getOrElseUpdate(dest, {
+          val parts = LiteralPart("temp") :: LiteralPart(broker.id) :: LiteralPart(session_id.get) :: dest.path.parts
+          SimpleAddress(dest.domain.stripPrefix("temp-"), Path(parts))
+        })
+      } else {
+        dest
+      }
+    }
+  }
+
+  class AmqpProducerRoute(val dest: String, val addresses:Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) {
+
+    val key = addresses.toList
+    var is_connected = false
+
+    override def send_buffer_size = buffer_size
+
+    override def connection = Some(AmqpProtocolHandler.this.connection)
+
+    override def connected() = is_connected = true
+
+    override def dispatch_queue = queue
+
+    refiller = ^ {
+      resume_read
+    }
+  }
+
+  object session_listener extends AMQPSession.Listener{
+    override def onAttach(attach: Attach, callback: Callback[AMQPEndpoint]) {
+      if( attach.getRole == Role.SENDER.getValue ) {
+        val target = attach.getTarget.asInstanceOf[Target]
+        target.getAddress match {
+          case address:AMQPString =>
+            var dest = address.getValue
+            decode_addresses(dest) match {
+              case null => callback.onFailure(new Exception("Invaild address: "+dest))
+              case addresses => attach_sender(attach, dest, addresses, callback)
+            }
+          case _ =>
+            callback.onFailure(new Exception("Invaild address: "+target.getAddress))
+        }
+      } else {
+        val source = attach.getSource.asInstanceOf[Source]
+        source.getAddress match {
+          case address:AMQPString =>
+            var dest = address.getValue
+            decode_addresses(dest) match {
+              case null => callback.onFailure(new Exception("Invaild address: "+dest))
+              case addresses => attach_receiver(attach, dest, addresses, callback)
+            }
+          case _ =>
+            callback.onFailure(new Exception("Invaild address: "+source.getAddress))
+        }
+      }
+    }
+
+    override def onClose(error: Error) {
+      if( error!=null ) {
+        info("peer closed the AMQP session due to: "+error)
+      }
+    }
+
+  }
+
+  def attach_sender(attach: Attach, address: String, addresses:Array[SimpleAddress], callback: Callback[AMQPEndpoint]) {
+    val target = new AmqpProducerRoute(address, addresses)
+    var receiver: AMQPReceiver = null
+    // create the producer route...
+    val options = new AMQPReceiverOptions();
+    options.source = attach.getSource.asInstanceOf[Source]
+    options.target = attach.getTarget.asInstanceOf[Target]
+    options.name = attach.getName
+    options.senderSettleMode = SenderSettleMode.valueOf(attach.getSndSettleMode)
+    options.receiverSettleMode = ReceiverSettleMode.valueOf(attach.getRcvSettleMode)
+
+    options.maxMessageSize = 10 * 1024 * 1024;
+
+    def pump = {
+      while (target.is_connected && !target.full && receiver.peek() != null) {
+
+        val amqpDelivery = receiver.poll()
+
+        // Update the message /w who sent it to us..
+        val amqpMessage = amqpDelivery.getMessage;
+        if (amqpMessage.getDeliveryAnnotations == null) {
+          amqpMessage.setDeliveryAnnotations(new DeliveryAnnotations(new MapEntries()))
+        }
+        amqpMessage.getDeliveryAnnotations.getValue.add(SENDER_CONTAINER_KEY, new AMQPString(amqp_connection.remoteContainerId()))
+
+        val apolloDelivery = new Delivery
+        apolloDelivery.message = AMQPMessage(amqpMessage)
+        apolloDelivery.size = amqpDelivery.payload.length()
+        //                delivery.expiration = message.expiration
+        //                delivery.persistent = message.persistent
+        //                delivery.uow = uow
+        //                get(frame.headers, RETAIN).foreach { retain =>
+        //                  delivery.retain = retain match {
+        //                    case SET => RetainSet
+        //                    case REMOVE => RetainRemove
+        //                    case _ => RetainIgnore
+        //                  }
+        //                }
+
+        if (!amqpDelivery.isSettled) {
+          apolloDelivery.ack = {
+            (consumed, uow) =>
+              queue <<| ^ {
+                amqpDelivery.ack()
+              }
+          }
+        }
+
+        target.offer(apolloDelivery)
+      }
+    }
+
+    target.refiller = ^ {
+      pump
+    }
+    options.listener = new AMQPEndpoint.Listener {
+      override def onTransfer() = pump
+
+      override def onClosed(senderClosed: Boolean, error: Error) {
+        if (error != null) {
+          debug("Peer closed link due to error: %s", error)
+        }
+        host.dispatch_queue {
+          host.router.disconnect(target.addresses, target)
+        }
+      }
+    }
+
+    // start with 0 credit window so that we don't receive any messages
+    // until we have verified if that we can connect to the destination..
+    options.credit = 0
+    receiver = AMQP.createReceiver(options)
+    callback.onSuccess(receiver)
+
+    host.dispatch_queue {
+      val rc = host.router.connect(target.addresses, target, security_context)
+      queue {
+        rc match {
+          case Some(failure) =>
+            receiver.detach(true, failure, null)
+          case None =>
+            // Add credit to start receiving messages.
+            receiver.addCredit(50)
+        }
+      }
+    }
+  }
+  var protocol_convert = "full"
+
+  class AMQPConsumer(
+      val subscription_id:String,
+      val addresses:Array[_ <: BindAddress],
+      val selector:(String, BooleanExpression),
+      override val browser:Boolean,
+      override val exclusive:Boolean,
+      val include_seq:Option[Long],
+      val from_seq:Long,
+      override val close_on_drain:Boolean
+    ) extends BaseRetained with DeliveryConsumer {
+
+    var sender:AMQPSender = _
+
+    override def toString = "amqp subscription:"+subscription_id+", remote address: "+security_context.remote_address
+
+    ///////////////////////////////////////////////////////////////////
+    // DeliveryConsumer Interface..
+    ///////////////////////////////////////////////////////////////////
+    def connect(p:DeliveryProducer) = new AMQPConsumerSession(p)
+    def dispatch_queue = queue
+    override def connection = Option(AmqpProtocolHandler.this.connection)
+    def is_persistent = false
+    def matches(message: Delivery) = true
+
+    override def start_from_tail = from_seq == -1
+    override def jms_selector = if(selector!=null){ selector._1 } else { null }
+    override def user = security_context.user
+
+    var starting_seq:Long = 0L
+    override def set_starting_seq(seq: Long):Unit = {
+      starting_seq=seq
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Sink[(Session[Delivery], Delivery)] interface..
+    ///////////////////////////////////////////////////////////////////
+    object sink extends Sink[(Session[Delivery], Delivery)] {
+      var refiller: Task = NOOP
+      def full: Boolean = sender.full()
+      def offer(value: (Session[Delivery], Delivery)): Boolean = {
+        if( full ) {
+          return false
+        }
+        
+        val (session, delivery) = value
+        val message = delivery.message
+
+        
+        val header = new Header()
+        header.setDeliveryCount(delivery.redeliveries)
+        header.setDurable(delivery.persistent)
+        header.setFirstAcquirer(delivery.redeliveries == 0)
+
+//        if( include_seq.isDefined ) {
+//          frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
+//        }
+
+        var annotated = if( message.protocol eq AmqpProtocol ) {
+          val original = message.asInstanceOf[AMQPMessage].annotated
+          var annotated = new AnnotatedMessageImpl
+          annotated.setHeader(header)
+          annotated.setDeliveryAnnotations(original.getDeliveryAnnotations)
+          annotated.setMessageAnnotations(original.getMessageAnnotations)
+          annotated.setMessage(original.getMessage)
+          annotated.setFooter(original.getFooter)
+          annotated
+        } else {
+          
+          val (body, content_type) =  protocol_convert match{
+            case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.protocol.id()+";conv=body")
+            case _ => (message.encoded, "protocol/"+message.protocol.id())
+          }
+          
+          val bare = new ValueMessageImpl(new AMQPBinary(body))
+          var properties = new Properties()
+          properties.setContentType(ascii(content_type))
+          if( delivery.expiration!= 0 ) {
+            properties.setAbsoluteExpiryTime(new Date(delivery.expiration))
+          }
+          bare.setProperties(properties)
+          var annotated = new AnnotatedMessageImpl
+          annotated.setHeader(header)
+          annotated.setMessage(bare)
+          annotated
+        }
+        
+        sender.send(annotated, null)
+
+        messages_sent += 1
+        return true
+      }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // AMQPEndpoint.Listener interface..
+    ///////////////////////////////////////////////////////////////////
+    object endpoint_listener extends AMQPEndpoint.Listener {
+      override def onTransfer = {
+        sink.refiller.run()
+      }
+      override def onClosed(senderClosed: Boolean, error: Error) {
+        if (error != null) {
+          debug("Peer closed link due to error: %s", error)
+        }
+        host.dispatch_queue {
+          host.router.unbind(addresses, AMQPConsumer.this, false, security_context)
+        }
+      }
+    }
+
+    val session_manager = new SessionSinkMux[Delivery](sink, queue, Delivery) {
+      override def time_stamp = broker.now
+    }
+
+    class AMQPConsumerSession(p:DeliveryProducer) extends DeliverySession with SessionSinkFilter[Delivery] {
+
+      def producer = p
+      def consumer = AMQPConsumer.this
+      val downstream = session_manager.open(producer.dispatch_queue, 1, buffer_size)
+
+      // Delegate all the flow control stuff to the session
+      override def full = {
+        val rc = super.full
+        rc
+      }
+
+      def offer(delivery:Delivery) = {
+        if( full ) {
+          false
+        } else {
+          delivery.message.retain()
+          val rc = downstream.offer(delivery)
+          assert(rc, "offer should be accepted since it was not full")
+          true
+        }
+      }
+
+      def close {}
+    }
+  }
+
+  def attach_receiver(attach: Attach, address: String, requested_addresses:Array[SimpleAddress], callback: Callback[AMQPEndpoint]) = try {
+    val options = new AMQPSenderOptions();
+    options.source = attach.getSource.asInstanceOf[Source]
+    options.source.setDefaultOutcome(new Released())
+
+    if (attach.getSndSettleMode == SenderSettleMode.SETTLED.getValue) {
+      // if we are settling... then no other outcomes are possible..
+      options.source.setOutcomes(Array())
+    } else {
+      options.source.setOutcomes(Array(
+        new AMQPSymbol(Accepted.SYMBOLIC_ID),
+        new AMQPSymbol(Rejected.SYMBOLIC_ID),
+        new AMQPSymbol(Released.SYMBOLIC_ID),
+        new AMQPSymbol(Modified.SYMBOLIC_ID)
+      ))
+    }
+    options.target = attach.getTarget.asInstanceOf[Target]
+    options.name = attach.getName
+    options.senderSettleMode = SenderSettleMode.valueOf(attach.getSndSettleMode)
+    options.receiverSettleMode = ReceiverSettleMode.valueOf(attach.getRcvSettleMode)
+    options.maxMessageSize = 10 * 1024 * 1024;
+
+
+    val subscription_id = attach.getName
+    var persistent = false
+    var browser = false
+    var browser_end = browser && true
+    var exclusive = !browser && false
+    var include_seq: Option[Long] = None
+    val from_seq_opt: Option[Long] = None
+
+    def is_multi_destination = if (requested_addresses.length > 1) {
+      true
+    } else {
+      PathParser.containsWildCards(requested_addresses(0).path)
+    }
+    if (from_seq_opt.isDefined && is_multi_destination) {
+      die("The from-seq header is only supported when you subscribe to one destination");
+    }
+
+    val selector = attach.getProperties match {
+      case null => null
+      case properties =>
+        properties.get(new AMQPString("selector")) match {
+          case null => null
+          case x:AMQPString =>
+            try {
+              (x.getValue, SelectorParser.parse(x.getValue))
+            } catch {
+              case e: FilterException =>
+                die("Invalid selector expression '%s': ".format(x, e.getMessage))
+            }
+          case x =>
+            die("Invalid selector expression '%s'".format(x))
+        }
+    }
+
+    val addresses:Array[_ <: BindAddress] = if (persistent) {
+      val dsubs = ListBuffer[BindAddress]()
+      val topics = ListBuffer[BindAddress]()
+      requested_addresses.foreach { address =>
+          address.domain match {
+            case "dsub" => dsubs += address
+            case "topic" => topics += address
+            case _ => die("A persistent subscription can only be used on a topic destination")
+          }
+      }
+      val s = if (selector == null) null else selector._1
+      dsubs += SubscriptionAddress(destination_parser.decode_path(subscription_id), s, topics.toArray)
+      dsubs.toArray
+    } else {
+      requested_addresses
+    }
+
+    val from_seq = from_seq_opt.getOrElse(0L)
+
+    val source = new AMQPConsumer(subscription_id, addresses, selector, browser, exclusive, include_seq, from_seq, browser_end);
+    options.listener = source.endpoint_listener
+    source.sender = AMQP.createSender(options)
+
+    host.dispatch_queue {
+      val rc = host.router.bind(addresses, source, security_context)
+      source.release
+      queue {
+        rc match {
+          case Some(failure) =>
+            source.sender.detach(true, failure, null)
+          case None =>
+        }
+      }
+    }
+    callback.onSuccess(source.sender)
+  } catch {
+    case e => callback.onFailure(e)
+  }
+
+}
+

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.java?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.java Wed Jun 20 18:57:01 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.dto;
+
+import org.apache.activemq.apollo.dto.ConnectionStatusDTO;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="amqp_connection_status")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AmqpConnectionStatusDTO extends ConnectionStatusDTO {
+
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/AmqpDTO.java Wed Jun 20 18:57:01 2012
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.dto;
+
+import org.apache.activemq.apollo.dto.AddUserHeaderDTO;
+import org.apache.activemq.apollo.dto.ProtocolDTO;
+import org.apache.activemq.apollo.dto.ProtocolFilterDTO;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import javax.xml.bind.annotation.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Allow you to customize the amqp protocol implementation.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+@XmlRootElement(name="amqp")
+@XmlAccessorType(XmlAccessType.FIELD)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AmqpDTO extends ProtocolDTO {
+
+    @XmlAttribute(name="add_user_header")
+    public String add_user_header;
+
+    /**
+     * A broker accepts connections via it's configured connectors.
+     */
+    @XmlElement(name="add_user_header")
+    public List<AddUserHeaderDTO> add_user_headers = new ArrayList<AddUserHeaderDTO>();
+
+    /**
+     * If set, it will add the configured header name with the value
+     * set the a timestamp of when the message is received.
+     */
+    @XmlAttribute(name="add_timestamp_header")
+    public String add_timestamp_header;
+
+    /**
+     * If set, the configured header will be added to message
+     * sent to consumer if the message is a redelivery.  It will be
+     * set to the number of re-deliveries that have occurred.
+     */
+    @XmlAttribute(name="add_redeliveries_header")
+    public String add_redeliveries_header;
+
+    @XmlAttribute(name="max_header_length")
+    public String max_header_length;
+
+    @XmlAttribute(name="max_headers")
+    public Integer max_headers;
+
+    @XmlAttribute(name="max_data_length")
+    public String max_data_length;
+
+    @XmlElementRef
+    public List<ProtocolFilterDTO> protocol_filters = new ArrayList<ProtocolFilterDTO>();
+
+    @XmlAttribute(name="queue_prefix")
+    public String queue_prefix;
+
+    @XmlAttribute(name="topic_prefix")
+    public String topic_prefix;
+
+    @XmlAttribute(name="temp_queue_prefix")
+    public String temp_queue_prefix;
+
+    @XmlAttribute(name="temp_topic_prefix")
+    public String temp_topic_prefix;
+
+    @XmlAttribute(name="destination_separator")
+    public String destination_separator;
+
+    @XmlAttribute(name="path_separator")
+    public String path_separator;
+
+    @XmlAttribute(name="any_child_wildcard")
+    public String any_child_wildcard;
+
+    @XmlAttribute(name="any_descendant_wildcard")
+    public String any_descendant_wildcard;
+
+    @XmlAttribute(name="regex_wildcard_start")
+    public String regex_wildcard_start;
+
+    @XmlAttribute(name="regex_wildcard_end")
+    public String regex_wildcard_end;
+
+    @XmlAttribute(name="die_delay")
+    public Long die_delay;
+
+    @XmlAttribute(name="buffer_size")
+    public String buffer_size;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof AmqpDTO)) return false;
+        if (!super.equals(o)) return false;
+
+        AmqpDTO amqpDTO = (AmqpDTO) o;
+
+        if (add_redeliveries_header != null ? !add_redeliveries_header.equals(amqpDTO.add_redeliveries_header) : amqpDTO.add_redeliveries_header != null)
+            return false;
+        if (add_timestamp_header != null ? !add_timestamp_header.equals(amqpDTO.add_timestamp_header) : amqpDTO.add_timestamp_header != null)
+            return false;
+        if (add_user_header != null ? !add_user_header.equals(amqpDTO.add_user_header) : amqpDTO.add_user_header != null)
+            return false;
+        if (add_user_headers != null ? !add_user_headers.equals(amqpDTO.add_user_headers) : amqpDTO.add_user_headers != null)
+            return false;
+        if (any_child_wildcard != null ? !any_child_wildcard.equals(amqpDTO.any_child_wildcard) : amqpDTO.any_child_wildcard != null)
+            return false;
+        if (any_descendant_wildcard != null ? !any_descendant_wildcard.equals(amqpDTO.any_descendant_wildcard) : amqpDTO.any_descendant_wildcard != null)
+            return false;
+        if (buffer_size != null ? !buffer_size.equals(amqpDTO.buffer_size) : amqpDTO.buffer_size != null)
+            return false;
+        if (destination_separator != null ? !destination_separator.equals(amqpDTO.destination_separator) : amqpDTO.destination_separator != null)
+            return false;
+        if (die_delay != null ? !die_delay.equals(amqpDTO.die_delay) : amqpDTO.die_delay != null) return false;
+        if (max_data_length != null ? !max_data_length.equals(amqpDTO.max_data_length) : amqpDTO.max_data_length != null)
+            return false;
+        if (max_header_length != null ? !max_header_length.equals(amqpDTO.max_header_length) : amqpDTO.max_header_length != null)
+            return false;
+        if (max_headers != null ? !max_headers.equals(amqpDTO.max_headers) : amqpDTO.max_headers != null)
+            return false;
+        if (path_separator != null ? !path_separator.equals(amqpDTO.path_separator) : amqpDTO.path_separator != null)
+            return false;
+        if (protocol_filters != null ? !protocol_filters.equals(amqpDTO.protocol_filters) : amqpDTO.protocol_filters != null)
+            return false;
+        if (queue_prefix != null ? !queue_prefix.equals(amqpDTO.queue_prefix) : amqpDTO.queue_prefix != null)
+            return false;
+        if (regex_wildcard_end != null ? !regex_wildcard_end.equals(amqpDTO.regex_wildcard_end) : amqpDTO.regex_wildcard_end != null)
+            return false;
+        if (regex_wildcard_start != null ? !regex_wildcard_start.equals(amqpDTO.regex_wildcard_start) : amqpDTO.regex_wildcard_start != null)
+            return false;
+        if (temp_queue_prefix != null ? !temp_queue_prefix.equals(amqpDTO.temp_queue_prefix) : amqpDTO.temp_queue_prefix != null)
+            return false;
+        if (temp_topic_prefix != null ? !temp_topic_prefix.equals(amqpDTO.temp_topic_prefix) : amqpDTO.temp_topic_prefix != null)
+            return false;
+        if (topic_prefix != null ? !topic_prefix.equals(amqpDTO.topic_prefix) : amqpDTO.topic_prefix != null)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (add_user_header != null ? add_user_header.hashCode() : 0);
+        result = 31 * result + (add_user_headers != null ? add_user_headers.hashCode() : 0);
+        result = 31 * result + (add_timestamp_header != null ? add_timestamp_header.hashCode() : 0);
+        result = 31 * result + (add_redeliveries_header != null ? add_redeliveries_header.hashCode() : 0);
+        result = 31 * result + (max_header_length != null ? max_header_length.hashCode() : 0);
+        result = 31 * result + (max_headers != null ? max_headers.hashCode() : 0);
+        result = 31 * result + (max_data_length != null ? max_data_length.hashCode() : 0);
+        result = 31 * result + (protocol_filters != null ? protocol_filters.hashCode() : 0);
+        result = 31 * result + (queue_prefix != null ? queue_prefix.hashCode() : 0);
+        result = 31 * result + (topic_prefix != null ? topic_prefix.hashCode() : 0);
+        result = 31 * result + (temp_queue_prefix != null ? temp_queue_prefix.hashCode() : 0);
+        result = 31 * result + (temp_topic_prefix != null ? temp_topic_prefix.hashCode() : 0);
+        result = 31 * result + (destination_separator != null ? destination_separator.hashCode() : 0);
+        result = 31 * result + (path_separator != null ? path_separator.hashCode() : 0);
+        result = 31 * result + (any_child_wildcard != null ? any_child_wildcard.hashCode() : 0);
+        result = 31 * result + (any_descendant_wildcard != null ? any_descendant_wildcard.hashCode() : 0);
+        result = 31 * result + (regex_wildcard_start != null ? regex_wildcard_start.hashCode() : 0);
+        result = 31 * result + (regex_wildcard_end != null ? regex_wildcard_end.hashCode() : 0);
+        result = 31 * result + (die_delay != null ? die_delay.hashCode() : 0);
+        result = 31 * result + (buffer_size != null ? buffer_size.hashCode() : 0);
+        return result;
+    }
+}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/Module.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/Module.scala?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/Module.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/Module.scala Wed Jun 20 18:57:01 2012
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.amqp.dto
+import org.apache.activemq.apollo.util.DtoModule
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Module extends DtoModule {
+
+  def dto_package = "org.apache.activemq.apollo.amqp.dto"
+  def extension_classes = Array(classOf[AmqpConnectionStatusDTO], classOf[AmqpDTO])
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/package-info.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/package-info.java?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/package-info.java (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/dto/package-info.java Wed Jun 20 18:57:01 2012
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The JAXB POJOs for the
+ * The JAXB POJOs for the
+ * <a href="http://activemq.apache.org/schema/activemq/apollo/xml-configuration.html">XML Configuration</a>
+ * of the ActiveMQ Broker.
+ */
+@javax.xml.bind.annotation.XmlSchema(
+        namespace = "http://activemq.apache.org/schema/activemq/apollo",
+        elementFormDefault = javax.xml.bind.annotation.XmlNsForm.QUALIFIED)
+package org.apache.activemq.apollo.amqp.dto;
+

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/package.html
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/package.html?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/package.html (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/package.html Wed Jun 20 18:57:01 2012
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+An implementation of the Amqp protocol which is a simple wire 
+protocol for writing clients for ActiveMQ in different
+languages like Ruby, Python, PHP, C etc.
+
+</body>
+</html>

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.jade?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.jade (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/webapp/WEB-INF/org/apache/activemq/apollo/amqp/dto/AmqpConnectionStatusDTO.jade Wed Jun 20 18:57:01 2012
@@ -0,0 +1,51 @@
+-# Licensed to the Apache Software Foundation (ASF) under one or more
+-# contributor license agreements.  See the NOTICE file distributed with
+-# this work for additional information regarding copyright ownership.
+-# The ASF licenses this file to You under the Apache License, Version 2.0
+-# (the "License"); you may not use this file except in compliance with
+-# the License.  You may obtain a copy of the License at
+-#
+-# http://www.apache.org/licenses/LICENSE-2.0
+-#
+-# Unless required by applicable law or agreed to in writing, software
+-# distributed under the License is distributed on an "AS IS" BASIS,
+-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-# See the License for the specific language governing permissions and
+-# limitations under the License.
+
+- import it._
+- val helper = new org.apache.activemq.apollo.web.resources.ViewHelper
+- import helper._
+
+.breadcumbs
+  a(href={strip_resolve("..")}) Back
+
+p state: #{state} #{ uptime(state_since) } ago
+
+- if( state == "STARTED" )
+  form(method="post" action={path("action/delete")})
+    input(type="submit" value="shutdown")
+
+h4 Connection Info
+
+p connector: #{connector}
+p local address: #{local_address}
+p remote address: #{remote_address}
+
+h4 Protocol Info
+
+p protocol: #{protocol}
+p protocol version: #{protocol_version}
+p protocol session id: #{protocol_session_id}
+p user: #{user}
+p subscription count: #{subscription_count}
+p waiting on: #{waiting_on}
+
+h4 Metrics
+
+p messages received from the client: #{messages_received}
+p messages sent to the client: #{messages_sent}
+p bytes read counter: #{memory(read_counter)}
+p bytes written counter: #{memory(write_counter)}
+p last read size: #{memory(last_read_size)}
+p last write size: #{memory(last_write_size)}

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/ide-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/ide-resources/log4j.properties?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/ide-resources/log4j.properties (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/ide-resources/log4j.properties Wed Jun 20 18:57:01 2012
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console, file
+log4j.logger.org.apache.activemq=TRACE
+
+# Console will only display warnnings
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=TRACE
+
+# File appender will contain all info messages
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Propchange: activemq/activemq-apollo/trunk/apollo-amqp/src/test/ide-resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-leveldb.xml?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-leveldb.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-leveldb.xml Wed Jun 20 18:57:01 2012
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+  <notes>Has a LevelDB store enabled.</notes>
+
+  <virtual_host id="default">
+    <host_name>localhost</host_name>
+
+    <queue id="drop.head.persistent" full_policy="drop head" quota="100k"/>
+    <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
+    <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
+    <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
+
+    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+    <queue id="mirrored.**" mirrored="true"/>
+
+    <leveldb_store directory="${testdatadir}"/>
+  </virtual_host>
+
+  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
+  <connector id="tcp" bind="tcp://0.0.0.0:0"/>
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-secure.xml?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-secure.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-secure.xml Wed Jun 20 18:57:01 2012
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+
+  <authentication domain="AmqpSecurityTest"/>
+  <access_rule allow="connect_group" action="connect"/>
+  <access_rule allow="can_send_create_consume_queue" kind="queue" action="send create consume"/>
+  <access_rule allow="can_send_create_queue" kind="queue" action="send create"/>
+  <access_rule allow="can_send_queue"        kind="queue" action="send"/>
+  <access_rule allow="can_receive_queue"     kind="queue" action="receive"/>
+  <access_rule allow="can_consume_queue"     kind="queue" action="consume"/>
+  <access_rule allow="can_send_create_topic" kind="topic" action="send create"/>
+  <access_rule allow="can_send_topic"        kind="topic" action="send"/>
+  <access_rule allow="can_recieve_topic"     kind="topic" action="receive"/>
+  <access_rule allow="can_consume_create_ds" kind="dsub"  action="consume create"/>
+  <access_rule allow="can_consume_ds"        kind="dsub"  action="consume"/>
+  <access_rule allow="can_recieve_topic"     kind="dsub"  action="receive"/>
+
+  <access_rule allow="guest" action="connect"/>
+  <access_rule allow="guest" action="create destroy send receive consume" kind="topic queue dsub" id_regex="test.*"/>
+
+  <!-- only allow connects over the tcp2 connector -->
+  <access_rule allow="connector_restricted" action="connect" connector="tcp2"/>
+
+
+  <virtual_host id="default">
+    <host_name>localhost</host_name>
+  </virtual_host>
+
+  <connector id="tcp" bind="tcp://0.0.0.0:0">
+    <amqp>
+      <add_user_header kind="org.apache.activemq.jaas.UserPrincipal">JMSXUserID</add_user_header>
+      <add_user_header kind="org.apache.activemq.apollo.broker.security.SourceAddressPrincipal">sender-ip</add_user_header>
+    </amqp>
+  </connector>
+  <connector id="tcp2" bind="tcp://0.0.0.0:0">
+  </connector>
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl-secure.xml?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl-secure.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl-secure.xml Wed Jun 20 18:57:01 2012
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+
+  <authentication domain="AmqpSslSecurityTest"/>
+  <access_rule allow="connect_group" action="connect"/>
+
+  <virtual_host id="default">
+    <host_name>localhost</host_name>
+  </virtual_host>
+
+  <key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password" key_alias="broker-localhost"/>
+  <connector id="ssl" bind="ssl://0.0.0.0:0" />
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl.xml?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp-ssl.xml Wed Jun 20 18:57:01 2012
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+
+    <notes>The config for the ssl amqp tests.</notes>
+  <virtual_host id="default">
+        <host_name>localhost</host_name>
+    </virtual_host>
+
+    <key_storage file="${basedir}/src/test/resources/apollo.ks" password="password" key_password="password"/>
+    <connector id="ssl" bind="ssl://0.0.0.0:0" />
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp.xml?rev=1352265&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp.xml (added)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo-amqp.xml Wed Jun 20 18:57:01 2012
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
+  <notes>This broker configuration is what the unit tests in this module load up.</notes>
+
+  <virtual_host id="default">
+    <host_name>localhost</host_name>
+    <host_name>127.0.0.1</host_name>
+
+    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+    <queue id="mirrored.**" mirrored="true"/>
+    <topic id="queued.**" slow_consumer_policy="queue">
+      <subscription tail_buffer="4k"/>
+    </topic>
+
+  </virtual_host>
+
+  <web_admin bind="http://0.0.0.0:61680"/>
+  <connector id="tcp" bind="tcp://0.0.0.0:0"/>
+
+</broker>
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo.ks
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo.ks?rev=1352265&view=auto
==============================================================================
Files activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo.ks (added) and activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo.ks Wed Jun 20 18:57:01 2012 differ

Propchange: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/apollo.ks
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/client.ks
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/client.ks?rev=1352265&view=auto
==============================================================================
Files activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/client.ks (added) and activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/client.ks Wed Jun 20 18:57:01 2012 differ

Propchange: activemq/activemq-apollo/trunk/apollo-amqp/src/test/resources/client.ks
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message