From commits-return-43312-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Wed Jan 10 14:16:15 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 714D418076D for ; Wed, 10 Jan 2018 14:16:15 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 613CA160C23; Wed, 10 Jan 2018 13:16:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 098A3160C2E for ; Wed, 10 Jan 2018 14:16:12 +0100 (CET) Received: (qmail 30981 invoked by uid 500); 10 Jan 2018 13:16:12 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 30972 invoked by uid 99); 10 Jan 2018 13:16:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jan 2018 13:16:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3B7A9DFA44; Wed, 10 Jan 2018 13:16:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: orudyy@apache.org To: commits@qpid.apache.org Date: Wed, 10 Jan 2018 13:16:09 -0000 Message-Id: <60663a97e4a44d87a73e2ca775974584@git.apache.org> In-Reply-To: <08c06eebd9db486ea7606dfb27928e76@git.apache.org> References: <08c06eebd9db486ea7606dfb27928e76@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] qpid-jms-amqp-0-x git commit: QPID-8074: [JMS AMQP 0-x][System Tests] Build framework to run JMS client system tests QPID-8074: [JMS AMQP 0-x][System Tests] Build framework to run JMS client system tests Project: 
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/6899893a Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/6899893a Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/6899893a Branch: refs/heads/master Commit: 6899893a3eac396e4810e9ea2bde0dadb296babd Parents: a2c3f8b Author: Alex Rudyy Authored: Wed Jan 10 13:03:33 2018 +0000 Committer: Alex Rudyy Committed: Wed Jan 10 13:15:39 2018 +0000 ---------------------------------------------------------------------- pom.xml | 50 +- systests/pom.xml | 188 ++++ .../apache/qpid/systest/core/BrokerAdmin.java | 62 ++ .../qpid/systest/core/BrokerAdminException.java | 34 + .../qpid/systest/core/BrokerAdminFactory.java | 44 + .../systest/core/BrokerAdminUsingTestBase.java | 39 + .../apache/qpid/systest/core/JmsTestBase.java | 88 ++ .../qpid/systest/core/QpidTestRunner.java | 88 ++ .../core/brokerj/AmqpManagementFacade.java | 554 ++++++++++++ .../core/brokerj/SpawnQpidBrokerAdmin.java | 898 +++++++++++++++++++ .../qpid/systest/core/dependency/Booter.java | 103 +++ .../systest/core/dependency/ClasspathQuery.java | 202 +++++ .../dependency/ConsoleRepositoryListener.java | 128 +++ .../dependency/ConsoleTransferListener.java | 175 ++++ .../ManualRepositorySystemFactory.java | 58 ++ .../LogbackPropertyValueDiscriminator.java | 65 ++ .../logback/LogbackSocketPortNumberDefiner.java | 45 + .../qpid/systest/core/util/FileUtils.java | 58 ++ .../qpid/systest/core/util/PortHelper.java | 191 ++++ .../qpid/systest/core/util/SystemUtils.java | 35 + .../apache/qpid/systest/core/util/Utils.java | 122 +++ .../resources/broker-j-config-with-logging.json | 83 ++ .../core/brokerj/SpawnQpidBrokerAdminTest.java | 282 ++++++ systests/src/test/resources/logback-test.xml | 58 ++ 24 files changed, 3649 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4a33a8a..0bc3c55 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,8 @@ 1.9.5 1.3 + 3.5.0 + 1.0.3 1.3.2 2.6 2.3 @@ -118,6 +120,7 @@ client client/example doc + systests @@ -162,7 +165,6 @@ junit junit ${junit-version} - test @@ -171,6 +173,52 @@ ${mockito-version} test + + + com.fasterxml.jackson.core + jackson-core + ${fasterxml-jackson-version} + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml-jackson-version} + + + org.hamcrest + hamcrest-library + ${hamcrest-version} + + + org.hamcrest + hamcrest-integration + ${hamcrest-version} + + + org.apache.maven + maven-core + ${maven-core-version} + + + org.apache.maven.resolver + maven-resolver-api + ${maven-resolver-version} + + + org.apache.maven.resolver + maven-resolver-connector-basic + ${maven-resolver-version} + + + org.apache.maven.resolver + maven-resolver-transport-file + ${maven-resolver-version} + + + org.apache.maven.resolver + maven-resolver-transport-http + ${maven-resolver-version} + http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/pom.xml ---------------------------------------------------------------------- diff --git a/systests/pom.xml b/systests/pom.xml new file mode 100644 index 0000000..70bca14 --- /dev/null +++ b/systests/pom.xml @@ -0,0 +1,188 @@ + + + + 4.0.0 + + + org.apache.qpid + qpid-jms-amqp-0-x-parent + 6.4.0-SNAPSHOT + + + qpid-client-systests + Apache Qpid JMS AMQP 0-x System Tests + Apache Qpid JMS AMQP 0-x System Tests + + + 0-10 + + + + + + org.apache.qpid + qpid-client + + + + org.apache.geronimo.specs + geronimo-jms_1.1_spec + + + + junit + junit + + + + com.fasterxml.jackson.core + jackson-core + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.google.guava + guava + + + + org.apache.maven + maven-core + + + + org.apache.maven.resolver 
+ maven-resolver-api + + + + org.apache.maven.resolver + maven-resolver-connector-basic + + + + org.apache.maven.resolver + maven-resolver-transport-file + + + + org.apache.maven.resolver + maven-resolver-transport-http + + + + ch.qos.logback + logback-classic + + + + org.slf4j + slf4j-api + + + + org.hamcrest + hamcrest-library + test + + + + org.hamcrest + hamcrest-integration + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${qpid.amqp.version} + + + + + + + + + broker-j + + + 7.0.0 + org.apache.qpid.systest.core.brokerj.SpawnQpidBrokerAdmin + /usr/bin/java + {"type":"BDB","globalAddressDomains":"[]"} + org.apache.qpid:qpid-broker:${qpid-broker-j-version},org.apache.qpid:qpid-broker-core:${qpid-broker-j-version},org.apache.qpid:qpid-bdbstore:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-amqp-0-8-protocol:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-amqp-0-10-protocol:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-amqp-msg-conv-0-8-to-0-10:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-management-amqp:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-access-control:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-derby-store:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-jdbc-provider-bone:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-jdbc-store:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-logging-logback:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-management-amqp:${qpid-broker-j-version},org.apache.qpid:qpid-broker-plugins-memory-store:${qpid-broker-j-version},org.apache.qpid:qpid-bdbstore:${qpid-broker-j-version} + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${qpid.systest.brokerj.dependencies} + ${qpid.systest.java8.executable} + ${project.build.directory}/qpid.build.classpath.txt + classpath:broker-j-config-with-logging.json + ${qpid.systest.broker_admin} + JSON + 
${qpid.systest.virtualhost.blueprint} + ${project.basedir}${file.separator}target${file.separator}surefire-reports + true + + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + enforce-java8-check + + enforce + + + + + + ${qpid.systest.java8.executable} + + + + true + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java new file mode 100644 index 0000000..b2b41cc --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdmin.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.systest.core; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; + +import javax.jms.Connection; +import javax.jms.JMSException; + +import com.google.common.util.concurrent.ListenableFuture; + +@SuppressWarnings("unused") +public interface BrokerAdmin +{ + void create(final Class testClass); + void start(final Class testClass, final Method method); + void stop(final Class testClass, final Method method); + void destroy(final Class testClass); + ListenableFuture restart(); + + InetSocketAddress getBrokerAddress(PortType portType); + boolean supportsPersistence(); + + String getValidUsername(); + String getValidPassword(); + + String getType(); + BrokerType getBrokerType(); + + Connection getConnection() throws JMSException; + + enum PortType + { + ANONYMOUS_AMQP, + AMQP + } + + enum BrokerType + { + BROKERJ, + CPP + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java new file mode 100644 index 0000000..5b6a3f5 --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminException.java @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest.core; + +public class BrokerAdminException extends RuntimeException +{ + public BrokerAdminException(final String message) + { + super(message); + } + + public BrokerAdminException(final String message, final Throwable cause) + { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java new file mode 100644 index 0000000..5cfdc53 --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.systest.core; + +class BrokerAdminFactory +{ + BrokerAdmin createInstance() + { + String type = System.getProperty("qpid.systest.broker_admin"); + if (type != null) + { + try + { + @SuppressWarnings("unchecked") + Class c = (Class) Class.forName(type); + return c.newInstance(); + } + catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) + { + throw new BrokerAdminException(String.format("Could not find BrokerAdmin implementation of type '%s'", + type)); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java new file mode 100644 index 0000000..72858ca --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/BrokerAdminUsingTestBase.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.systest.core; + +import org.junit.runner.RunWith; + +@RunWith(QpidTestRunner.class) +public abstract class BrokerAdminUsingTestBase +{ + private BrokerAdmin _brokerAdmin; + + public void init(final BrokerAdmin brokerAdmin) + { + _brokerAdmin = brokerAdmin; + } + + public BrokerAdmin getBrokerAdmin() + { + return _brokerAdmin; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java new file mode 100644 index 0000000..49cc3b2 --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/JmsTestBase.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.qpid.systest.core; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assume.assumeThat; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.TopicConnection; +import javax.naming.NamingException; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class JmsTestBase extends BrokerAdminUsingTestBase +{ + private static final Logger LOGGER = LoggerFactory.getLogger(JmsTestBase.class); + + @Rule + public final TestName _testName = new TestName(); + + @Before + public void setUpTestBase() + { + assumeThat(String.format("BrokerAdmin is not available. Skipping the test %s#%s", + getClass().getName(), + _testName.getMethodName()), + getBrokerAdmin(), is(notNullValue())); + LOGGER.debug("Test receive timeout is {} milliseconds", getReceiveTimeout()); + } + + + protected Connection getConnection() throws JMSException, NamingException + { + assumeThat(String.format("BrokerAdmin is not available. 
Skipping the test %s#%s", + getClass().getName(), + _testName.getMethodName()), + getBrokerAdmin(), is(notNullValue())); + + return getBrokerAdmin().getConnection(); + } + + protected static long getReceiveTimeout() + { + return Long.getLong("qpid.test_receive_timeout", 1000L); + } + + protected String getTestName() + { + return _testName.getMethodName(); + } + + + protected TopicConnection getTopicConnection() throws JMSException, NamingException + { + return (TopicConnection) getConnection(); + } + + protected QueueConnection getQueueConnection() throws JMSException, NamingException + { + return (QueueConnection) getConnection(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java new file mode 100644 index 0000000..37bd96c --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/QpidTestRunner.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.systest.core; + +import org.junit.runner.notification.RunNotifier; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.junit.runners.model.FrameworkMethod; +import org.junit.runners.model.InitializationError; + +public class QpidTestRunner extends BlockJUnit4ClassRunner +{ + private final BrokerAdmin _brokerAdmin; + private final Class _testClass; + + public QpidTestRunner(final Class klass) throws InitializationError + { + super(klass); + _testClass = klass; + _brokerAdmin = new BrokerAdminFactory().createInstance(); + } + + @Override + protected Object createTest() throws Exception + { + Object test = super.createTest(); + BrokerAdminUsingTestBase qpidTest = ((BrokerAdminUsingTestBase) test); + qpidTest.init(_brokerAdmin); + return test; + } + + @Override + public void run(final RunNotifier notifier) + { + if (_brokerAdmin != null) + { + _brokerAdmin.create(_testClass); + } + try + { + super.run(notifier); + } + finally + { + if (_brokerAdmin != null) + { + _brokerAdmin.destroy(_testClass); + } + } + } + + @Override + protected void runChild(final FrameworkMethod method, final RunNotifier notifier) + { + if (_brokerAdmin != null) + { + _brokerAdmin.start(_testClass, method.getMethod()); + } + try + { + super.runChild(method, notifier); + } + finally + { + if (_brokerAdmin != null) + { + _brokerAdmin.stop(_testClass, method.getMethod()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java new file mode 100644 index 0000000..ed7f116 --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java @@ -0,0 
+1,554 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.systest.core.brokerj; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +class AmqpManagementFacade +{ + private static final String AMQP_0_X_REPLY_TO_DESTINATION = "ADDR:!response"; + private static final String AMQP_0_X_CONSUMER_REPLY_DESTINATION = + "ADDR:$management ; {assert : never, node: { type: queue }, link:{name: \"!response\"}}"; + private final String _managementAddress; + + + AmqpManagementFacade() + { + _managementAddress = "ADDR:$management"; + } + + @SuppressWarnings("unused") + Map createEntityUsingAmqpManagement(final String name, + final 
String type, + final Session session) + throws JMSException + { + return createEntityUsingAmqpManagement(name, type, Collections.emptyMap(), session); + } + + Map createEntityUsingAmqpManagement(final String name, + final String type, + Map attributes, + final Session session) + throws JMSException + { + Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION); + Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION); + + MessageConsumer consumer = session.createConsumer(replyConsumerDestination); + MessageProducer producer = session.createProducer(session.createQueue(_managementAddress)); + + MapMessage createMessage = session.createMapMessage(); + createMessage.setStringProperty("type", type); + createMessage.setStringProperty("operation", "CREATE"); + createMessage.setString("name", name); + createMessage.setString("object-path", name); + createMessage.setJMSReplyTo(replyToDestination); + for (Map.Entry entry : attributes.entrySet()) + { + createMessage.setObject(entry.getKey(), entry.getValue()); + } + producer.send(createMessage); + if (session.getTransacted()) + { + session.commit(); + } + producer.close(); + + Message response = consumer.receive(getManagementResponseTimeout()); + try + { + if (response != null) + { + int statusCode = response.getIntProperty("statusCode"); + if (statusCode == 201) + { + if (response instanceof MapMessage) + { + MapMessage bodyMap = (MapMessage) response; + Map result = new HashMap<>(); + Enumeration keys = bodyMap.getMapNames(); + while (keys.hasMoreElements()) + { + final String key = String.valueOf(keys.nextElement()); + Object value = bodyMap.getObject(key); + result.put(key, value); + } + return result; + } + else if (response instanceof ObjectMessage) + { + Object body = ((ObjectMessage) response).getObject(); + if (body instanceof Map) + { + @SuppressWarnings("unchecked") + Map bodyMap = (Map) body; + return new HashMap<>(bodyMap); + } + } + } + else + 
{ + throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), + statusCode); + } + } + + throw new OperationUnsuccessfulException("Cannot get the results from a management create operation", -1); + } + finally + { + consumer.close(); + } + } + + void updateEntityUsingAmqpManagement(final String name, + final String type, + final Map attributes, + final Session session) + throws JMSException + { + Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION); + Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION); + + MessageConsumer consumer = session.createConsumer(replyConsumerDestination); + + MessageProducer producer = session.createProducer(session.createQueue(_managementAddress)); + + MapMessage createMessage = session.createMapMessage(); + createMessage.setStringProperty("type", type); + createMessage.setStringProperty("operation", "UPDATE"); + createMessage.setStringProperty("index", "object-path"); + createMessage.setStringProperty("key", name); + createMessage.setJMSReplyTo(replyToDestination); + for (Map.Entry entry : attributes.entrySet()) + { + createMessage.setObject(entry.getKey(), entry.getValue()); + } + producer.send(createMessage); + if (session.getTransacted()) + { + session.commit(); + } + producer.close(); + + Message response = consumer.receive(getManagementResponseTimeout()); + try + { + if (response != null) + { + int statusCode = response.getIntProperty("statusCode"); + if (statusCode != 200) + { + + throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), + statusCode); + } + } + else + { + throw new OperationUnsuccessfulException("Cannot get the results from a management update operation", + -1); + } + } + finally + { + consumer.close(); + } + } + + void deleteEntityUsingAmqpManagement(final String name, + final String type, + final Session session) + throws JMSException + { + Destination replyToDestination = 
session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION); + Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION); + + MessageConsumer consumer = session.createConsumer(replyConsumerDestination); + + MessageProducer producer = session.createProducer(session.createQueue(_managementAddress)); + + MapMessage createMessage = session.createMapMessage(); + createMessage.setStringProperty("type", type); + createMessage.setStringProperty("operation", "DELETE"); + createMessage.setStringProperty("index", "object-path"); + createMessage.setJMSReplyTo(replyToDestination); + + createMessage.setStringProperty("key", name); + producer.send(createMessage); + if (session.getTransacted()) + { + session.commit(); + } + + Message response = consumer.receive(getManagementResponseTimeout()); + try + { + if (response != null) + { + int statusCode = response.getIntProperty("statusCode"); + if (statusCode != 200 && statusCode != 204) + { + + throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), + statusCode); + } + } + else + { + throw new OperationUnsuccessfulException("Cannot get the results from a management delete operation", + -1); + } + } + finally + { + consumer.close(); + } + } + + @SuppressWarnings("unused") + Object performOperationUsingAmqpManagement(final String name, + final String type, + final String operation, + final Map arguments, + final Session session) + throws JMSException + { + Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION); + Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION); + + MessageConsumer consumer = session.createConsumer(replyConsumerDestination); + + MessageProducer producer = session.createProducer(session.createQueue(_managementAddress)); + + MapMessage opMessage = session.createMapMessage(); + opMessage.setStringProperty("type", type); + opMessage.setStringProperty("operation", operation); + 
opMessage.setStringProperty("index", "object-path"); + opMessage.setJMSReplyTo(replyToDestination); + + opMessage.setStringProperty("key", name); + for (Map.Entry argument : arguments.entrySet()) + { + Object value = argument.getValue(); + if (value.getClass().isPrimitive() || value instanceof String) + { + opMessage.setObjectProperty(argument.getKey(), value); + } + else + { + ObjectMapper objectMapper = new ObjectMapper(); + String jsonifiedValue; + try + { + jsonifiedValue = objectMapper.writeValueAsString(value); + } + catch (JsonProcessingException e) + { + throw new IllegalArgumentException(String.format( + "Cannot convert the argument '%s' to JSON to meet JMS type restrictions", + argument.getKey())); + } + opMessage.setObjectProperty(argument.getKey(), jsonifiedValue); + } + } + + producer.send(opMessage); + if (session.getTransacted()) + { + session.commit(); + } + + Message response = consumer.receive(getManagementResponseTimeout()); + try + { + int statusCode = response.getIntProperty("statusCode"); + if (statusCode < 200 || statusCode > 299) + { + throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), statusCode); + } + if (response instanceof MapMessage) + { + MapMessage bodyMap = (MapMessage) response; + Map result = new TreeMap<>(); + Enumeration mapNames = bodyMap.getMapNames(); + while (mapNames.hasMoreElements()) + { + String key = (String) mapNames.nextElement(); + result.put(key, bodyMap.getObject(key)); + } + return result; + } + else if (response instanceof ObjectMessage) + { + return ((ObjectMessage) response).getObject(); + } + else if (response instanceof BytesMessage) + { + BytesMessage bytesMessage = (BytesMessage) response; + if (bytesMessage.getBodyLength() == 0) + { + return null; + } + else + { + byte[] buf = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(buf); + return buf; + } + } + throw new IllegalArgumentException( + "Cannot parse the results from a management operation. 
JMS response message : " + response); + } + finally + { + if (session.getTransacted()) + { + session.commit(); + } + consumer.close(); + } + } + + @SuppressWarnings(value = {"unused", "unchecked"}) + List> managementQueryObjects(final String type, final Session session) + throws JMSException + { + Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION); + Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION); + + MessageConsumer consumer = session.createConsumer(replyConsumerDestination); + MapMessage message = session.createMapMessage(); + message.setStringProperty("identity", "self"); + message.setStringProperty("type", "org.amqp.management"); + message.setStringProperty("operation", "QUERY"); + message.setStringProperty("entityType", type); + message.setString("attributeNames", "[]"); + message.setJMSReplyTo(replyToDestination); + + MessageProducer producer = session.createProducer(session.createQueue(_managementAddress)); + producer.send(message); + + Message response = consumer.receive(getManagementResponseTimeout()); + try + { + if (response instanceof MapMessage) + { + MapMessage bodyMap = (MapMessage) response; + List attributeNames = (List) bodyMap.getObject("attributeNames"); + List> attributeValues = (List>) bodyMap.getObject("results"); + return getResultsAsMaps(attributeNames, attributeValues); + } + else if (response instanceof ObjectMessage) + { + Object body = ((ObjectMessage) response).getObject(); + if (body instanceof Map) + { + Map bodyMap = (Map) body; + List attributeNames = (List) bodyMap.get("attributeNames"); + List> attributeValues = (List>) bodyMap.get("results"); + return getResultsAsMaps(attributeNames, attributeValues); + } + } + throw new IllegalArgumentException("Cannot parse the results from a management query"); + } + finally + { + consumer.close(); + } + } + + @SuppressWarnings("unused") + Map readEntityUsingAmqpManagement(final String name, + final String type, 
+ final boolean actuals, + final Session session) + throws JMSException + { + Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION); + Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION); + + MessageConsumer consumer = session.createConsumer(replyConsumerDestination); + + MessageProducer producer = session.createProducer(session.createQueue(_managementAddress)); + + MapMessage request = session.createMapMessage(); + request.setStringProperty("type", type); + request.setStringProperty("operation", "READ"); + request.setString("name", name); + request.setString("object-path", name); + request.setStringProperty("index", "object-path"); + request.setStringProperty("key", name); + request.setBooleanProperty("actuals", actuals); + request.setJMSReplyTo(replyToDestination); + + producer.send(request); + if (session.getTransacted()) + { + session.commit(); + } + + Message response = consumer.receive(getManagementResponseTimeout()); + if (session.getTransacted()) + { + session.commit(); + } + try + { + if (response instanceof MapMessage) + { + MapMessage bodyMap = (MapMessage) response; + Map data = new HashMap<>(); + @SuppressWarnings("unchecked") + Enumeration keys = bodyMap.getMapNames(); + while (keys.hasMoreElements()) + { + String key = keys.nextElement(); + data.put(key, bodyMap.getObject(key)); + } + return data; + } + else if (response instanceof ObjectMessage) + { + Object body = ((ObjectMessage) response).getObject(); + if (body instanceof Map) + { + @SuppressWarnings("unchecked") + Map bodyMap = (Map) body; + return new HashMap<>(bodyMap); + } + } + throw new IllegalArgumentException("Management read failed : " + + response.getStringProperty("statusCode") + + " - " + + response.getStringProperty("statusDescription")); + } + finally + { + consumer.close(); + } + } + + @SuppressWarnings("unused") + long getQueueDepth(final Queue destination, final Session session) throws Exception + { + 
final String escapedName = getEscapedName(destination); + Map arguments = + Collections.singletonMap("statistics", (Object) Collections.singletonList("queueDepthMessages")); + + Object statistics = performOperationUsingAmqpManagement(escapedName, + "org.apache.qpid.Queue", "getStatistics", + arguments, session + ); + @SuppressWarnings("unchecked") + Map statisticsMap = (Map) statistics; + return ((Number) statisticsMap.get("queueDepthMessages")).intValue(); + } + + @SuppressWarnings("unused") + boolean isQueueExist(final Queue destination, final Session session) throws Exception + { + final String escapedName = getEscapedName(destination); + try + { + performOperationUsingAmqpManagement(escapedName, + "org.apache.qpid.Queue", + "READ", + Collections.emptyMap(), + session); + return true; + } + catch (AmqpManagementFacade.OperationUnsuccessfulException e) + { + if (e.getStatusCode() == 404) + { + return false; + } + else + { + throw e; + } + } + } + + private String getEscapedName(final Queue destination) throws JMSException + { + return destination.getQueueName().replaceAll("([/\\\\])", "\\\\$1"); + } + + private List> getResultsAsMaps(final List attributeNames, + final List> attributeValues) + { + List> results = new ArrayList<>(); + for (List resultObject : attributeValues) + { + Map result = new HashMap<>(); + for (int i = 0; i < attributeNames.size(); ++i) + { + result.put(attributeNames.get(i), resultObject.get(i)); + } + results.add(result); + } + return results; + } + + private int getManagementResponseTimeout() + { + return Integer.getInteger("qpid.systests.management_response_timeout", 5000); + } + + static class OperationUnsuccessfulException extends RuntimeException + { + private final int _statusCode; + + private OperationUnsuccessfulException(final String message, final int statusCode) + { + super(message == null ? 
String.format("Unexpected status code %d", statusCode) : message); + _statusCode = statusCode; + } + + int getStatusCode() + { + return _statusCode; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/6899893a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java new file mode 100644 index 0000000..3893288 --- /dev/null +++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/SpawnQpidBrokerAdmin.java @@ -0,0 +1,898 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.systest.core.brokerj; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import ch.qos.logback.classic.LoggerContext; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.systest.core.BrokerAdmin; +import org.apache.qpid.systest.core.BrokerAdminException; +import org.apache.qpid.systest.core.dependency.ClasspathQuery; +import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator; +import 
org.apache.qpid.systest.core.util.FileUtils; +import org.apache.qpid.systest.core.logback.LogbackSocketPortNumberDefiner; +import org.apache.qpid.systest.core.util.SystemUtils; + +public class SpawnQpidBrokerAdmin implements BrokerAdmin +{ + private static final Logger LOGGER = LoggerFactory.getLogger(SpawnQpidBrokerAdmin.class); + private static final String BROKER_LOG_PREFIX = "BROKER"; + private static final String SYSTEST_PROPERTY_PREFIX = "qpid.systest."; + private static final String SYSTEST_PROPERTY_BROKER_READY = "qpid.systest.broker.ready"; + private static final String SYSTEST_PROPERTY_BROKER_STOPPED = "qpid.systest.broker.stopped"; + private static final String SYSTEST_PROPERTY_BROKER_LISTENING = "qpid.systest.broker.listening"; + private static final String SYSTEST_PROPERTY_BROKER_PROCESS = "qpid.systest.broker.process"; + private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time"; + private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests"; + + static final String SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE = "qpid.systest.virtualhostnode.type"; + static final String SYSTEST_PROPERTY_VIRTUALHOST_BLUEPRINT = "qpid.systest.virtualhost.blueprint"; + static final String SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION = "qpid.systest.initialConfigurationLocation"; + static final String SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE = "qpid.systest.build.classpath.file"; + static final String SYSTEST_PROPERTY_JAVA8_EXECUTABLE = "qpid.systest.java8.executable"; + static final String SYSTEST_PROPERTY_BROKERJ_DEPENDECIES = "qpid.systest.brokerj.dependencies"; + + private final static AtomicLong BROKER_INSTANCE_COUNTER = new AtomicLong(); + + private volatile List _ports; + private volatile Process _process; + private volatile Integer _pid; + private volatile String _currentWorkDirectory; + private volatile boolean _isPersistentStore; + private volatile String 
_virtualHostNodeName; + + @Override + public void create(final Class testClass) + { + setClassQualifiedTestName(testClass.getName()); + LOGGER.info("========================= starting broker for test class : {}", testClass.getSimpleName()); + startBroker(testClass); + } + + @Override + public void start(final Class testClass, final Method method) + { + LOGGER.info("========================= prepare test environment for test : {}#{}", + testClass.getSimpleName(), + method.getName()); + String virtualHostNodeName = getVirtualHostNodeName(testClass, method); + createVirtualHost(virtualHostNodeName); + _virtualHostNodeName = virtualHostNodeName; + LOGGER.info("========================= executing test : {}#{}", testClass.getSimpleName(), method.getName()); + setClassQualifiedTestName(testClass.getName() + "." + method.getName()); + LOGGER.info("========================= start executing test : {}#{}", + testClass.getSimpleName(), + method.getName()); + } + + + @Override + public void stop(final Class testClass, final Method method) + { + LOGGER.info("========================= stop executing test : {}#{}", + testClass.getSimpleName(), + method.getName()); + setClassQualifiedTestName(testClass.getName()); + LOGGER.info("========================= cleaning up test environment for test : {}#{}", + testClass.getSimpleName(), + method.getName()); + deleteVirtualHost(getVirtualHostNodeName(testClass, method)); + _virtualHostNodeName = null; + LOGGER.info("========================= cleaning done for test : {}#{}", + testClass.getSimpleName(), + method.getName()); + } + + @Override + public void destroy(final Class testClass) + { + LOGGER.info("========================= stopping broker for test class: {}", testClass.getSimpleName()); + shutdown(); + _ports.clear(); + if (Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS)) + { + FileUtils.delete(new File(_currentWorkDirectory), true); + } + _isPersistentStore = false; + LOGGER.info("========================= stopping 
broker done for test class : {}", testClass.getSimpleName()); + setClassQualifiedTestName(null); + } + + @Override + public InetSocketAddress getBrokerAddress(final PortType portType) + { + Integer port = null; + switch (portType) + { + case AMQP: + for (ListeningPort p : _ports) + { + if (p.getProtocol() == null + && (p.getTransport().contains("TCP") /*|| p.getTransport().contains("SSL") */)) + { + port = p.getPort(); + break; + } + } + break; + default: + throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType)); + } + if (port == null) + { + throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType)); + } + return new InetSocketAddress(port); + } + + @Override + public boolean supportsPersistence() + { + return _isPersistentStore; + } + + @Override + public ListenableFuture restart() + { + if (_virtualHostNodeName == null) + { + throw new BrokerAdminException("Virtual host is not started"); + } + return restartVirtualHost(_virtualHostNodeName); + } + + @Override + public String getValidUsername() + { + return "guest"; + } + + @Override + public String getValidPassword() + { + return "guest"; + } + + @Override + public String getType() + { + return SpawnQpidBrokerAdmin.class.getSimpleName(); + } + + @Override + public BrokerType getBrokerType() + { + return BrokerType.BROKERJ; + } + + @Override + public Connection getConnection() throws JMSException + { + return createConnection(_virtualHostNodeName); + } + + private void startBroker(final Class testClass) + { + try + { + start(testClass); + } + catch (Exception e) + { + if (e instanceof RuntimeException) + { + throw (RuntimeException) e; + } + else + { + throw new BrokerAdminException("Unexpected exception on broker startup", e); + } + } + } + + void start(final Class testClass) throws Exception + { + String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis())); + _currentWorkDirectory = + 
Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName())) + .toString(); + + String initialConfiguration = System.getProperty(SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION); + if (initialConfiguration == null) + { + throw new BrokerAdminException( + String.format("No initial configuration is found: JVM property '%s' is not set.", + SYSTEST_PROPERTY_INITIAL_CONFIGURATION_LOCATION)); + } + + File testInitialConfiguration = new File(_currentWorkDirectory, "initial-configuration.json"); + if (!testInitialConfiguration.createNewFile()) + { + throw new BrokerAdminException("Failed to create a file for a copy of initial configuration"); + } + if (initialConfiguration.startsWith("classpath:")) + { + String config = initialConfiguration.substring("classpath:".length()); + try (InputStream is = getClass().getClassLoader().getResourceAsStream(config); + OutputStream os = new FileOutputStream(testInitialConfiguration)) + { + ByteStreams.copy(is, os); + } + } + else + { + Files.copy(new File(initialConfiguration).toPath(), testInitialConfiguration.toPath()); + } + + String classpath; + File file = new File(System.getProperty(SYSTEST_PROPERTY_BUILD_CLASSPATH_FILE)); + if (!file.exists()) + { + String dependencies = System.getProperty(SYSTEST_PROPERTY_BROKERJ_DEPENDECIES); + final ClasspathQuery classpathQuery = new ClasspathQuery(SpawnQpidBrokerAdmin.class, + Arrays.asList(dependencies.split(","))); + classpath = classpathQuery.getClasspath(); + Files.write(file.toPath(), Collections.singleton(classpath), UTF_8); + } + else + { + classpath = new String(Files.readAllBytes(file.toPath()), UTF_8); + } + + // grab Qpid related JVM settings + List jvmArguments = new ArrayList<>(); + Properties jvmProperties = System.getProperties(); + for (String jvmProperty : jvmProperties.stringPropertyNames()) + { + if (jvmProperty.startsWith(SYSTEST_PROPERTY_PREFIX) + || jvmProperty.equalsIgnoreCase("java.io.tmpdir")) + { + jvmArguments.add("-D" + 
jvmProperty + "=" + jvmProperties.getProperty(jvmProperty)); + } + } + + jvmArguments.add(0, System.getProperty(SYSTEST_PROPERTY_JAVA8_EXECUTABLE, "/usr/bin/java")); + jvmArguments.add(1, "-cp"); + jvmArguments.add(2, classpath); + jvmArguments.add("-Dqpid.systest.logback.socket.port=" + + LogbackSocketPortNumberDefiner.getLogbackSocketPortNumber()); + jvmArguments.add("-Dqpid.systest.logback.logs_dir=" + System.getProperty("qpid.systest.logback.logs_dir", + "${qpid.work_dir}")); + jvmArguments.add(String.format("-Dqpid.systest.logback.origin=%s-%d", + BROKER_LOG_PREFIX, + BROKER_INSTANCE_COUNTER.getAndIncrement())); + jvmArguments.add("-Dqpid.systest.logback.context=" + testClass.getName()); + if (System.getProperty("qpid.systest.remote_debugger") != null) + { + jvmArguments.add(System.getProperty("qpid.systest.remote_debugger")); + } + jvmArguments.add("org.apache.qpid.server.Main"); + jvmArguments.add("-prop"); + jvmArguments.add(String.format("qpid.work_dir=%s", escapePath(_currentWorkDirectory))); + jvmArguments.add("--store-type"); + jvmArguments.add("JSON"); + jvmArguments.add("--initial-config-path"); + jvmArguments.add(escapePath(testInitialConfiguration.toString())); + + LOGGER.debug("Spawning broker JVM :", jvmArguments); + + String ready = System.getProperty(SYSTEST_PROPERTY_BROKER_READY, "BRK-1004 : Qpid Broker Ready"); + String stopped = System.getProperty(SYSTEST_PROPERTY_BROKER_STOPPED, "BRK-1005 : Stopped"); + String amqpListening = System.getProperty(SYSTEST_PROPERTY_BROKER_LISTENING, + "BRK-1002 : Starting( : \\w*)? 
: Listening on (\\w*) port ([0-9]+)"); + String process = System.getProperty(SYSTEST_PROPERTY_BROKER_PROCESS, "BRK-1017 : Process : PID : ([0-9]+)"); + int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000); + + LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime); + + String[] cmd = jvmArguments.toArray(new String[jvmArguments.size()]); + + ProcessBuilder processBuilder = new ProcessBuilder(cmd); + processBuilder.redirectErrorStream(true); + + Map processEnvironment = processBuilder.environment(); + processEnvironment.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + testClass.getName() + "\""); + + long startTime = System.currentTimeMillis(); + _process = processBuilder.start(); + + BrokerSystemOutpuHandler brokerSystemOutpuHandler = new BrokerSystemOutpuHandler(_process.getInputStream(), + ready, + stopped, + process, + amqpListening, + getClass().getName()); + + boolean brokerStarted = false; + ExecutorService executorService = Executors.newFixedThreadPool(1); + try + { + Future result = executorService.submit(brokerSystemOutpuHandler); + result.get(startUpTime, TimeUnit.MILLISECONDS); + + _pid = brokerSystemOutpuHandler.getPID(); + _ports = brokerSystemOutpuHandler.getAmqpPorts(); + + if (_pid == -1) + { + throw new BrokerAdminException("Broker PID is not detected"); + } + + if (_ports.size() == 0) + { + throw new BrokerAdminException("Broker port is not detected"); + } + + try + { + //test that the broker is still running and hasn't exited unexpectedly + int exit = _process.exitValue(); + LOGGER.info("broker aborted: {}", exit); + throw new BrokerAdminException("broker aborted: " + exit); + } + catch (IllegalThreadStateException e) + { + // this is expect if the broker started successfully + } + + LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}", + System.currentTimeMillis() - startTime, + _pid); + LOGGER.info("Broker ports: {}", _ports); + brokerStarted = true; + } + catch 
(RuntimeException e) + { + throw e; + } + catch (TimeoutException e) + { + LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'", + startUpTime, brokerSystemOutpuHandler.getReady()); + String threadDump = dumpThreads(); + if (!threadDump.isEmpty()) + { + LOGGER.warn("the result of a try to capture thread dump:" + threadDump); + } + throw new BrokerAdminException(String.format("Broker failed to become ready within %d ms. Stop line : %s", + startUpTime, + brokerSystemOutpuHandler.getStopLine())); + } + catch (ExecutionException e) + { + throw new BrokerAdminException(String.format("Broker startup failed due to %s", e.getCause()), + e.getCause()); + } + catch (Exception e) + { + throw new BrokerAdminException(String.format("Unexpected exception on broker startup: %s", e), e); + } + finally + { + if (!brokerStarted) + { + LOGGER.warn("Broker failed to start"); + _process.destroy(); + } + executorService.shutdown(); + } + } + + void createVirtualHost(final String virtualHostNodeName) + { + final String nodeType = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOSTNODE_TYPE); + _isPersistentStore = !"Memory".equals(nodeType); + + String storeDir = null; + if (System.getProperty("profile", "").startsWith("java-dby-mem")) + { + storeDir = ":memory:"; + } + else if (!"Memory".equals(nodeType)) + { + storeDir = "${qpid.work_dir}" + File.separator + virtualHostNodeName; + } + + String blueprint = System.getProperty(SYSTEST_PROPERTY_VIRTUALHOST_BLUEPRINT); + LOGGER.debug("Creating Virtual host from blueprint: {}", blueprint); + + Map attributes = new HashMap<>(); + attributes.put("name", virtualHostNodeName); + attributes.put("type", nodeType); + attributes.put("qpid-type", nodeType); + String contextAsString; + try + { + contextAsString = + new ObjectMapper().writeValueAsString(Collections.singletonMap("virtualhostBlueprint", blueprint)); + } + catch (JsonProcessingException e) + { + throw new BrokerAdminException("Cannot create virtual host as context 
serialization failed", e); + } + attributes.put("context", contextAsString); + attributes.put("defaultVirtualHostNode", true); + attributes.put("virtualHostInitialConfiguration", blueprint); + if (storeDir != null) + { + attributes.put("storePath", storeDir); + } + + try + { + Connection connection = createConnection("$management"); + try + { + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + new AmqpManagementFacade().createEntityUsingAmqpManagement(virtualHostNodeName, + "org.apache.qpid.VirtualHostNode", + attributes, + session); + } + catch (AmqpManagementFacade.OperationUnsuccessfulException e) + { + throw new BrokerAdminException(String.format("Cannot create test virtual host '%s'", + virtualHostNodeName), e); + } + finally + { + session.close(); + } + } + finally + { + connection.close(); + } + } + catch (JMSException e) + { + throw new BrokerAdminException(String.format("Cannot create virtual host '%s'", virtualHostNodeName), e); + } + } + + void deleteVirtualHost(final String virtualHostNodeName) + { + try + { + Connection connection = createConnection("$management"); + try + { + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + new AmqpManagementFacade().deleteEntityUsingAmqpManagement(virtualHostNodeName, + "org.apache.qpid.VirtualHostNode", + session); + } + catch (AmqpManagementFacade.OperationUnsuccessfulException e) + { + throw new BrokerAdminException(String.format("Cannot delete test virtual host '%s'", + virtualHostNodeName), e); + } + finally + { + session.close(); + } + } + finally + { + connection.close(); + } + } + catch (JMSException e) + { + throw new BrokerAdminException(String.format("Cannot delete virtual host '%s'", virtualHostNodeName), e); + } + } + + ListenableFuture restartVirtualHost(final String virtualHostNodeName) + { + try + { + Connection connection = createConnection("$management"); + try + { + 
connection.start(); + updateVirtualHostNode(virtualHostNodeName, + Collections.singletonMap("desiredState", "STOPPED"), connection); + updateVirtualHostNode(virtualHostNodeName, + Collections.singletonMap("desiredState", "ACTIVE"), connection); + return Futures.immediateFuture(null); + } + finally + { + connection.close(); + } + } + catch (JMSException e) + { + throw new BrokerAdminException(String.format("Cannot create virtual host %s", virtualHostNodeName), e); + } + } + + private void updateVirtualHostNode(final String virtualHostNodeName, + final Map attributes, + final Connection connection) throws JMSException + { + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + new AmqpManagementFacade().updateEntityUsingAmqpManagement(virtualHostNodeName, + "org.apache.qpid.VirtualHostNode", + attributes, + session); + } + catch (AmqpManagementFacade.OperationUnsuccessfulException e) + { + throw new BrokerAdminException(String.format("Cannot create test virtual host '%s'", virtualHostNodeName), + e); + } + finally + { + session.close(); + } + } + + void shutdown() + { + if (SystemUtils.isWindows()) + { + doWindowsKill(); + } + + if (_process != null) + { + LOGGER.info("Destroying broker process"); + _process.destroy(); + + reapChildProcess(); + } + } + + private String escapePath(String value) + { + if (SystemUtils.isWindows() && value.contains("\"") && !value.startsWith("\"")) + { + return "\"" + value.replaceAll("\"", "\"\"") + "\""; + } + else + { + return value; + } + } + + private Connection createConnection(String virtualHostName) throws JMSException + { + final Hashtable initialContextEnvironment = new Hashtable<>(); + initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY, + "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"); + final String factoryName = "connectionFactory"; + String urlTemplate = "amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''"; + String url = 
String.format(urlTemplate, + "spawn_broker_admin", + virtualHostName, + getBrokerAddress(PortType.AMQP).getPort()); + initialContextEnvironment.put("connectionfactory." + factoryName, url); + try + { + InitialContext initialContext = new InitialContext(initialContextEnvironment); + try + { + ConnectionFactory factory = (ConnectionFactory) initialContext.lookup(factoryName); + return factory.createConnection(getValidUsername(), getValidPassword()); + } + finally + { + initialContext.close(); + } + } + catch (NamingException e) + { + throw new BrokerAdminException("Unexpected exception on connection lookup", e); + } + } + + private void setClassQualifiedTestName(final String name) + { + final LoggerContext loggerContext = ((ch.qos.logback.classic.Logger) LOGGER).getLoggerContext(); + loggerContext.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, name); + } + + + private String getVirtualHostNodeName(final Class testClass, final Method method) + { + return testClass.getSimpleName() + "_" + method.getName(); + } + + + private void doWindowsKill() + { + try + { + + Process p; + p = Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(_pid), "/T", "/F"}); + consumeAllOutput(p); + } + catch (IOException e) + { + LOGGER.error("Error whilst killing process " + _pid, e); + } + } + + private static void consumeAllOutput(Process p) throws IOException + { + try (InputStreamReader inputStreamReader = new InputStreamReader(p.getInputStream())) + { + try (BufferedReader reader = new BufferedReader(inputStreamReader)) + { + String line; + while ((line = reader.readLine()) != null) + { + LOGGER.debug("Consuming output: {}", line); + } + } + } + } + + private void reapChildProcess() + { + try + { + _process.waitFor(); + LOGGER.info("broker exited: " + _process.exitValue()); + } + catch (InterruptedException e) + { + LOGGER.error("Interrupted whilst waiting for process shutdown"); + Thread.currentThread().interrupt(); + } + finally + { + 
try + { + _process.getInputStream().close(); + _process.getErrorStream().close(); + _process.getOutputStream().close(); + } + catch (IOException ignored) + { + } + } + } + + private String dumpThreads() + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try + { + Process process = Runtime.getRuntime().exec("jstack " + _pid); + InputStream is = process.getInputStream(); + byte[] buffer = new byte[1024]; + int length; + while ((length = is.read(buffer)) != -1) + { + baos.write(buffer, 0, length); + } + } + catch (Exception e) + { + LOGGER.error("Error whilst collecting thread dump for " + _pid, e); + } + return new String(baos.toByteArray()); + } + + + public final class BrokerSystemOutpuHandler implements Runnable + { + + private final BufferedReader _in; + private final Logger _out; + private final String _ready; + private final String _stopped; + private final List _amqpPorts; + private final Pattern _pidPattern; + private final Pattern _amqpPortPattern; + private volatile boolean _seenReady; + private volatile String _stopLine; + private volatile int _pid; + + private BrokerSystemOutpuHandler(InputStream in, + String ready, + String stopped, + String pidRegExp, + String amqpPortRegExp, + String loggerName) + { + _amqpPorts = new ArrayList<>(); + _in = new BufferedReader(new InputStreamReader(in)); + _out = LoggerFactory.getLogger(loggerName); + _ready = ready; + _stopped = stopped; + _seenReady = false; + _amqpPortPattern = Pattern.compile(amqpPortRegExp); + _pidPattern = Pattern.compile(pidRegExp); + } + + @Override + public void run() + { + try + { + String line; + while ((line = _in.readLine()) != null) + { + _out.info(line); + + checkPortListeningLog(line, _amqpPortPattern, _amqpPorts); + + Matcher pidMatcher = _pidPattern.matcher(line); + if (pidMatcher.find()) + { + if (pidMatcher.groupCount() > 1) + { + _pid = Integer.parseInt(pidMatcher.group(1)); + } + } + + if (line.contains(_ready)) + { + _seenReady = true; + break; + } + + if 
(!_seenReady && line.contains(_stopped)) + { + _stopLine = line; + } + } + } + catch (IOException e) + { + LOGGER.warn(e.getMessage() + + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost."); + } + } + + private void checkPortListeningLog(final String line, + final Pattern portPattern, + final List ports) + { + Matcher portMatcher = portPattern.matcher(line); + if (portMatcher.find()) + { + ports.add(new ListeningPort(portMatcher.group(1), + portMatcher.group(2), + Integer.parseInt(portMatcher.group(3)))); + } + } + + String getStopLine() + { + return _stopLine; + } + + String getReady() + { + return _ready; + } + + int getPID() + { + return _pid; + } + + List getAmqpPorts() + { + return _amqpPorts; + } + } + + private static class ListeningPort + { + private String _protocol; + private String _transport; + private int _port; + + ListeningPort(final String protocol, final String transport, final int port) + { + _transport = transport; + _port = port; + _protocol = protocol; + } + + String getTransport() + { + return _transport; + } + + int getPort() + { + return _port; + } + + String getProtocol() + { + return _protocol; + } + + @Override + public String toString() + { + return "ListeningPort{" + + "_protocol='" + _protocol + '\'' + + ", _transport='" + _transport + '\'' + + ", _port=" + _port + + '}'; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org