Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6EE83200D23 for ; Thu, 19 Oct 2017 15:17:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6D7081609ED; Thu, 19 Oct 2017 13:17:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8BE731609E2 for ; Thu, 19 Oct 2017 15:17:40 +0200 (CEST) Received: (qmail 37099 invoked by uid 500); 19 Oct 2017 13:17:39 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 37087 invoked by uid 99); 19 Oct 2017 13:17:39 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Oct 2017 13:17:39 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 758E11807DC for ; Thu, 19 Oct 2017 13:17:38 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 4.795 X-Spam-Level: **** X-Spam-Status: No, score=4.795 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, FREEMAIL_ENVFROM_END_DIGIT=0.25, HEADER_FROM_DIFFERENT_DOMAINS=0.001, HTML_MESSAGE=2, KAM_INFOUSMEBIZ=0.75, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RCVD_IN_SORBS_SPAM=0.5, SPF_PASS=-0.001, URIBL_BLOCKED=0.001, URI_HEX=1.313, URI_TRY_3LD=0.001] autolearn=disabled Authentication-Results: spamd3-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 1bDOVuuhnHuO for ; Thu, 19 Oct 2017 13:17:36 +0000 (UTC) Received: from mail-it0-f45.google.com (mail-it0-f45.google.com [209.85.214.45]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id A1FD65FBE5 for ; Thu, 19 Oct 2017 13:17:35 +0000 (UTC) Received: by mail-it0-f45.google.com with SMTP id o135so9694089itb.0 for ; Thu, 19 Oct 2017 06:17:35 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:sender:in-reply-to:references:from:date:message-id :subject:to; bh=O2y9n6cYYeAcXcOcl9rNIC4xjDSzg74xMv/YjumOFSM=; b=bemtQ3IOmcfJBFdGQ5aeBz40Qodt/WguVfh8/pZLj/YMIQY910k0m7Icl54IOiCb9E +ISnEE12KiB/vnHjMr8MvIqEbbvothCYssOBcgUG5eJttxeRHpV5znVhckG3Nb+oBW3m su68mjcMn9HtwLeukxN2HqfB5NbksnXnzdzHBDQBNwkrvMlbIwHios+B2nEEs7HH9XnI EFpBLs6fliGtkMWVmK5KksjwNiCOVxdn/K/pp617y1VvkRQE5ZTUhG/fPJCRmetzLJKm LnBhSTX4bZo3gGk/pjiZjo77HIBHWnsoMGXCkfbqYTRrdKlU4XzJp9XC41LfN3tWhbO8 zawg== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:sender:in-reply-to:references:from :date:message-id:subject:to; bh=O2y9n6cYYeAcXcOcl9rNIC4xjDSzg74xMv/YjumOFSM=; b=m1EdcVZho8z95nHxsVux4w22JOlyK7NynZw4fv5jfX5V/dvfVallIramK0th7FiBqq 3Z42pbM8pFyalpT4AAlAYOny36x5laOXUcXba2LpaCRZ6j5DbJtaGlBfMyd5QT8+mpI7 zAAHAdAd2pTugG5c+68Z3zj2qEdEgijF8YkQBXt121yeJIN7+IjI5jdc8rYI3S0xS7pY KKXeS08IhcVSqlBt6WkE7ZE1fRTVhDGxP8iHjIs7CBDNNIewLmNudJvhueMj7ntuO/ze fY+Dy82ww1z17oJcb09+MQSyPTr3Yv3R/pqYVllPAGbQSs6mb/fbHtIUW9RYOIjRo3j2 SZKA== X-Gm-Message-State: AMCzsaVwU//3bP+3rI841kgfXsH0e5rCcsdLC3Ba8M10Zwfdf7cbJabV Hc2VXZMrpGwH1W01vfjLXFwy9Fj+JJ3OB/uMGtMjdQ== X-Google-Smtp-Source: ABhQp+Qpv2XiabDH131ryZoFhxS1N6hFIyyJiawfiAasQWLhWaV3Iz5kbW2sHfBlgild6B3fuiIL98xWqB2ZA8LgxjE= X-Received: by 10.36.228.206 with SMTP id o197mr834506ith.5.1508419054745; Thu, 19 Oct 2017 06:17:34 -0700 (PDT) MIME-Version: 1.0 Sender: tbain98@gmail.com Received: by 10.2.102.30 with HTTP; Thu, 19 Oct 2017 06:17:14 -0700 (PDT) In-Reply-To: <1508343524159-0.post@n4.nabble.com> References: <1508343524159-0.post@n4.nabble.com> From: Tim Bain Date: Thu, 19 Oct 2017 07:17:14 -0600 X-Google-Sender-Auth: uWwlguTfrDfckvfkvNy-AxHLTbs Message-ID: Subject: Re: Cleanup Pending messages using JMX remove message To: ActiveMQ Users Content-Type: multipart/alternative; boundary="94eb2c04893aee3f57055be62d82" archived-at: Thu, 19 Oct 2017 13:17:41 -0000 --94eb2c04893aee3f57055be62d82 Content-Type: text/plain; charset="UTF-8" Just to confirm: when your subscribers are all offline, how many objects are returned in the List from getSubscriptions()? Tim On Wed, Oct 18, 2017 at 10:18 AM, Hitesh < hitesh.hotchandani@contentsphere.com> wrote: > > > Hi all, > I have a very peculiar and weird usecase with ActiveMQ 5.13 > > I have two durable subscribers listening on a *topic*. When a particular > event occurs, I have to invalidate the messages and remove them from the > topic. To do this, I'm using JMX connection and browsing on the topic and > then from each subscriber I'm removing the message. If the subscriber is > Active, I'm closing the activeMQ connection, which is a > org.apache.activemq.pool.PooledConnectionFactory.createConnection(). > > In this scenario, I'm able to clear the pending message and also the space > on Kahadb reduces. Expected behaviour. > > But If I keep my subscribers inactive, the message count remains the same > and also the kahadb disk usage. > > Following is the code I use to cleanup the messages: > public void startCleanup() throws ExportStagingException { > try { > if (topicViewMBean == null) { > createTopicViewMBean(); > } > for (String jmsMessageID : > getMessageIDs(topicViewMBean.browse())) { > messagesCount++; > for (DurableSubscriptionViewMBean subscriber : > getSubscriptions(topicViewMBean.getSubscriptions(), connection)) { > subscriber.removeMessage(jmsMessageID); > } > } > if (topicViewMBean.browse().length > 0) { > startCleanup(); > } > if (messagesCount == 0) { > logger.info("No messages to delete on Topic: " + > topicName + > "_" + projectName); > } else { > logger.info("Successfully deleted " + messagesCount + " > messages from " + topicName + "_" + projectName); > messagesCount = 0; > } > } catch (Exception e) { > throw new ExportStagingException(e); > } > } > > Following function fetches the message from the topic and maintains a list > of their JMSMessageID > > private Set getMessageIDs(CompositeData[] messages) { > Set jmsMessageIDs = new HashSet<>(); > for (CompositeData message : messages) { > jmsMessageIDs.add((String) message.get(CONST_JMS_MESSAGE_ID)); > } > return jmsMessageIDs; > } > > And this one to fetch the subscription list > > private List > getSubscriptions(ObjectName[] > subscriptionNames, MBeanServerConnection conn) { > if (subscriptions == null) { > subscriptions = new ArrayList<>(); > for (ObjectName subscriptionName : subscriptionNames) { > //Creates Subscriber Object and caches it. > subscriptions.add( > MBeanServerInvocationHandler.newProxyInstance( > conn, > subscriptionName, > DurableSubscriptionViewMBean.class, > true > ) > ); > } > } > return subscriptions; > } > > > And last but not the least, The create connection and topic > > private MBeanServerConnection createConnection() { > try { > if (connection == null) { > logger.info("Connecting to ActiveMQ JMX Portal"); > String jmxURL = "service:jmx:rmi:///jndi/rmi://" + > hostName > + ":" + jmxPort + "/jmxrmi"; > connection = JMXConnectorFactory.connect(new > JMXServiceURL(jmxURL)).getMBeanServerConnection(); > logger.info("Connected to ActiveMQ JMX Portal"); > } > } catch (IOException e) { > logger.error("[" + masterProducer + "] Exception while > createConnection ActiveMQ JMX Portal" + e.getMessage()); > } > return connection; > } > > private TopicViewMBean createTopicViewMBean() throws > ExportStagingException { > try { > if (topicViewMBean == null) { > if (connection == null) { > createConnection(); > } > String brokerObjectName = > "org.apache.activemq:type=Broker,brokerName=" + brokerName; > BrokerViewMBean broker; > broker = > MBeanServerInvocationHandler.newProxyInstance(connection, new > ObjectName(brokerObjectName), > BrokerViewMBean.class, true); > //The following for-loop fetches info about the topics > available on the Broker. > boolean topicExists = false; > for (ObjectName topic : broker.getTopics()) { > if > (topic.getKeyProperty(CONST_DESTINATION_NAME).equals(topicName + "_" + > projectName)) { > topicExists = true; > topicViewMBean = > MBeanServerInvocationHandler.newProxyInstance(connection, topic, > TopicViewMBean.class, true); > break; > } > } > if (!topicExists) { > logger.info("Topic " + topicName + " does not > exists"); > } > } > } catch (MalformedObjectNameException e) { > logger.error("MalformedObjectNameException while > createTopicViewMBean: " + e.getMessage()); > throw new ExportStagingException(e); > } > return topicViewMBean; > } > > > I've gone through the code to-and fro trying to find the missing link, but > all in vain. > Can anyone help me out over here... > > Thanks :) > > > > > > -- > Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User- > f2341805.html > --94eb2c04893aee3f57055be62d82--