qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oru...@apache.org
Subject [2/2] qpid-jms-amqp-0-x git commit: QPID-8074: [JMS AMQP 0-x][System Tests] Build framework to run JMS client system tests
Date Wed, 10 Jan 2018 13:16:09 GMT
QPID-8074: [JMS AMQP 0-x][System Tests] Build framework to run JMS client system tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/6899893a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/6899893a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/6899893a

Branch: refs/heads/master
Commit: 6899893a3eac396e4810e9ea2bde0dadb296babd
Parents: a2c3f8b
Author: Alex Rudyy <orudyy@apache.org>
Authored: Wed Jan 10 13:03:33 2018 +0000
Committer: Alex Rudyy <orudyy@apache.org>
Committed: Wed Jan 10 13:15:39 2018 +0000

----------------------------------------------------------------------
 pom.xml                                         |  50 +-
 systests/pom.xml                                | 188 ++++
 .../apache/qpid/systest/core/BrokerAdmin.java   |  62 ++
 .../qpid/systest/core/BrokerAdminException.java |  34 +
 .../qpid/systest/core/BrokerAdminFactory.java   |  44 +
 .../systest/core/BrokerAdminUsingTestBase.java  |  39 +
 .../apache/qpid/systest/core/JmsTestBase.java   |  88 ++
 .../qpid/systest/core/QpidTestRunner.java       |  88 ++
 .../core/brokerj/AmqpManagementFacade.java      | 554 ++++++++++++
 .../core/brokerj/SpawnQpidBrokerAdmin.java      | 898 +++++++++++++++++++
 .../qpid/systest/core/dependency/Booter.java    | 103 +++
 .../systest/core/dependency/ClasspathQuery.java | 202 +++++
 .../dependency/ConsoleRepositoryListener.java   | 128 +++
 .../dependency/ConsoleTransferListener.java     | 175 ++++
 .../ManualRepositorySystemFactory.java          |  58 ++
 .../LogbackPropertyValueDiscriminator.java      |  65 ++
 .../logback/LogbackSocketPortNumberDefiner.java |  45 +
 .../qpid/systest/core/util/FileUtils.java       |  58 ++
 .../qpid/systest/core/util/PortHelper.java      | 191 ++++
 .../qpid/systest/core/util/SystemUtils.java     |  35 +
 .../apache/qpid/systest/core/util/Utils.java    | 122 +++
 .../resources/broker-j-config-with-logging.json |  83 ++
 .../core/brokerj/SpawnQpidBrokerAdminTest.java  | 282 ++++++
 systests/src/test/resources/logback-test.xml    |  58 ++
 24 files changed, 3649 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4a33a8a..0bc3c55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,6 +100,8 @@
     <mockito-version>1.9.5</mockito-version>
     <hamcrest-version>1.3</hamcrest-version>
 
+    <maven-core-version>3.5.0</maven-core-version>
+    <maven-resolver-version>1.0.3</maven-resolver-version>
     <exec-maven-plugin-version>1.3.2</exec-maven-plugin-version>
     <javacc-maven-plugin-version>2.6</javacc-maven-plugin-version>
     <maven-rar-plugin-version>2.3</maven-rar-plugin-version>
@@ -118,6 +120,7 @@
     <module>client</module>
     <module>client/example</module>
     <module>doc</module>
+    <module>systests</module>
   </modules>
 
   <dependencyManagement>
@@ -162,7 +165,6 @@
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>${junit-version}</version>
-        <scope>test</scope>
       </dependency>
 
       <dependency>
@@ -171,6 +173,52 @@
         <version>${mockito-version}</version>
         <scope>test</scope>
       </dependency>
