activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1348270 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ test/java/org/apache/activemq/broker/jmx/
Date Fri, 08 Jun 2012 22:56:35 GMT
Author: tabish
Date: Fri Jun  8 22:56:35 2012
New Revision: 1348270

URL: http://svn.apache.org/viewvc?rev=1348270&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3877

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
  (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1348270&r1=1348269&r2=1348270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Jun  8 22:56:35 2012
@@ -132,6 +132,7 @@ public class BrokerService implements Se
     private boolean populateJMSXUserID;
     private boolean useAuthenticatedPrincipalForJMSXUserID;
     private boolean populateUserNameInMBeans;
+    private long mbeanInvocationTimeout = 0;
 
     private boolean useShutdownHook = true;
     private boolean useLoggingForShutdownErrors;
@@ -2648,6 +2649,27 @@ public class BrokerService implements Se
         this.populateUserNameInMBeans = value;
     }
 
+    /**
+     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
+     * failing.  The default value is to wait forever (zero).
+     *
+     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
+     */
+    public long getMbeanInvocationTimeout() {
+        return mbeanInvocationTimeout;
+    }
+
+    /**
+     * Sets the time in Milliseconds that an invocation of an MBean method will wait before
+     * failing. The default value is to wait forever (zero).
+     *
+     * @param mbeanInvocationTimeout
+     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
+     */
+    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
+        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
+    }
+
     public boolean isNetworkConnectorStartAsync() {
         return networkConnectorStartAsync;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java?rev=1348270&r1=1348269&r2=1348270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AnnotatedMBean.java
Fri Jun  8 22:56:35 2012
@@ -16,155 +16,164 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import org.apache.activemq.broker.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.security.AccessController;
 import java.security.Principal;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.management.*;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanOperationInfo;
+import javax.management.MBeanParameterInfo;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.management.StandardMBean;
 import javax.security.auth.Subject;
 
+import org.apache.activemq.broker.util.AuditLogEntry;
+import org.apache.activemq.broker.util.AuditLogService;
+import org.apache.activemq.broker.util.JMXAuditLogEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * MBean that looks for method/parameter descriptions in the Info annotation.
  */
 public class AnnotatedMBean extends StandardMBean {
 
-  private static final Map<String, Class<?>> primitives = new HashMap<String,
Class<?>>();
+    private static final Map<String, Class<?>> primitives = new HashMap<String,
Class<?>>();
+
+    private static final Logger LOG = LoggerFactory.getLogger("org.apache.activemq.audit");
+
+    private static boolean audit;
+    private static AuditLogService auditLog;
+
+    static {
+        Class<?>[] p = { byte.class, short.class, int.class, long.class, float.class,
double.class, char.class, boolean.class, };
+        for (Class<?> c : p) {
+            primitives.put(c.getName(), c);
+        }
+        audit = "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.audit"));
+        if (audit) {
+            auditLog = AuditLogService.getAuditLog();
+        }
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static void registerMBean(ManagementContext context, Object object, ObjectName
objectName) throws Exception {
+
+        String mbeanName = object.getClass().getName() + "MBean";
 
-  private static final Logger LOG = LoggerFactory.getLogger("org.apache.activemq.audit");
+        for (Class c : object.getClass().getInterfaces()) {
+            if (mbeanName.equals(c.getName())) {
+                context.registerMBean(new AnnotatedMBean(object, c), objectName);
+                return;
+            }
+        }
 
-  private static boolean audit;
-  private static AuditLogService auditLog;
+        context.registerMBean(object, objectName);
+    }
+
+    /** Instance where the MBean interface is implemented by another object. */
+    public <T> AnnotatedMBean(T impl, Class<T> mbeanInterface) throws NotCompliantMBeanException
{
+        super(impl, mbeanInterface);
+    }
 
-  static {
-    Class<?>[] p = { byte.class, short.class, int.class, long.class, float.class, double.class,
char.class, boolean.class, };
-    for (Class<?> c : p) {
-      primitives.put(c.getName(), c);
-    }
-    audit = "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.audit"));
-    if (audit) {
-        auditLog = AuditLogService.getAuditLog();
-    }
-  }
-  
-  @SuppressWarnings("unchecked")
-  public static void registerMBean(ManagementContext context, Object object, ObjectName objectName)

