Just to confirm: when your subscribers are all offline, how many objects
are returned in the List from getSubscriptions()?
Tim
On Wed, Oct 18, 2017 at 10:18 AM, Hitesh <
hitesh.hotchandani@contentsphere.com> wrote:
>
>
> Hi all,
> I have a very peculiar and weird usecase with ActiveMQ 5.13
>
> I have two durable subscribers listening on a *topic*. When a particular
> event occurs, I have to invalidate the messages and remove them from the
> topic. To do this, I'm using JMX connection and browsing on the topic and
> then from each subscriber I'm removing the message. If the subscriber is
> Active, I'm closing the activeMQ connection, which is a
> org.apache.activemq.pool.PooledConnectionFactory.createConnection().
>
> In this scenario, I'm able to clear the pending message and also the space
> on Kahadb reduces. Expected behaviour.
>
> But If I keep my subscribers inactive, the message count remains the same
> and also the kahadb disk usage.
>
> Following is the code I use to cleanup the messages:
> public void startCleanup() throws ExportStagingException {
> try {
> if (topicViewMBean == null) {
> createTopicViewMBean();
> }
> for (String jmsMessageID :
> getMessageIDs(topicViewMBean.browse())) {
> messagesCount++;
> for (DurableSubscriptionViewMBean subscriber :
> getSubscriptions(topicViewMBean.getSubscriptions(), connection)) {
> subscriber.removeMessage(jmsMessageID);
> }
> }
> if (topicViewMBean.browse().length > 0) {
> startCleanup();
> }
> if (messagesCount == 0) {
> logger.info("No messages to delete on Topic: " +
> topicName +
> "_" + projectName);
> } else {
> logger.info("Successfully deleted " + messagesCount + "
> messages from " + topicName + "_" + projectName);
> messagesCount = 0;
> }
> } catch (Exception e) {
> throw new ExportStagingException(e);
> }
> }
>
> Following function fetches the message from the topic and maintains a list
> of their JMSMessageID
>
> private Set<String> getMessageIDs(CompositeData[] messages) {
> Set<String> jmsMessageIDs = new HashSet<>();
> for (CompositeData message : messages) {
> jmsMessageIDs.add((String) message.get(CONST_JMS_MESSAGE_ID));
> }
> return jmsMessageIDs;
> }
>
> And this one to fetch the subscription list
>
> private List<DurableSubscriptionViewMBean>
> getSubscriptions(ObjectName[]
> subscriptionNames, MBeanServerConnection conn) {
> if (subscriptions == null) {
> subscriptions = new ArrayList<>();
> for (ObjectName subscriptionName : subscriptionNames) {
> //Creates Subscriber Object and caches it.
> subscriptions.add(
> MBeanServerInvocationHandler.newProxyInstance(
> conn,
> subscriptionName,
> DurableSubscriptionViewMBean.class,
> true
> )
> );
> }
> }
> return subscriptions;
> }
>
>
> And last but not the least, The create connection and topic
>
> private MBeanServerConnection createConnection() {
> try {
> if (connection == null) {
> logger.info("Connecting to ActiveMQ JMX Portal");
> String jmxURL = "service:jmx:rmi:///jndi/rmi://" +
> hostName
> + ":" + jmxPort + "/jmxrmi";
> connection = JMXConnectorFactory.connect(new
> JMXServiceURL(jmxURL)).getMBeanServerConnection();
> logger.info("Connected to ActiveMQ JMX Portal");
> }
> } catch (IOException e) {
> logger.error("[" + masterProducer + "] Exception while
> createConnection ActiveMQ JMX Portal" + e.getMessage());
> }
> return connection;
> }
>
> private TopicViewMBean createTopicViewMBean() throws
> ExportStagingException {
> try {
> if (topicViewMBean == null) {
> if (connection == null) {
> createConnection();
> }
> String brokerObjectName =
> "org.apache.activemq:type=Broker,brokerName=" + brokerName;
> BrokerViewMBean broker;
> broker =
> MBeanServerInvocationHandler.newProxyInstance(connection, new
> ObjectName(brokerObjectName),
> BrokerViewMBean.class, true);
> //The following for-loop fetches info about the topics
> available on the Broker.
> boolean topicExists = false;
> for (ObjectName topic : broker.getTopics()) {
> if
> (topic.getKeyProperty(CONST_DESTINATION_NAME).equals(topicName + "_" +
> projectName)) {
> topicExists = true;
> topicViewMBean =
> MBeanServerInvocationHandler.newProxyInstance(connection, topic,
> TopicViewMBean.class, true);
> break;
> }
> }
> if (!topicExists) {
> logger.info("Topic " + topicName + " does not
> exists");
> }
> }
> } catch (MalformedObjectNameException e) {
> logger.error("MalformedObjectNameException while
> createTopicViewMBean: " + e.getMessage());
> throw new ExportStagingException(e);
> }
> return topicViewMBean;
> }
>
>
> I've gone through the code to-and fro trying to find the missing link, but
> all in vain.
> Can anyone help me out over here...
>
> Thanks :)
>
>
>
>
>
> --
> Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-
> f2341805.html
>
|