+
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-core</artifactId>
+        <version>${fasterxml-jackson-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${fasterxml-jackson-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.hamcrest</groupId>
+        <artifactId>hamcrest-library</artifactId>
+        <version>${hamcrest-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.hamcrest</groupId>
+        <artifactId>hamcrest-integration</artifactId>
+        <version>${hamcrest-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven</groupId>
+        <artifactId>maven-core</artifactId>
+        <version>${maven-core-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven.resolver</groupId>
+        <artifactId>maven-resolver-api</artifactId>
+        <version>${maven-resolver-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven.resolver</groupId>
+        <artifactId>maven-resolver-connector-basic</artifactId>
+        <version>${maven-resolver-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven.resolver</groupId>
+        <artifactId>maven-resolver-transport-file</artifactId>
+        <version>${maven-resolver-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.maven.resolver</groupId>
+        <artifactId>maven-resolver-transport-http</artifactId>
+        <version>${maven-resolver-version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/pom.xml
----------------------------------------------------------------------
diff --git a/systests/pom.xml b/systests/pom.xml
new file mode 100644
index 0000000..70bca14
--- /dev/null
+++ b/systests/pom.xml
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-jms-amqp-0-x-parent</artifactId>
+        <version>6.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>qpid-client-systests</artifactId>
+    <name>Apache Qpid JMS AMQP 0-x System Tests</name>
+    <description>Apache Qpid JMS AMQP 0-x System Tests</description>
+
+    <properties>
+        <qpid.amqp.version>0-10</qpid.amqp.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven.resolver</groupId>
+            <artifactId>maven-resolver-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven.resolver</groupId>
+            <artifactId>maven-resolver-connector-basic</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven.resolver</groupId>
+            <artifactId>maven-resolver-transport-file</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.maven.resolver</groupId>
+            <artifactId>maven-resolver-transport-http</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-integration</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemPropertyVariables>
+                       <qpid.amqp.version>${qpid.amqp.version}</qpid.amqp.version>
+                    </systemPropertyVariables>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>broker-j</id>
+
+            <properties>
+                <qpid-broker-j-version>7.0.0</qpid-broker-j-version>
+                <qpid.systest.broker_admin>org.apache.qpid.systest.core.brokerj.SpawnQpidBrokerAdmin</qpid.systest.broker_admin>
+                <qpid.systest.java8.executable>/usr/bin/java</qpid.systest.java8.executable>
+                <qpid.systest.virtualhost.blueprint>{"type":"BDB","globalAddressDomains":"[]"}</qpid.systest.virtualhost.blueprint>
+                <qpid.systest.brokerj.dependencies>org.apache.qpid:qpid-broker:${qpid-broker-j-version},org.apache.qpid:qpid-broker-core:${qpid-broker-j-version},org.apache.qpid:qpid-bdbstore:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-amqp-0-8-protocol:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-amqp-0-10-protocol:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-amqp-msg-conv-0-8-to-0-10:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-management-amqp:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-access-control:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-derby-store:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-jdbc-provider-bone:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-jdbc-store:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-logging-logback:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-management-amqp:${qpid-broker-j-version},or
 g.apache.qpid:qpid-broker-plugins-memory-store:${qpid-broker-j-version},org.apache.qpid:qpid-bdbstore:${qpid-broker-j-version}</qpid.systest.brokerj.dependencies>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <systemPropertyVariables>
+                                <qpid.systest.brokerj.dependencies>${qpid.systest.brokerj.dependencies}</qpid.systest.brokerj.dependencies>
+                                <qpid.systest.java8.executable>${qpid.systest.java8.executable}</qpid.systest.java8.executable>
+                                <qpid.systest.build.classpath.file>${project.build.directory}/qpid.build.classpath.txt</qpid.systest.build.classpath.file>
+                                <qpid.systest.initialConfigurationLocation>classpath:broker-j-config-with-logging.json</qpid.systest.initialConfigurationLocation>
+                                <qpid.systest.broker_admin>${qpid.systest.broker_admin}</qpid.systest.broker_admin>
+                                <qpid.systest.virtualhostnode.type>JSON</qpid.systest.virtualhostnode.type>
+                                <qpid.systest.virtualhost.blueprint>${qpid.systest.virtualhost.blueprint}</qpid.systest.virtualhost.blueprint>
+                                <qpid.systest.logback.logs_dir>${project.basedir}${file.separator}target${file.separator}surefire-reports</qpid.systest.logback.logs_dir>
+                                <qpid.systest.broker.clean.between.tests>true</qpid.systest.broker.clean.between.tests>
+                            </systemPropertyVariables>
+                        </configuration>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-enforcer-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>enforce-java8-check</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <requireFilesExist>
+                                            <files>
+                                                <file>${qpid.systest.java8.executable}</file>
+                                            </files>
+                                        </requireFilesExist>
+                                    </rules>
+                                    <fail>true</fail>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
new file mode 100644
index 0000000..b2b41cc
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+@SuppressWarnings("unused")
+public interface BrokerAdmin
+{
+    void create(final Class testClass);
+    void start(final Class testClass, final Method method);
+    void stop(final Class testClass, final Method method);
+    void destroy(final Class testClass);
+    ListenableFuture<Void> restart();
+
+    InetSocketAddress getBrokerAddress(PortType portType);
+    boolean supportsPersistence();
+
+    String getValidUsername();
+    String getValidPassword();
+
+    String getType();
+    BrokerType getBrokerType();
+
+    Connection getConnection() throws JMSException;
+
+    enum PortType
+    {
+        ANONYMOUS_AMQP,
+        AMQP
+    }
+
+    enum BrokerType
+    {
+        BROKERJ,
+        CPP
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java
new file mode 100644
index 0000000..5b6a3f5
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.core;
+
+public class BrokerAdminException extends RuntimeException
+{
+    public BrokerAdminException(final String message)
+    {
+        super(message);
+    }
+
+    public BrokerAdminException(final String message, final Throwable cause)
+    {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java
new file mode 100644
index 0000000..5cfdc53
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core;
+
+class BrokerAdminFactory
+{
+    BrokerAdmin createInstance()
+    {
+        String type = System.getProperty("qpid.systest.broker_admin");
+        if (type != null)
+        {
+            try
+            {
+                @SuppressWarnings("unchecked")
+                Class<BrokerAdmin> c = (Class<BrokerAdmin>) Class.forName(type);
+                return c.newInstance();
+            }
+            catch (InstantiationException | IllegalAccessException | ClassNotFoundException e)
+            {
+                throw new BrokerAdminException(String.format("Could not find BrokerAdmin implementation of type '%s'",
+                                                         type));
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java
new file mode 100644
index 0000000..72858ca
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core;
+
+import org.junit.runner.RunWith;
+
+@RunWith(QpidTestRunner.class)
+public abstract class BrokerAdminUsingTestBase
+{
+    private BrokerAdmin _brokerAdmin;
+
+    public void init(final BrokerAdmin brokerAdmin)
+    {
+        _brokerAdmin = brokerAdmin;
+    }
+
+    public BrokerAdmin getBrokerAdmin()
+    {
+        return _brokerAdmin;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
new file mode 100644
index 0000000..49cc3b2
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.core;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assume.assumeThat;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+import javax.naming.NamingException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public abstract class JmsTestBase extends BrokerAdminUsingTestBase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(JmsTestBase.class);
+
+    @Rule
+    public final TestName _testName = new TestName();
+
+    @Before
+    public void setUpTestBase()
+    {
+        assumeThat(String.format("BrokerAdmin is not available. Skipping the test %s#%s",
+                                 getClass().getName(),
+                                 _testName.getMethodName()),
+                   getBrokerAdmin(), is(notNullValue()));
+        LOGGER.debug("Test receive timeout is {} milliseconds", getReceiveTimeout());
+    }
+
+
+    protected Connection getConnection() throws JMSException, NamingException
+    {
+        assumeThat(String.format("BrokerAdmin is not available. Skipping the test %s#%s",
+                                 getClass().getName(),
+                                 _testName.getMethodName()),
+                   getBrokerAdmin(), is(notNullValue()));
+
+        return getBrokerAdmin().getConnection();
+    }
+
+    protected static long getReceiveTimeout()
+    {
+        return Long.getLong("qpid.test_receive_timeout", 1000L);
+    }
+
+    protected String getTestName()
+    {
+        return _testName.getMethodName();
+    }
+
+
+    protected TopicConnection getTopicConnection() throws JMSException, NamingException
+    {
+        return (TopicConnection) getConnection();
+    }
+
+    protected QueueConnection getQueueConnection() throws JMSException, NamingException
+    {
+        return (QueueConnection) getConnection();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
new file mode 100644
index 0000000..37bd96c
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core;
+
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.InitializationError;
+
+public class QpidTestRunner extends BlockJUnit4ClassRunner
+{
+    private final BrokerAdmin _brokerAdmin;
+    private final Class _testClass;
+
+    public QpidTestRunner(final Class<?> klass) throws InitializationError
+    {
+        super(klass);
+        _testClass = klass;
+        _brokerAdmin = new BrokerAdminFactory().createInstance();
+    }
+
+    @Override
+    protected Object createTest() throws Exception
+    {
+        Object test = super.createTest();
+        BrokerAdminUsingTestBase qpidTest = ((BrokerAdminUsingTestBase) test);
+        qpidTest.init(_brokerAdmin);
+        return test;
+    }
+
+    @Override
+    public void run(final RunNotifier notifier)
+    {
+        if (_brokerAdmin != null)
+        {
+            _brokerAdmin.create(_testClass);
+        }
+        try
+        {
+            super.run(notifier);
+        }
+        finally
+        {
+            if (_brokerAdmin != null)
+            {
+                _brokerAdmin.destroy(_testClass);
+            }
+        }
+    }
+
+    @Override
+    protected void runChild(final FrameworkMethod method, final RunNotifier notifier)
+    {
+        if (_brokerAdmin != null)
+        {
+            _brokerAdmin.start(_testClass, method.getMethod());
+        }
+        try
+        {
+            super.runChild(method, notifier);
+        }
+        finally
+        {
+            if (_brokerAdmin != null)
+            {
+                _brokerAdmin.stop(_testClass, method.getMethod());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
new file mode 100644
index 0000000..ed7f116
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core.brokerj;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+class AmqpManagementFacade
+{
+    private static final String AMQP_0_X_REPLY_TO_DESTINATION = "ADDR:!response";
+    private static final String AMQP_0_X_CONSUMER_REPLY_DESTINATION =
+            "ADDR:$management ; {assert : never, node: { type: queue }, link:{name: \"!response\"}}";
+    private final String _managementAddress;
+
+
+    AmqpManagementFacade()
+    {
+        _managementAddress = "ADDR:$management";
+    }
+
+    @SuppressWarnings("unused")
+    Map<String, Object> createEntityUsingAmqpManagement(final String name,
+                                                        final String type,
+                                                        final Session session)
+            throws JMSException
+    {
+        return createEntityUsingAmqpManagement(name, type, Collections.<String, Object>emptyMap(), session);
+    }
+
+    Map<String, Object> createEntityUsingAmqpManagement(final String name,
+                                                        final String type,
+                                                        Map<String, Object> attributes,
+                                                        final Session session)
+            throws JMSException
+    {
+        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
+        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
+
+        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
+        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
+
+        MapMessage createMessage = session.createMapMessage();
+        createMessage.setStringProperty("type", type);
+        createMessage.setStringProperty("operation", "CREATE");
+        createMessage.setString("name", name);
+        createMessage.setString("object-path", name);
+        createMessage.setJMSReplyTo(replyToDestination);
+        for (Map.Entry<String, Object> entry : attributes.entrySet())
+        {
+            createMessage.setObject(entry.getKey(), entry.getValue());
+        }
+        producer.send(createMessage);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+        producer.close();
+
+        Message response = consumer.receive(getManagementResponseTimeout());
+        try
+        {
+            if (response != null)
+            {
+                int statusCode = response.getIntProperty("statusCode");
+                if (statusCode == 201)
+                {
+                    if (response instanceof MapMessage)
+                    {
+                        MapMessage bodyMap = (MapMessage) response;
+                        Map<String, Object> result = new HashMap<>();
+                        Enumeration keys = bodyMap.getMapNames();
+                        while (keys.hasMoreElements())
+                        {
+                            final String key = String.valueOf(keys.nextElement());
+                            Object value = bodyMap.getObject(key);
+                            result.put(key, value);
+                        }
+                        return result;
+                    }
+                    else if (response instanceof ObjectMessage)
+                    {
+                        Object body = ((ObjectMessage) response).getObject();
+                        if (body instanceof Map)
+                        {
+                            @SuppressWarnings("unchecked")
+                            Map<String, Object> bodyMap = (Map<String, Object>) body;
+                            return new HashMap<>(bodyMap);
+                        }
+                    }
+                }
+                else
+                {
+                    throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"),
+                                                             statusCode);
+                }
+            }
+
+            throw new OperationUnsuccessfulException("Cannot get the results from a management create operation", -1);
+        }
+        finally
+        {
+            consumer.close();
+        }
+    }
+
+    void updateEntityUsingAmqpManagement(final String name,
+                                         final String type,
+                                         final Map<String, Object> attributes,
+                                         final Session session)
+            throws JMSException
+    {
+        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
+        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
+
+        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
+
+        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
+
+        MapMessage createMessage = session.createMapMessage();
+        createMessage.setStringProperty("type", type);
+        createMessage.setStringProperty("operation", "UPDATE");
+        createMessage.setStringProperty("index", "object-path");
+        createMessage.setStringProperty("key", name);
+        createMessage.setJMSReplyTo(replyToDestination);
+        for (Map.Entry<String, Object> entry : attributes.entrySet())
+        {
+            createMessage.setObject(entry.getKey(), entry.getValue());
+        }
+        producer.send(createMessage);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+        producer.close();
+
+        Message response = consumer.receive(getManagementResponseTimeout());
+        try
+        {
+            if (response != null)
+            {
+                int statusCode = response.getIntProperty("statusCode");
+                if (statusCode != 200)
+                {
+
+                    throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"),
+                                                             statusCode);
+                }
+            }
+            else
+            {
+                throw new OperationUnsuccessfulException("Cannot get the results from a management update operation",
+                                                         -1);
+            }
+        }
+        finally
+        {
+            consumer.close();
+        }
+    }
+
+    void deleteEntityUsingAmqpManagement(final String name,
+                                         final String type,
+                                         final Session session)
+            throws JMSException
+    {
+        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
+        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
+
+        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
+
+        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
+
+        MapMessage createMessage = session.createMapMessage();
+        createMessage.setStringProperty("type", type);
+        createMessage.setStringProperty("operation", "DELETE");
+        createMessage.setStringProperty("index", "object-path");
+        createMessage.setJMSReplyTo(replyToDestination);
+
+        createMessage.setStringProperty("key", name);
+        producer.send(createMessage);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+
+        Message response = consumer.receive(getManagementResponseTimeout());
+        try
+        {
+            if (response != null)
+            {
+                int statusCode = response.getIntProperty("statusCode");
+                if (statusCode != 200 && statusCode != 204)
+                {
+
+                    throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"),
+                                                             statusCode);
+                }
+            }
+            else
+            {
+                throw new OperationUnsuccessfulException("Cannot get the results from a management delete operation",
+                                                         -1);
+            }
+        }
+        finally
+        {
+            consumer.close();
+        }
+    }
+
+    @SuppressWarnings("unused")
+    Object performOperationUsingAmqpManagement(final String name,
+                                               final String type,
+                                               final String operation,
+                                               final Map<String, Object> arguments,
+                                               final Session session)
+            throws JMSException
+    {
+        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
+        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
+
+        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
+
+        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
+
+        MapMessage opMessage = session.createMapMessage();
+        opMessage.setStringProperty("type", type);
+        opMessage.setStringProperty("operation", operation);
+        opMessage.setStringProperty("index", "object-path");
+        opMessage.setJMSReplyTo(replyToDestination);
+
+        opMessage.setStringProperty("key", name);
+        for (Map.Entry<String, Object> argument : arguments.entrySet())
+        {
+            Object value = argument.getValue();
+            if (value.getClass().isPrimitive() || value instanceof String)
+            {
+                opMessage.setObjectProperty(argument.getKey(), value);
+            }
+            else
+            {
+                ObjectMapper objectMapper = new ObjectMapper();
+                String jsonifiedValue;
+                try
+                {
+                    jsonifiedValue = objectMapper.writeValueAsString(value);
+                }
+                catch (JsonProcessingException e)
+                {
+                    throw new IllegalArgumentException(String.format(
+                            "Cannot convert the argument '%s' to JSON to meet JMS type restrictions",
+                            argument.getKey()));
+                }
+                opMessage.setObjectProperty(argument.getKey(), jsonifiedValue);
+            }
+        }
+
+        producer.send(opMessage);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+
+        Message response = consumer.receive(getManagementResponseTimeout());
+        try
+        {
+            int statusCode = response.getIntProperty("statusCode");
+            if (statusCode < 200 || statusCode > 299)
+            {
+                throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), statusCode);
+            }
+            if (response instanceof MapMessage)
+            {
+                MapMessage bodyMap = (MapMessage) response;
+                Map<String, Object> result = new TreeMap<>();
+                Enumeration mapNames = bodyMap.getMapNames();
+                while (mapNames.hasMoreElements())
+                {
+                    String key = (String) mapNames.nextElement();
+                    result.put(key, bodyMap.getObject(key));
+                }
+                return result;
+            }
+            else if (response instanceof ObjectMessage)
+            {
+                return ((ObjectMessage) response).getObject();
+            }
+            else if (response instanceof BytesMessage)
+            {
+                BytesMessage bytesMessage = (BytesMessage) response;
+                if (bytesMessage.getBodyLength() == 0)
+                {
+                    return null;
+                }
+                else
+                {
+                    byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
+                    bytesMessage.readBytes(buf);
+                    return buf;
+                }
+            }
+            throw new IllegalArgumentException(
+                    "Cannot parse the results from a management operation.  JMS response message : " + response);
+        }
+        finally
+        {
+            if (session.getTransacted())
+            {
+                session.commit();
+            }
+            consumer.close();
+        }
+    }
+
+    @SuppressWarnings(value = {"unused", "unchecked"})
+    List<Map<String, Object>> managementQueryObjects(final String type, final Session session)
+            throws JMSException
+    {
+        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
+        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
+
+        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
+        MapMessage message = session.createMapMessage();
+        message.setStringProperty("identity", "self");
+        message.setStringProperty("type", "org.amqp.management");
+        message.setStringProperty("operation", "QUERY");
+        message.setStringProperty("entityType", type);
+        message.setString("attributeNames", "[]");
+        message.setJMSReplyTo(replyToDestination);
+
+        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
+        producer.send(message);
+
+        Message response = consumer.receive(getManagementResponseTimeout());
+        try
+        {
+            if (response instanceof MapMessage)
+            {
+                MapMessage bodyMap = (MapMessage) response;
+                List<String> attributeNames = (List<String>) bodyMap.getObject("attributeNames");
+                List<List<Object>> attributeValues = (List<List<Object>>) bodyMap.getObject("results");
+                return getResultsAsMaps(attributeNames, attributeValues);
+            }
+            else if (response instanceof ObjectMessage)
+            {
+                Object body = ((ObjectMessage) response).getObject();
+                if (body instanceof Map)
+                {
+                    Map<String, ?> bodyMap = (Map<String, ?>) body;
+                    List<String> attributeNames = (List<String>) bodyMap.get("attributeNames");
+                    List<List<Object>> attributeValues = (List<List<Object>>) bodyMap.get("results");
+                    return getResultsAsMaps(attributeNames, attributeValues);
+                }
+            }
+            throw new IllegalArgumentException("Cannot parse the results from a management query");
+        }
+        finally
+        {
+            consumer.close();
+        }
+    }
+
+    @SuppressWarnings("unused")
+    Map<String, Object> readEntityUsingAmqpManagement(final String name,
+                                                      final String type,
+                                                      final boolean actuals,
+                                                      final Session session)
+            throws JMSException
+    {
+        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
+        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
+
+        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
+
+        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
+
+        MapMessage request = session.createMapMessage();
+        request.setStringProperty("type", type);
+        request.setStringProperty("operation", "READ");
+        request.setString("name", name);
+        request.setString("object-path", name);
+        request.setStringProperty("index", "object-path");
+        request.setStringProperty("key", name);
+        request.setBooleanProperty("actuals", actuals);
+        request.setJMSReplyTo(replyToDestination);
+
+        producer.send(request);
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+
+        Message response = consumer.receive(getManagementResponseTimeout());
+        if (session.getTransacted())
+        {
+            session.commit();
+        }
+        try
+        {
+            if (response instanceof MapMessage)
+            {
+                MapMessage bodyMap = (MapMessage) response;
+                Map<String, Object> data = new HashMap<>();
+                @SuppressWarnings("unchecked")
+                Enumeration<String> keys = bodyMap.getMapNames();
+                while (keys.hasMoreElements())
+                {
+                    String key = keys.nextElement();
+                    data.put(key, bodyMap.getObject(key));
+                }
+                return data;
+            }
+            else if (response instanceof ObjectMessage)
+            {
+                Object body = ((ObjectMessage) response).getObject();
+                if (body instanceof Map)
+                {
+                    @SuppressWarnings("unchecked")
+                    Map<String, ?> bodyMap = (Map<String, ?>) body;
+                    return new HashMap<>(bodyMap);
+                }
+            }
+            throw new IllegalArgumentException("Management read failed : "
+                                               + response.getStringProperty("statusCode")
+                                               + " - "
+                                               + response.getStringProperty("statusDescription"));
+        }
+        finally
+        {
+            consumer.close();
+        }
+    }
+
+    @SuppressWarnings("unused")
+    long getQueueDepth(final Queue destination, final Session session) throws Exception
+    {
+        final String escapedName = getEscapedName(destination);
+        Map<String, Object> arguments =
+                Collections.singletonMap("statistics", (Object) Collections.singletonList("queueDepthMessages"));
+
+        Object statistics = performOperationUsingAmqpManagement(escapedName,
+                                                                "org.apache.qpid.Queue", "getStatistics",
+                                                                arguments, session
+                                                               );
+        @SuppressWarnings("unchecked")
+        Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
+        return ((Number) statisticsMap.get("queueDepthMessages")).intValue();
+    }
+
+    @SuppressWarnings("unused")
+    boolean isQueueExist(final Queue destination, final Session session) throws Exception
+    {
+        final String escapedName = getEscapedName(destination);
+        try
+        {
+            performOperationUsingAmqpManagement(escapedName,
+                                                "org.apache.qpid.Queue",
+                                                "READ",
+                                                Collections.<String, Object>emptyMap(),
+                                                session);
+            return true;
+        }
+        catch (AmqpManagementFacade.OperationUnsuccessfulException e)
+        {
+            if (e.getStatusCode() == 404)
+            {
+                return false;
+            }
+            else
+            {
+                throw e;
+            }
+        }
+    }
+
+    private String getEscapedName(final Queue destination) throws JMSException
+    {
+        return destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1");
+    }
+
+    private List<Map<String, Object>> getResultsAsMaps(final List<String> attributeNames,
+                                                       final List<List<Object>> attributeValues)
+    {
+        List<Map<String, Object>> results = new ArrayList<>();
+        for (List<Object> resultObject : attributeValues)
+        {
+            Map<String, Object> result = new HashMap<>();
+            for (int i = 0; i < attributeNames.size(); ++i)
+            {
+                result.put(attributeNames.get(i), resultObject.get(i));
+            }
+            results.add(result);
+        }
+        return results;
+    }
+
+    private int getManagementResponseTimeout()
+    {
+        return Integer.getInteger("qpid.systests.management_response_timeout", 5000);
+    }
+
+    static class OperationUnsuccessfulException extends RuntimeException
+    {
+        private final int _statusCode;
+
+        private OperationUnsuccessfulException(final String message, final int statusCode)
+        {
+            super(message == null ? String.format("Unexpected status code %d", statusCode) : message);
+            _statusCode = statusCode;
+        }
+
+        int getStatusCode()
+        {
+            return _statusCode;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
new file mode 100644
index 0000000..3893288
--- /dev/null
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java
@@ -0,0 +1,898 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systest.core.brokerj;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import ch.qos.logback.classic.LoggerContext;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.ByteStreams;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.BrokerAdminException;
+import org.apache.qpid.systest.core.dependency.ClasspathQuery;
+import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator;
+import org.apache.qpid.systest.core.util.FileUtils;
+import org.apache.qpid.systest.core.logback.LogbackSocketPortNumberDefiner;
+import org.apache.qpid.systest.core.util.SystemUtils;
+
+public class SpawnQpidBrokerAdmin implements BrokerAdmin
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(SpawnQpidBrokerAdmin.class);
+    private static final String BROKER_LOG_PREFIX = "BROKER";
+    private static final String SYSTEST_PROPERTY_PREFIX = "qpid.systest.";
+    private static final String SYSTEST_PROPERTY_BROKER_READY = "qpid.systest.broker.ready";
+    private static final String SYSTEST_PROPERTY_BROKER_STOPPED = "qpid.systest.broker.stopped";
+    private static final String SYSTEST_PROPERTY_BROKER_LISTENING = "qpid.systest.broker.listening";
+    private static final String SYSTEST_PROPERTY_BROKER_PROCESS = "qpid.systest.broker.process";
+    private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time";
+    private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests";
+
+    static final String SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE = "qpid.systest.virtualhostnode.type";
+    static final String SYSTEST_PROPERTY_VIRTUALHOST_BLUEPRINT = "qpid.systest.virtualhost.blueprint";
+    static final String SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION = "qpid.systest.initialConfigurationLocation";
+    static final String SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE = "qpid.systest.build.classpath.file";
+    static final String SYSTEST_PROPERTY_JAVA8_EXECUTABLE = "qpid.systest.java8.executable";
+    static final String SYSTEST_PROPERTY_BROKERJ_DEPENDECIES = "qpid.systest.brokerj.dependencies";
+
+    private final static AtomicLong BROKER_INSTANCE_COUNTER = new AtomicLong();
+
+    private volatile List<ListeningPort> _ports;
+    private volatile Process _process;
+    private volatile Integer _pid;
+    private volatile String _currentWorkDirectory;
+    private volatile boolean _isPersistentStore;
+    private volatile String _virtualHostNodeName;
+
+    @Override
+    public void create(final Class testClass)
+    {
+        setClassQualifiedTestName(testClass.getName());
+        LOGGER.info("========================= starting broker for test class : {}", testClass.getSimpleName());
+        startBroker(testClass);
+    }
+
+    @Override
+    public void start(final Class testClass, final Method method)
+    {
+        LOGGER.info("========================= prepare test environment for test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+        String virtualHostNodeName = getVirtualHostNodeName(testClass, method);
+        createVirtualHost(virtualHostNodeName);
+        _virtualHostNodeName = virtualHostNodeName;
+        LOGGER.info("========================= executing test : {}#{}", testClass.getSimpleName(), method.getName());
+        setClassQualifiedTestName(testClass.getName() + "." + method.getName());
+        LOGGER.info("========================= start executing test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+    }
+
+
+    @Override
+    public void stop(final Class testClass, final Method method)
+    {
+        LOGGER.info("========================= stop executing test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+        setClassQualifiedTestName(testClass.getName());
+        LOGGER.info("========================= cleaning up test environment for test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+        deleteVirtualHost(getVirtualHostNodeName(testClass, method));
+        _virtualHostNodeName = null;
+        LOGGER.info("========================= cleaning done for test : {}#{}",
+                    testClass.getSimpleName(),
+                    method.getName());
+    }
+
+    @Override
+    public void destroy(final Class testClass)
+    {
+        LOGGER.info("========================= stopping broker for test class: {}", testClass.getSimpleName());
+        shutdown();
+        _ports.clear();
+        if (Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS))
+        {
+            FileUtils.delete(new File(_currentWorkDirectory), true);
+        }
+        _isPersistentStore = false;
+        LOGGER.info("========================= stopping broker done for test class : {}", testClass.getSimpleName());
+        setClassQualifiedTestName(null);
+    }
+
+    @Override
+    public InetSocketAddress getBrokerAddress(final PortType portType)
+    {
+        Integer port = null;
+        switch (portType)
+        {
+            case AMQP:
+                for (ListeningPort p : _ports)
+                {
+                    if (p.getProtocol() == null
+                        && (p.getTransport().contains("TCP") /*|| p.getTransport().contains("SSL") */))
+                    {
+                        port = p.getPort();
+                        break;
+                    }
+                }
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType));
+        }
+        if (port == null)
+        {
+            throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType));
+        }
+        return new InetSocketAddress(port);
+    }
+
+    @Override
+    public boolean supportsPersistence()
+    {
+        return _isPersistentStore;
+    }
+
+    @Override
+    public ListenableFuture<Void> restart()
+    {
+        if (_virtualHostNodeName == null)
+        {
+            throw new BrokerAdminException("Virtual host is not started");
+        }
+        return restartVirtualHost(_virtualHostNodeName);
+    }
+
+    @Override
+    public String getValidUsername()
+    {
+        return "guest";
+    }
+
+    @Override
+    public String getValidPassword()
+    {
+        return "guest";
+    }
+
+    @Override
+    public String getType()
+    {
+        return SpawnQpidBrokerAdmin.class.getSimpleName();
+    }
+
+    @Override
+    public BrokerType getBrokerType()
+    {
+        return BrokerType.BROKERJ;
+    }
+
+    @Override
+    public Connection getConnection() throws JMSException
+    {
+        return createConnection(_virtualHostNodeName);
+    }
+
+    private void startBroker(final Class testClass)
+    {
+        try
+        {
+            start(testClass);
+        }
+        catch (Exception e)
+        {
+            if (e instanceof RuntimeException)
+            {
+                throw (RuntimeException) e;
+            }
+            else
+            {
+                throw new BrokerAdminException("Unexpected exception on broker startup", e);
+            }
+        }
+    }
+
+    void start(final Class testClass) throws Exception
+    {
+        String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
+        _currentWorkDirectory =
+                Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName()))
+                     .toString();
+
+        String initialConfiguration = System.getProperty(SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION);
+        if (initialConfiguration == null)
+        {
+            throw new BrokerAdminException(
+                    String.format("No initial configuration is found: JVM property '%s' is not set.",
+                                  SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION));
+        }
+
+        File testInitialConfiguration = new File(_currentWorkDirectory, "initial-configuration.json");
+        if (!testInitialConfiguration.createNewFile())
+        {
+            throw new BrokerAdminException("Failed to create a file for a copy of initial configuration");
+        }
+        if (initialConfiguration.startsWith("classpath:"))
+        {
+            String config = initialConfiguration.substring("classpath:".length());
+            try (InputStream is = getClass().getClassLoader().getResourceAsStream(config);
+                 OutputStream os = new FileOutputStream(testInitialConfiguration))
+            {
+                ByteStreams.copy(is, os);
+            }
+        }
+        else
+        {
+            Files.copy(new File(initialConfiguration).toPath(), testInitialConfiguration.toPath());
+        }
+
+        String classpath;
+        File file = new File(System.getProperty(SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE));
+        if (!file.exists())
+        {
+            String dependencies = System.getProperty(SYSTEST_PROPERTY_BROKERJ_DEPENDECIES);
+            final ClasspathQuery classpathQuery = new ClasspathQuery(SpawnQpidBrokerAdmin.class,
+                                                                     Arrays.asList(dependencies.split(",")));
+            classpath = classpathQuery.getClasspath();
+            Files.write(file.toPath(), Collections.singleton(classpath), UTF_8);
+        }
+        else
+        {
+            classpath = new String(Files.readAllBytes(file.toPath()), UTF_8);
+        }
+
+        // grab Qpid related JVM settings
+        List<String> jvmArguments = new ArrayList<>();
+        Properties jvmProperties = System.getProperties();
+        for (String jvmProperty : jvmProperties.stringPropertyNames())
+        {
+            if (jvmProperty.startsWith(SYSTEST_PROPERTY_PREFIX)
+                || jvmProperty.equalsIgnoreCase("java.io.tmpdir"))
+            {
+                jvmArguments.add("-D" + jvmProperty + "=" + jvmProperties.getProperty(jvmProperty));
+            }
+        }
+
+        jvmArguments.add(0, System.getProperty(SYSTEST_PROPERTY_JAVA8_EXECUTABLE, "/usr/bin/java"));
+        jvmArguments.add(1, "-cp");
+        jvmArguments.add(2, classpath);
+        jvmArguments.add("-Dqpid.systest.logback.socket.port="
+                         + LogbackSocketPortNumberDefiner.getLogbackSocketPortNumber());
+        jvmArguments.add("-Dqpid.systest.logback.logs_dir=" + System.getProperty("qpid.systest.logback.logs_dir",
+                                                                                 "${qpid.work_dir}"));
+        jvmArguments.add(String.format("-Dqpid.systest.logback.origin=%s-%d",
+                                       BROKER_LOG_PREFIX,
+                                       BROKER_INSTANCE_COUNTER.getAndIncrement()));
+        jvmArguments.add("-Dqpid.systest.logback.context=" + testClass.getName());
+        if (System.getProperty("qpid.systest.remote_debugger") != null)
+        {
+            jvmArguments.add(System.getProperty("qpid.systest.remote_debugger"));
+        }
+        jvmArguments.add("org.apache.qpid.server.Main");
+        jvmArguments.add("-prop");
+        jvmArguments.add(String.format("qpid.work_dir=%s", escapePath(_currentWorkDirectory)));
+        jvmArguments.add("--store-type");
+        jvmArguments.add("JSON");
+        jvmArguments.add("--initial-config-path");
+        jvmArguments.add(escapePath(testInitialConfiguration.toString()));
+
+        LOGGER.debug("Spawning broker JVM :", jvmArguments);
+
+        String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY, "BRK-1004 : Qpid Broker Ready");
+        String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED, "BRK-1005 : Stopped");
+        String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING,
+                                                  "BRK-1002 : Starting( : \\w*)? : Listening on (\\w*) port ([0-9]+)");
+        String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS, "BRK-1017 : Process : PID : ([0-9]+)");
+        int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
+
+        LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime);
+
+        String[] cmd = jvmArguments.toArray(new String[jvmArguments.size()]);
+
+        ProcessBuilder processBuilder = new ProcessBuilder(cmd);
+        processBuilder.redirectErrorStream(true);
+
+        Map<String, String> processEnvironment = processBuilder.environment();
+        processEnvironment.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + testClass.getName() + "\"");
+
+        long startTime = System.currentTimeMillis();
+        _process = processBuilder.start();
+
+        BrokerSystemOutpuHandler brokerSystemOutpuHandler = new BrokerSystemOutpuHandler(_process.getInputStream(),
+                                                                                         ready,
+                                                                                         stopped,
+                                                                                         process,
+                                                                                         amqpListening,
+                                                                                         getClass().getName());
+
+        boolean brokerStarted = false;
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+        try
+        {
+            Future<?> result = executorService.submit(brokerSystemOutpuHandler);
+            result.get(startUpTime, TimeUnit.MILLISECONDS);
+
+            _pid = brokerSystemOutpuHandler.getPID();
+            _ports = brokerSystemOutpuHandler.getAmqpPorts();
+
+            if (_pid == -1)
+            {
+                throw new BrokerAdminException("Broker PID is not detected");
+            }
+
+            if (_ports.size() == 0)
+            {
+                throw new BrokerAdminException("Broker port is not detected");
+            }
+
+            try
+            {
+                //test that the broker is still running and hasn't exited unexpectedly
+                int exit = _process.exitValue();
+                LOGGER.info("broker aborted: {}", exit);
+                throw new BrokerAdminException("broker aborted: " + exit);
+            }
+            catch (IllegalThreadStateException e)
+            {
+                // this is expect if the broker started successfully
+            }
+
+            LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}",
+                        System.currentTimeMillis() - startTime,
+                        _pid);
+            LOGGER.info("Broker ports: {}", _ports);
+            brokerStarted = true;
+        }
+        catch (RuntimeException e)
+        {
+            throw e;
+        }
+        catch (TimeoutException e)
+        {
+            LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'",
+                        startUpTime, brokerSystemOutpuHandler.getReady());
+            String threadDump = dumpThreads();
+            if (!threadDump.isEmpty())
+            {
+                LOGGER.warn("the result of a try to capture thread dump:" + threadDump);
+            }
+            throw new BrokerAdminException(String.format("Broker failed to become ready within %d ms. Stop line : %s",
+                                                         startUpTime,
+                                                         brokerSystemOutpuHandler.getStopLine()));
+        }
+        catch (ExecutionException e)
+        {
+            throw new BrokerAdminException(String.format("Broker startup failed due to %s", e.getCause()),
+                                           e.getCause());
+        }
+        catch (Exception e)
+        {
+            throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e);
+        }
+        finally
+        {
+            if (!brokerStarted)
+            {
+                LOGGER.warn("Broker failed to start");
+                _process.destroy();
+            }
+            executorService.shutdown();
+        }
+    }
+
+    void createVirtualHost(final String virtualHostNodeName)
+    {
+        final String nodeType = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE);
+        _isPersistentStore = !"Memory".equals(nodeType);
+
+        String storeDir = null;
+        if (System.getProperty("profile", "").startsWith("java-dby-mem"))
+        {
+            storeDir = ":memory:";
+        }
+        else if (!"Memory".equals(nodeType))
+        {
+            storeDir = "${qpid.work_dir}" + File.separator + virtualHostNodeName;
+        }
+
+        String blueprint = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOST_BLUEPRINT);
+        LOGGER.debug("Creating Virtual host from blueprint: {}", blueprint);
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put("name", virtualHostNodeName);
+        attributes.put("type", nodeType);
+        attributes.put("qpid-type", nodeType);
+        String contextAsString;
+        try
+        {
+            contextAsString =
+                    new ObjectMapper().writeValueAsString(Collections.singletonMap("virtualhostBlueprint", blueprint));
+        }
+        catch (JsonProcessingException e)
+        {
+            throw new BrokerAdminException("Cannot create virtual host as context serialization failed", e);
+        }
+        attributes.put("context", contextAsString);
+        attributes.put("defaultVirtualHostNode", true);
+        attributes.put("virtualHostInitialConfiguration", blueprint);
+        if (storeDir != null)
+        {
+            attributes.put("storePath", storeDir);
+        }
+
+        try
+        {
+            Connection connection = createConnection("$management");
+            try
+            {
+                connection.start();
+                final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                try
+                {
+                    new AmqpManagementFacade().createEntityUsingAmqpManagement(virtualHostNodeName,
+                                                                               "org.apache.qpid.VirtualHostNode",
+                                                                               attributes,
+                                                                               session);
+                }
+                catch (AmqpManagementFacade.OperationUnsuccessfulException e)
+                {
+                    throw new BrokerAdminException(String.format("Cannot create test virtual host '%s'",
+                                                                 virtualHostNodeName), e);
+                }
+                finally
+                {
+                    session.close();
+                }
+            }
+            finally
+            {
+                connection.close();
+            }
+        }
+        catch (JMSException e)
+        {
+            throw new BrokerAdminException(String.format("Cannot create virtual host '%s'", virtualHostNodeName), e);
+        }
+    }
+
+    void deleteVirtualHost(final String virtualHostNodeName)
+    {
+        try
+        {
+            Connection connection = createConnection("$management");
+            try
+            {
+                connection.start();
+                final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                try
+                {
+                    new AmqpManagementFacade().deleteEntityUsingAmqpManagement(virtualHostNodeName,
+                                                                               "org.apache.qpid.VirtualHostNode",
+                                                                               session);
+                }
+                catch (AmqpManagementFacade.OperationUnsuccessfulException e)
+                {
+                    throw new BrokerAdminException(String.format("Cannot delete test virtual host '%s'",
+                                                                 virtualHostNodeName), e);
+                }
+                finally
+                {
+                    session.close();
+                }
+            }
+            finally
+            {
+                connection.close();
+            }
+        }
+        catch (JMSException e)
+        {
+            throw new BrokerAdminException(String.format("Cannot delete virtual host '%s'", virtualHostNodeName), e);
+        }
+    }
+
+    ListenableFuture<Void> restartVirtualHost(final String virtualHostNodeName)
+    {
+        try
+        {
+            Connection connection = createConnection("$management");
+            try
+            {
+                connection.start();
+                updateVirtualHostNode(virtualHostNodeName,
+                                      Collections.<String, Object>singletonMap("desiredState", "STOPPED"), connection);
+                updateVirtualHostNode(virtualHostNodeName,
+                                      Collections.<String, Object>singletonMap("desiredState", "ACTIVE"), connection);
+                return Futures.immediateFuture(null);
+            }
+            finally
+            {
+                connection.close();
+            }
+        }
+        catch (JMSException e)
+        {
+            throw new BrokerAdminException(String.format("Cannot create virtual host %s", virtualHostNodeName), e);
+        }
+    }
+
+    private void updateVirtualHostNode(final String virtualHostNodeName,
+                                       final Map<String, Object> attributes,
+                                       final Connection connection) throws JMSException
+    {
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            new AmqpManagementFacade().updateEntityUsingAmqpManagement(virtualHostNodeName,
+                                                                       "org.apache.qpid.VirtualHostNode",
+                                                                       attributes,
+                                                                       session);
+        }
+        catch (AmqpManagementFacade.OperationUnsuccessfulException e)
+        {
+            throw new BrokerAdminException(String.format("Cannot create test virtual host '%s'", virtualHostNodeName),
+                                           e);
+        }
+        finally
+        {
+            session.close();
+        }
+    }
+
+    void shutdown()
+    {
+        if (SystemUtils.isWindows())
+        {
+            doWindowsKill();
+        }
+
+        if (_process != null)
+        {
+            LOGGER.info("Destroying broker process");
+            _process.destroy();
+
+            reapChildProcess();
+        }
+    }
+
+    private String escapePath(String value)
+    {
+        if (SystemUtils.isWindows() && value.contains("\"") && !value.startsWith("\""))
+        {
+            return "\"" + value.replaceAll("\"", "\"\"") + "\"";
+        }
+        else
+        {
+            return value;
+        }
+    }
+
+    private Connection createConnection(String virtualHostName) throws JMSException
+    {
+        final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
+        initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
+                                      "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+        final String factoryName = "connectionFactory";
+        String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''";
+        String url = String.format(urlTemplate,
+                                   "spawn_broker_admin",
+                                   virtualHostName,
+                                   getBrokerAddress(PortType.AMQP).getPort());
+        initialContextEnvironment.put("connectionfactory." + factoryName, url);
+        try
+        {
+            InitialContext initialContext = new InitialContext(initialContextEnvironment);
+            try
+            {
+                ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(factoryName);
+                return factory.createConnection(getValidUsername(), getValidPassword());
+            }
+            finally
+            {
+                initialContext.close();
+            }
+        }
+        catch (NamingException e)
+        {
+            throw new BrokerAdminException("Unexpected exception on connection lookup", e);
+        }
+    }
+
+    private void setClassQualifiedTestName(final String name)
+    {
+        final LoggerContext loggerContext = ((ch.qos.logback.classic.Logger) LOGGER).getLoggerContext();
+        loggerContext.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, name);
+    }
+
+
+    private String getVirtualHostNodeName(final Class testClass, final Method method)
+    {
+        return testClass.getSimpleName() + "_" + method.getName();
+    }
+
+
+    private void doWindowsKill()
+    {
+        try
+        {
+
+            Process p;
+            p = Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(_pid), "/T", "/F"});
+            consumeAllOutput(p);
+        }
+        catch (IOException e)
+        {
+            LOGGER.error("Error whilst killing process " + _pid, e);
+        }
+    }
+
+    private static void consumeAllOutput(Process p) throws IOException
+    {
+        try (InputStreamReader inputStreamReader = new InputStreamReader(p.getInputStream()))
+        {
+            try (BufferedReader reader = new BufferedReader(inputStreamReader))
+            {
+                String line;
+                while ((line = reader.readLine()) != null)
+                {
+                    LOGGER.debug("Consuming output: {}", line);
+                }
+            }
+        }
+    }
+
+    private void reapChildProcess()
+    {
+        try
+        {
+            _process.waitFor();
+            LOGGER.info("broker exited: " + _process.exitValue());
+        }
+        catch (InterruptedException e)
+        {
+            LOGGER.error("Interrupted whilst waiting for process shutdown");
+            Thread.currentThread().interrupt();
+        }
+        finally
+        {
+            try
+            {
+                _process.getInputStream().close();
+                _process.getErrorStream().close();
+                _process.getOutputStream().close();
+            }
+            catch (IOException ignored)
+            {
+            }
+        }
+    }
+
+    private String dumpThreads()
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try
+        {
+            Process process = Runtime.getRuntime().exec("jstack " + _pid);
+            InputStream is = process.getInputStream();
+            byte[] buffer = new byte[1024];
+            int length;
+            while ((length = is.read(buffer)) != -1)
+            {
+                baos.write(buffer, 0, length);
+            }
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Error whilst collecting thread dump for " + _pid, e);
+        }
+        return new String(baos.toByteArray());
+    }
+
+
+    public final class BrokerSystemOutpuHandler implements Runnable
+    {
+
+        private final BufferedReader _in;
+        private final Logger _out;
+        private final String _ready;
+        private final String _stopped;
+        private final List<ListeningPort> _amqpPorts;
+        private final Pattern _pidPattern;
+        private final Pattern _amqpPortPattern;
+        private volatile boolean _seenReady;
+        private volatile String _stopLine;
+        private volatile int _pid;
+
+        private BrokerSystemOutpuHandler(InputStream in,
+                                         String ready,
+                                         String stopped,
+                                         String pidRegExp,
+                                         String amqpPortRegExp,
+                                         String loggerName)
+        {
+            _amqpPorts = new ArrayList<>();
+            _in = new BufferedReader(new InputStreamReader(in));
+            _out = LoggerFactory.getLogger(loggerName);
+            _ready = ready;
+            _stopped = stopped;
+            _seenReady = false;
+            _amqpPortPattern = Pattern.compile(amqpPortRegExp);
+            _pidPattern = Pattern.compile(pidRegExp);
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                String line;
+                while ((line = _in.readLine()) != null)
+                {
+                    _out.info(line);
+
+                    checkPortListeningLog(line, _amqpPortPattern, _amqpPorts);
+
+                    Matcher pidMatcher = _pidPattern.matcher(line);
+                    if (pidMatcher.find())
+                    {
+                        if (pidMatcher.groupCount() > 1)
+                        {
+                            _pid = Integer.parseInt(pidMatcher.group(1));
+                        }
+                    }
+
+                    if (line.contains(_ready))
+                    {
+                        _seenReady = true;
+                        break;
+                    }
+
+                    if (!_seenReady && line.contains(_stopped))
+                    {
+                        _stopLine = line;
+                    }
+                }
+            }
+            catch (IOException e)
+            {
+                LOGGER.warn(e.getMessage()
+                            + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost.");
+            }
+        }
+
+        private void checkPortListeningLog(final String line,
+                                           final Pattern portPattern,
+                                           final List<ListeningPort> ports)
+        {
+            Matcher portMatcher = portPattern.matcher(line);
+            if (portMatcher.find())
+            {
+                ports.add(new ListeningPort(portMatcher.group(1),
+                                            portMatcher.group(2),
+                                            Integer.parseInt(portMatcher.group(3))));
+            }
+        }
+
+        String getStopLine()
+        {
+            return _stopLine;
+        }
+
+        String getReady()
+        {
+            return _ready;
+        }
+
+        int getPID()
+        {
+            return _pid;
+        }
+
+        List<ListeningPort> getAmqpPorts()
+        {
+            return _amqpPorts;
+        }
+    }
+
+    private static class ListeningPort
+    {
+        private String _protocol;
+        private String _transport;
+        private int _port;
+
+        ListeningPort(final String protocol, final String transport, final int port)
+        {
+            _transport = transport;
+            _port = port;
+            _protocol = protocol;
+        }
+
+        String getTransport()
+        {
+            return _transport;
+        }
+
+        int getPort()
+        {
+            return _port;
+        }
+
+        String getProtocol()
+        {
+            return _protocol;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ListeningPort{" +
+                   "_protocol='" + _protocol + '\'' +
+                   ", _transport='" + _transport + '\'' +
+                   ", _port=" + _port +
+                   '}';
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message