-    throws Exception {
-
-    String mbeanName = object.getClass().getName() + "MBean";
-    
-    for (Class c : object.getClass().getInterfaces()) {
-      if (mbeanName.equals(c.getName())) {
-        context.registerMBean(new AnnotatedMBean(object, c), objectName);
-        return;
-      }
-    }
-
-    context.registerMBean(object, objectName);
-  }
-  
-  /** Instance where the MBean interface is implemented by another object. */
-  public <T> AnnotatedMBean(T impl, Class<T> mbeanInterface) throws NotCompliantMBeanException
{
-    super(impl, mbeanInterface);
-  }
-
-  /** Instance where the MBean interface is implemented by this object. */
-  protected AnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException
{
-    super(mbeanInterface);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  protected String getDescription(MBeanAttributeInfo info) {
-
-    String descr = info.getDescription();
-    Method m = getMethod(getMBeanInterface(), "get"+info.getName().substring(0, 1).toUpperCase()+info.getName().substring(1));
-    if (m == null)
-      m = getMethod(getMBeanInterface(), "is"+info.getName().substring(0, 1).toUpperCase()+info.getName().substring(1));
-    if (m == null)
-      m = getMethod(getMBeanInterface(), "does"+info.getName().substring(0, 1).toUpperCase()+info.getName().substring(1));
-      
-    if (m != null) {
-      MBeanInfo d = m.getAnnotation(MBeanInfo.class);
-      if (d != null)
-        descr = d.value();
-    }
-    return descr;
-  }
-  
-  /** {@inheritDoc} */
-  @Override
-  protected String getDescription(MBeanOperationInfo op) {
-
-    String descr = op.getDescription();
-    Method m = getMethod(op);
-    if (m != null) {
-      MBeanInfo d = m.getAnnotation(MBeanInfo.class);
-      if (d != null)
-        descr = d.value();
-    }
-    return descr;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  protected String getParameterName(MBeanOperationInfo op, MBeanParameterInfo param, int
paramNo) {
-    String name = param.getName();
-    Method m = getMethod(op);
-    if (m != null) {
-      for (Annotation a : m.getParameterAnnotations()[paramNo]) {
-        if (MBeanInfo.class.isInstance(a))
-          name = MBeanInfo.class.cast(a).value();
-      }
-    }
-    return name;
-  }
-
-  /**
-   * Extracts the Method from the MBeanOperationInfo
-   * @param op
-   * @return
-   */
-  private Method getMethod(MBeanOperationInfo op) {
-    final MBeanParameterInfo[] params = op.getSignature();
-    final String[] paramTypes = new String[params.length];
-    for (int i = 0; i < params.length; i++)
-      paramTypes[i] = params[i].getType();
-
-    return getMethod(getMBeanInterface(), op.getName(), paramTypes);
-  }
-
-  /**
-   * Returns the Method with the specified name and parameter types for the given class,
-   * null if it doesn't exist.
-   * @param mbean
-   * @param method
-   * @param params
-   * @return
-   */
-  private static Method getMethod(Class<?> mbean, String method, String... params)
{
-    try {
-      final ClassLoader loader = mbean.getClassLoader();
-      final Class<?>[] paramClasses = new Class<?>[params.length];
-      for (int i = 0; i < params.length; i++) {
-        paramClasses[i] = primitives.get(params[i]);
-        if (paramClasses[i] == null)
-          paramClasses[i] = Class.forName(params[i], false, loader);
-      }
-      return mbean.getMethod(method, paramClasses);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      return null;
+    /** Instance where the MBean interface is implemented by this object. */
+    protected AnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException
{
+        super(mbeanInterface);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected String getDescription(MBeanAttributeInfo info) {
+
+        String descr = info.getDescription();
+        Method m = getMethod(getMBeanInterface(), "get" + info.getName().substring(0, 1).toUpperCase()
+ info.getName().substring(1));
+        if (m == null)
+            m = getMethod(getMBeanInterface(), "is" + info.getName().substring(0, 1).toUpperCase()
+ info.getName().substring(1));
+        if (m == null)
+            m = getMethod(getMBeanInterface(), "does" + info.getName().substring(0, 1).toUpperCase()
+ info.getName().substring(1));
+
+        if (m != null) {
+            MBeanInfo d = m.getAnnotation(MBeanInfo.class);
+            if (d != null)
+                descr = d.value();
+        }
+        return descr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected String getDescription(MBeanOperationInfo op) {
+
+        String descr = op.getDescription();
+        Method m = getMethod(op);
+        if (m != null) {
+            MBeanInfo d = m.getAnnotation(MBeanInfo.class);
+            if (d != null)
+                descr = d.value();
+        }
+        return descr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected String getParameterName(MBeanOperationInfo op, MBeanParameterInfo param, int
paramNo) {
+        String name = param.getName();
+        Method m = getMethod(op);
+        if (m != null) {
+            for (Annotation a : m.getParameterAnnotations()[paramNo]) {
+                if (MBeanInfo.class.isInstance(a))
+                    name = MBeanInfo.class.cast(a).value();
+            }
+        }
+        return name;
+    }
+
+    /**
+     * Extracts the Method from the MBeanOperationInfo
+     *
+     * @param op
+     * @return
+     */
+    private Method getMethod(MBeanOperationInfo op) {
+        final MBeanParameterInfo[] params = op.getSignature();
+        final String[] paramTypes = new String[params.length];
+        for (int i = 0; i < params.length; i++)
+            paramTypes[i] = params[i].getType();
+
+        return getMethod(getMBeanInterface(), op.getName(), paramTypes);
+    }
+
+    /**
+     * Returns the Method with the specified name and parameter types for the
+     * given class, null if it doesn't exist.
+     *
+     * @param mbean
+     * @param method
+     * @param params
+     * @return
+     */
+    private static Method getMethod(Class<?> mbean, String method, String... params)
{
+        try {
+            final ClassLoader loader = mbean.getClassLoader();
+            final Class<?>[] paramClasses = new Class<?>[params.length];
+            for (int i = 0; i < params.length; i++) {
+                paramClasses[i] = primitives.get(params[i]);
+                if (paramClasses[i] == null)
+                    paramClasses[i] = Class.forName(params[i], false, loader);
+            }
+            return mbean.getMethod(method, paramClasses);
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            return null;
+        }
     }
-  }
 
     @Override
     public Object invoke(String s, Object[] objects, String[] strings) throws MBeanException,
ReflectionException {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java?rev=1348270&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
(added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
Fri Jun  8 22:56:35 2012
@@ -0,0 +1,96 @@
+package org.apache.activemq.broker.jmx;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.MBeanException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+/**
+ * MBean that invokes the requested operation using an async operation and waits for the result;
+ * if the operation times out then an exception is thrown.
+ */
+public class AsyncAnnotatedMBean extends AnnotatedMBean {
+
+    private ExecutorService executor;
+    private long timeout = 0;
+
+    public <T> AsyncAnnotatedMBean(ExecutorService executor, long timeout, T impl,
Class<T> mbeanInterface) throws NotCompliantMBeanException {
+        super(impl, mbeanInterface);
+
+        this.executor = executor;
+        this.timeout = timeout;
+    }
+
+    protected AsyncAnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException
{
+        super(mbeanInterface);
+    }
+
+    protected Object asyncInvole(String s, Object[] objects, String[] strings) throws MBeanException,
ReflectionException {
+        return super.invoke(s, objects, strings);
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public static void registerMBean(ExecutorService executor, long timeout, ManagementContext
context, Object object, ObjectName objectName) throws Exception {
+
+        if (timeout < 0 && executor != null) {
+            throw new IllegalArgumentException("async timeout cannot be negative.");
+        }
+
+        if (timeout > 0 && executor == null) {
+            throw new NullPointerException("timeout given but no ExecutorService instance
given.");
+        }
+
+        String mbeanName = object.getClass().getName() + "MBean";
+
+        for (Class c : object.getClass().getInterfaces()) {
+            if (mbeanName.equals(c.getName())) {
+                if (timeout == 0) {
+                    context.registerMBean(new AnnotatedMBean(object, c), objectName);
+                } else {
+                    context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object,
c), objectName);
+                }
+                return;
+            }
+        }
+
+        context.registerMBean(object, objectName);
+    }
+
+    @Override
+    public Object invoke(String s, Object[] objects, String[] strings) throws MBeanException,
ReflectionException {
+
+        final String action = s;
+        final Object[] params = objects;
+        final String[] signature = strings;
+
+        Future<Object> task = executor.submit(new Callable<Object>() {
+
+            @Override
+            public Object call() throws Exception {
+                return asyncInvole(action, params, signature);
+            }
+        });
+
+        try {
+            return task.get(timeout, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof MBeanException) {
+                throw (MBeanException) e.getCause();
+            }
+
+            throw new MBeanException(e);
+        } catch (Exception e) {
+            throw new MBeanException(e);
+        } finally {
+            if (!task.isDone()) {
+                task.cancel(true);
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/AsyncAnnotatedMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1348270&r1=1348269&r2=1348270&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
Fri Jun  8 22:56:35 2012
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import javax.management.InstanceNotFoundException;
@@ -103,11 +104,16 @@ public class ManagedRegionBroker extends
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
 
+    private final ExecutorService asyncInvokeService;
+    private final long mbeanTimeout;
+
     public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName
brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
                                DestinationFactory destinationFactory, DestinationInterceptor
destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException
{
         super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
         this.managementContext = context;
         this.brokerObjectName = brokerObjectName;
+        this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
+        this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
     }
 
     @Override
@@ -336,7 +342,7 @@ public class ManagedRegionBroker extends
             }
         }
         try {
-            AnnotatedMBean.registerMBean(managementContext, view, key);
+            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext,
view, key);
             registeredMBeans.add(key);
         } catch (Throwable e) {
             LOG.warn("Failed to register MBean: " + key);
@@ -392,7 +398,7 @@ public class ManagedRegionBroker extends
         }
 
         try {
-            AnnotatedMBean.registerMBean(managementContext, view, key);
+            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext,
view, key);
             registeredMBeans.add(key);
         } catch (Throwable e) {
             LOG.warn("Failed to register MBean: " + key);
@@ -456,7 +462,7 @@ public class ManagedRegionBroker extends
         }
 
         try {
-            AnnotatedMBean.registerMBean(managementContext, view, key);
+            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext,
view, key);
             registeredMBeans.add(key);
         } catch (Throwable e) {
             LOG.warn("Failed to register MBean: " + key);
@@ -535,7 +541,7 @@ public class ManagedRegionBroker extends
             SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(),
info, subscription);
 
             try {
-                AnnotatedMBean.registerMBean(managementContext, view, objectName);
+                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext,
view, objectName);
                 registeredMBeans.add(objectName);
             } catch (Throwable e) {
                 LOG.warn("Failed to register MBean: " + key);
@@ -733,7 +739,7 @@ public class ManagedRegionBroker extends
             objectName = createObjectName(strategy);
             if (!registeredMBeans.contains(objectName))  {
                 AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this,
strategy);
-                AnnotatedMBean.registerMBean(managementContext, view, objectName);
+                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext,
view, objectName);
                 registeredMBeans.add(objectName);
             }
         } catch (Exception e) {
@@ -757,7 +763,7 @@ public class ManagedRegionBroker extends
             ObjectName objectName = createObjectName(transaction);
             if (!registeredMBeans.contains(objectName))  {
                 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
-                AnnotatedMBean.registerMBean(managementContext, view, objectName);
+                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext,
view, objectName);
                 registeredMBeans.add(objectName);
             }
         } catch (Exception e) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java?rev=1348270&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java
Fri Jun  8 22:56:35 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MBeanOperationTimeoutTest {
+    private static final Logger LOG = LoggerFactory.getLogger(MBeanOperationTimeoutTest.class);
+
+    private ActiveMQConnectionFactory connectionFactory;
+    private BrokerService broker;
+    private String connectionUri;
+    private static final String destinationName = "MBeanOperationTimeoutTestQ";
+    private static final String moveToDestinationName = "MBeanOperationTimeoutTestQ.Moved";
+
+    protected MBeanServer mbeanServer;
+    protected String domain = "org.apache.activemq";
+
+    protected int messageCount = 50000;
+
+    @Test
+    public void testLongOperationTimesOut() throws Exception {
+
+        sendMessages(messageCount);
+        LOG.info("Produced " + messageCount + " messages to the broker.");
+
+        // Now get the QueueViewMBean and attempt to move the messages
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination="
+ destinationName + ",BrokerName=localhost");
+        QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+
+        long count = proxy.getQueueSize();
+        assertEquals("Queue size", count, messageCount);
+
+        try {
+            LOG.info("Attempting to move one message.");
+            proxy.moveMatchingMessagesTo(null, moveToDestinationName);
+            fail("Queue purge should have timed out.");
+        } catch (TimeoutException e) {
+            LOG.info("Queue message move Timed out as expected.");
+        }
+    }
+
+    private void sendMessages(int count) throws Exception {
+        Connection connection = connectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Destination destination = session.createQueue(destinationName);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < messageCount; i++) {
+                Message message = session.createMessage();
+                message.setIntProperty("id", i);
+                producer.send(message);
+            }
+            session.commit();
+        } finally {
+            connection.close();
+        }
+    }
+
+    protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException,
NullPointerException {
+        ObjectName objectName = new ObjectName(name);
+        if (mbeanServer.isRegistered(objectName)) {
+            LOG.info("Bean Registered: " + objectName);
+        } else {
+            fail("Could not find MBean!: " + objectName);
+        }
+        return objectName;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        mbeanServer = broker.getManagementContext().getMBeanServer();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setMbeanInvocationTimeout(TimeUnit.SECONDS.toMillis(1));
+        answer.setUseJmx(true);
+        answer.addConnector("vm://localhost");
+        answer.setDeleteAllMessagesOnStartup(true);
+        return answer;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanOperationTimeoutTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message