From commits-return-50818-archive-asf-public=cust-asf.ponee.io@activemq.apache.org Wed Mar 14 23:40:30 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 18ACF18067B for ; Wed, 14 Mar 2018 23:40:28 +0100 (CET) Received: (qmail 47022 invoked by uid 500); 14 Mar 2018 22:40:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 47013 invoked by uid 99); 14 Mar 2018 22:40:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Mar 2018 22:40:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B8203F6611; Wed, 14 Mar 2018 22:40:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Wed, 14 Mar 2018 22:40:28 -0000 Message-Id: <55953813b5ef484190dd6418f074bd29@git.apache.org> In-Reply-To: <3bc4510fa58e4ea7a0e36ff00329ca3a@git.apache.org> References: <3bc4510fa58e4ea7a0e36ff00329ca3a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] activemq-artemis git commit: ARTEMIS-1286 Adding test replicating MQTT direct buffer leak ARTEMIS-1286 Adding test replicating MQTT direct buffer leak (Test developed as an interaction between Justin Bertram, Philip Jenkins and Clebert Suconic through ARTEMIS-1286) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0d47f627 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0d47f627 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0d47f627 Branch: refs/heads/master Commit: 0d47f62710ecd7e7513987dea289d362d3dc229f Parents: 2383c22 Author: Clebert Suconic Authored: Wed Mar 14 17:53:29 2018 -0400 Committer: Clebert Suconic Committed: Wed Mar 14 18:39:58 2018 -0400 ---------------------------------------------------------------------- tests/smoke-tests/pom.xml | 25 +++ .../src/main/resources/servers/mqtt/broker.xml | 199 +++++++++++++++++++ .../artemis/tests/smoke/mqtt/MQTTLeakTest.java | 193 ++++++++++++++++++ 3 files changed, 417 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0d47f627/tests/smoke-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 90fdbcb..9d5dff0 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -103,6 +103,15 @@ org.jboss.logmanager jboss-logmanager + + org.fusesource.mqtt-client + mqtt-client + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + RELEASE + @@ -153,6 +162,22 @@ ${basedir}/target/expire + + test-compile + create-mqtt + + create + + + + ${basedir}/target/classes/servers/mqtt + true + admin + admin + ${basedir}/target/mqtt + ${basedir}/target/classes/servers/mqtt + + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0d47f627/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml ---------------------------------------------------------------------- diff --git a/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml b/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml new file mode 100644 index 0000000..c318037 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/mqtt/broker.xml @@ -0,0 +1,199 @@ + + + + + + + + 0.0.0.0 + + true + + + NIO + + ./data/paging + + ./data/bindings + + ./data/journal + + ./data/large-messages + + true + + 2 + + -1 + + 10485760 + + + + + + + + + + + + + + + + + + + + + 6488000 + + + + 5000 + + + 90 + + + 100Mb + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300 + + + tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true + + + tcp://0.0.0.0:5445?protocols=HORNETQ,STOMP;useEpoll=true + + + tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + + 1M + 50000 + 10 + PAGE + true + true + true + true + + + + DLQ + ExpiryQueue + 0 + + + 50000 + 10 + PAGE + true + true + true + true + + + + +
+ + + +
+
+ + + +
+ +
+ +
+
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0d47f627/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java ---------------------------------------------------------------------- diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java new file mode 100644 index 0000000..4ac374a --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/mqtt/MQTTLeakTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.smoke.mqtt; + +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.Semaphore; + +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.util.ServerUtil; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class MQTTLeakTest extends SmokeTestBase { + + public static final String SERVER_NAME_0 = "mqtt"; + + private static Process server0; + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + disableCheckThread(); + } + + @After + @Override + public void after() throws Exception { + super.after(); + cleanupData(SERVER_NAME_0); + } + + @Test + public void testMQTTLeak() throws Throwable { + + try { + server0 = startServer(SERVER_NAME_0, 0, 30000); + MQTTRunner.run(); + } finally { + + ServerUtil.killServer(server0); + } + } + + + + private static class MQTTRunner implements MqttCallback { + + private MqttAsyncClient mqttClient; + private MqttConnectOptions connOpts; + protected static MQTTRunner publisherClient; + protected static MQTTRunner consumerClient; + + private static String topicPaho1 = "State/PRN/"; + private static String topicPaho2 = "Soap/PRN/"; + public String name; + + private static final Semaphore semaphore = new Semaphore(2); + + public static void run() throws Exception { + publisherClient = new MQTTRunner(); + publisherClient.connect(); + publisherClient.name = "Pub"; + consumerClient = new MQTTRunner(); + consumerClient.connect(); + consumerClient.name = "Consumer"; + byte[] content = buildContent(); + + for (int idx = 0; idx < 500; idx++) { + if (idx % 100 == 0) { + System.out.println("Sent " + idx + " messages"); + } + MqttMessage msg = new MqttMessage(content); + semaphore.acquire(2); + publisherClient.mqttClient.publish(topicPaho1, msg); + } + } + + public void connect() { + // create a new Paho MqttClient + MemoryPersistence persistence = new MemoryPersistence(); + // establish the client ID for the life of this DPI publisherClient + String clientId = UUID.randomUUID().toString(); + try { + mqttClient = new MqttAsyncClient("tcp://localhost:1883", clientId, persistence); + // Create a set of connection options + connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + mqttClient.connect(connOpts); + } catch (MqttException e) { + e.printStackTrace(); + } + // pause a moment to get connected (prevents the race condition) + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // subscribe + try { + String[] topicsPaho = new String[]{topicPaho1, topicPaho2}; + int[] qos = new int[]{0, 0}; + mqttClient.subscribe(topicsPaho, qos); + } catch (MqttException e) { + e.printStackTrace(); + } + + mqttClient.setCallback(this); + } + + @Override + public void connectionLost(Throwable throwable) { + } + + int count = 0; + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + + count++; + + if (count % 100 == 0) { + System.out.println("Received " + count); + } + + semaphore.release(); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + + public static byte[] buildContent() { + + ArrayList stringval2 = buildContentArray(); + int size = 0; + for (String value : stringval2) { + size += value.length(); + } + System.out.println(); + StringBuilder builder = new StringBuilder(size); + for (String value : stringval2) { + builder.append(value); + } + String msgContent = builder.toString(); + + return msgContent.getBytes(); + } + + public static ArrayList buildContentArray() { + ArrayList val = new ArrayList<>(); + String msgHdr = ""; + String msgChunk = "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiIHN0YW5kYWxvbmU9InllcyI/Pgo8bnMyOlRyYW5zZmVyIHhtbG5zOm5zMj0idXJuOmRwY2w6dXBkYXRlOjIwMTEtMTAtMTkiPgogICAgPGltYWdlU2VnbWVudD4KICAgICAgICA8Ym9hcmQ+MjU5PC9ib2FyZD4KICAgICAgICA8Y2F0ZWdvcnk+MjwvY2F0ZWdvcnk+CiAgICAgICAgPHZlcnNpb24+Mjg1NDA5Mjg1PC92ZXJzaW9uPgogICAgICAgIDxpZD4yNjwvaWQ+CiAgICAgICAgPHNpemU+MjA5NzE1Mjwvc2l6ZT4KICAgICAgICA8Y2hlY2tzdW0+NTE0ODI3MGJmZTM2ZmYzNmIyZTNmMjc0NWJlNmYyMGY8L2NoZWNrc3VtPgogICAgICAgIDxkYXRhPm5OQUJ1WHQvWG0xYlhGeC9aallZbEJ1K2NrWU1ncHBTMnZpTVZoOUxjTENjTFlTL1Z6YUxlSWNnWmtlMjI5Z1dlS1p6czlSclBrdVlsSHYvaWNlSldJeTUxaGFpVUx3NTY0NWtTTUlhMEhjNnZoYTB5UC91OEVNUEcvck9LL1JhVXpuS0tRdXF5WVNDVlZ3TWROS25IWjZ5Sm91TkdMcVJ3a0MvVDZUdStrTWxKak9TcjV6MUNYWDdtZWdvSGpLdkFuU1AyOFJWY0F3MWVXTUtIY0pQU0Z0bFZXSkFYVXErZjFzbE9HWXlNSGhiN2haV0VnMWc4TlRlVUJ2NHJGL0RtUitKRjRmbjlWdkRJSkJYanJpeE5CNWFyc1RKOTR3dEF2YWxVM28vVzVnODltbURNNHp0VlVuaHZvSlRTSlZ6bXlqTGpJMWQ5OExVVTVWU3dqWE5KMjZ2d0F4R1ptVmwrVGlMU0JaeWNYak45NlYxVUZ6eldOMStPN 2h5SHRMZnMvOE9kRjVMK1ArbjZXOXNqNVA3aDdGZUU4UFVHbGpLcXhxWmFGbFZ4aXJPRjYrUExGTHFFMzAzUzVodzJPeDFBQjA5Sjl4VThjVXNtUVI0dlJBS3B0Y3ZpbXkzb1VncmxWQTBwNG83cFdlYkduak1kT1N6ZGR2M01uMi9rMldlOVRHNzI3OEhkdTdLQlNtVW95VTJSM0l6TitITXhXeGQ4"; + + val.add(msgHdr); + for (int idx = 0; idx < 300; idx++) { + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + val.add(msgChunk); + } + return val; + } + } +}