ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tbeerbo...@apache.org
Subject git commit: AMBARI-7067 - ConcurrentModificationException in Resource Comparator
Date Sat, 30 Aug 2014 00:35:37 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk b6ff02347 -> b09e15932


AMBARI-7067 - ConcurrentModificationException in Resource Comparator


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b09e1593
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b09e1593
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b09e1593

Branch: refs/heads/trunk
Commit: b09e159322a403c4067ad62dbf1ca9a00e9034e2
Parents: b6ff023
Author: tbeerbower <tbeerbower@hortonworks.com>
Authored: Thu Aug 28 18:59:32 2014 -0400
Committer: tbeerbower <tbeerbower@hortonworks.com>
Committed: Fri Aug 29 20:35:15 2014 -0400

----------------------------------------------------------------------
 .../controller/jmx/JMXPropertyProvider.java     | 81 +++++++++++++++-----
 .../controller/jmx/JMXPropertyProviderTest.java | 49 +++++++++++-
 2 files changed, 108 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b09e1593/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
index b5a5059..ca016f5 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
@@ -62,7 +62,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
   private static final String NAME_KEY = "name";
   private static final String PORT_KEY = "tag.port";
   private static final String DOT_REPLACEMENT_CHAR = "#";
-  private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
+  private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 12000L;
 
   public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics.";
   public static final String STORM_REST_API = "STORM_REST_API";
@@ -188,13 +188,16 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
   public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
       throws SystemException {
 
+    // Get a valid ticket for the request.
+    Ticket ticket = new Ticket();
+
     CompletionService<Resource> completionService =
         new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
 
     // In a large cluster we could have thousands of resources to populate here.
     // Distribute the work across multiple threads.
     for (Resource resource : resources) {
-      completionService.submit(getPopulateResourceCallable(resource, request, predicate));
+      completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket));
     }
 
     Set<Resource> keepers = new HashSet<Resource>();
@@ -205,7 +208,8 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
 
         if (resourceFuture == null) {
           // its been more than the populateTimeout since the last callable completed ...
-          // don't wait any longer
+          // invalidate the ticket to abort the threads and don't wait any longer
+          ticket.invalidate();
           LOG.error(TIMED_OUT_MSG);
           break;
         } else {
@@ -239,7 +243,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
   /**
    * Get the spec to locate the JMX stream from the given host and port
    *
-   ** @param protocol  the protocol, one of http or https
+   * @param protocol  the protocol, one of http or https
    * @param hostName  the host name
    * @param port      the port
    *
@@ -271,14 +275,15 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
    * @param resource  the resource to be populated
    * @param request   the request
    * @param predicate the predicate
+   * @param ticket    a valid ticket
    *
    * @return a callable that can be used to populate the given resource
    */
   private Callable<Resource> getPopulateResourceCallable(
-      final Resource resource, final Request request, final Predicate predicate) {
+      final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) {
     return new Callable<Resource>() {
       public Resource call() throws SystemException {
-        return populateResource(resource, request, predicate);
+        return populateResource(resource, request, predicate, ticket);
       }
     };
   }
@@ -289,10 +294,11 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
    * @param resource  the resource to be populated
    * @param request   the request
    * @param predicate the predicate
+   * @param ticket    a valid ticket
    *
    * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate
    */
-  private Resource populateResource(Resource resource, Request request, Predicate predicate)
+  private Resource populateResource(Resource resource, Request request, Predicate predicate, Ticket ticket)
       throws SystemException {
 
     Set<String> ids = getRequestPropertyIds(request, predicate);
@@ -349,10 +355,14 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
         for (String hostName : hostNames) {
           try {
             in = streamProvider.readFrom(getSpec(protocol, hostName, port, componentName));
+            // if the ticket becomes invalid (timeout) then bail out
+            if (!ticket.isValid()) {
+              return resource;
+            }
             if (null == componentName || !componentName.equals(STORM_REST_API)) {
-              getHadoopMetricValue(in, ids, resource, request);
+              getHadoopMetricValue(in, ids, resource, request, ticket);
             } else {
-              getStormMetricValue(in, ids, resource);
+              getStormMetricValue(in, ids, resource, ticket);
             }
           } catch (IOException e) {
             logException(e);
@@ -373,7 +383,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
    * Hadoop-specific metrics fetching
    */
   private void getHadoopMetricValue(InputStream in, Set<String> ids,
-                       Resource resource, Request request) throws IOException {
+                       Resource resource, Request request, Ticket ticket) throws IOException {
     JMXMetricHolder metricHolder = jmxObjectReader.readValue(in);
 
     Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
@@ -442,11 +452,17 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
                 }
                 // We need to do the final filtering here, after the argument substitution
                 if (isRequestedPropertyId(newPropertyId, requestedPropertyId, request)) {
+                  if (!ticket.isValid()) {
+                    return;
+                  }
                   setResourceValue(resource, categories, newPropertyId, jmxCat, property, keyList);
                 }
               }
             }
           } else {
+            if (!ticket.isValid()) {
+              return;
+            }
             setResourceValue(resource, categories, propertyId, category, property, keyList);
           }
         }
@@ -459,7 +475,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
    * Storm-specific metrics fetching
    */
   private void getStormMetricValue(InputStream in, Set<String> ids,
-                                   Resource resource) throws IOException {
+                                   Resource resource, Ticket ticket) throws IOException {
     HashMap<String, Object> metricHolder = stormObjectReader.readValue(in);
     for (String category : ids) {
       Map<String, PropertyInfo> defProps = getComponentMetrics().get(STORM_REST_API);
@@ -469,6 +485,9 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
           String propName = propInfo.getPropertyId();
           Object propertyValue = metricHolder.get(propName);
           String absId = PropertyHelper.getPropertyId(category, propName);
+          if (!ticket.isValid()) {
+            return;
+          }
           // TODO: Maybe cast to int
           resource.setProperty(absId, propertyValue);
         }
@@ -509,12 +528,6 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
     return jmxHostProvider.getJMXProtocol(clusterName, componentName);
   }
   
-  private String getHost(Resource resource, String clusterName, String componentName) throws SystemException {
-    return hostNamePropertyId == null ?
-        jmxHostProvider.getHostName(clusterName, componentName) :
-        (String) resource.getPropertyValue(hostNamePropertyId);
-  }
-
   private Set<String> getHosts(Resource resource, String clusterName, String componentName) {
     return hostNamePropertyId == null ?
             jmxHostProvider.getHostNames(clusterName, componentName) :
@@ -570,6 +583,36 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
       throw (SystemException) throwable;
     }
     throw new SystemException (msg, throwable);
-  }  
-  
+  }
+
+
+  // ----- inner class : Ticket ----------------------------------------------
+
+  /**
+   * Ticket used to cancel provider threads.  The provider threads should
+   * monitor the validity of the passed in ticket and bail out if it becomes
+   * invalid (as in a timeout).
+   */
+  private static class Ticket {
+    /**
+     * Indicate whether or not the ticket is valid.
+     */
+    private volatile boolean valid = true;
+
+    /**
+     * Invalidate the ticket.
+     */
+    public void invalidate() {
+      valid = false;
+    }
+
+    /**
+     * Determine whether or not this ticket is valid.
+     *
+     * @return true if the ticket is valid
+     */
+    public boolean isValid() {
+      return valid;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/b09e1593/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
index b4c3c6a..24734f8 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/jmx/JMXPropertyProviderTest.java
@@ -339,9 +339,6 @@ public class JMXPropertyProviderTest {
     Assert.assertEquals(23634400, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/jvm", "NonHeapMemoryUsed")));
   }
 
-
-
-
   @Test
   public void testPopulateResourcesUnhealthyResource() throws Exception {
     TestStreamProvider  streamProvider = new TestStreamProvider();
@@ -418,6 +415,52 @@ public class JMXPropertyProviderTest {
     }
   }
 
+  @Test
+  public void testPopulateResourcesTimeout() throws Exception {
+    // Set the provider to take 100 millis to return the JMX values
+    TestStreamProvider  streamProvider = new TestStreamProvider(100L);
+    TestJMXHostProvider hostProvider = new TestJMXHostProvider(true);
+    Set<Resource> resources = new HashSet<Resource>();
+
+    JMXPropertyProvider propertyProvider = new JMXPropertyProvider(
+        PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent),
+        streamProvider,
+        hostProvider,
+        "HostRoles/cluster_name",
+        "HostRoles/host_name",
+        "HostRoles/component_name",
+        "HostRoles/state",
+        Collections.singleton("STARTED"));
+
+    // set the provider timeout to 50 millis
+    propertyProvider.setPopulateTimeout(50L);
+
+    // datanode
+    Resource resource = new ResourceImpl(Resource.Type.HostComponent);
+
+    resource.setProperty(HOST_COMPONENT_HOST_NAME_PROPERTY_ID, "domu-12-31-39-14-ee-b3.compute-1.internal");
+    resource.setProperty(HOST_COMPONENT_COMPONENT_NAME_PROPERTY_ID, "DATANODE");
+
+    resources.add(resource);
+
+    // request with an empty set should get all supported properties
+    Request request = PropertyHelper.getReadRequest(Collections.<String>emptySet());
+
+    Set<Resource> resourceSet = propertyProvider.populateResources(resources, request, null);
+
+    // make sure that the thread running the stream provider has completed
+    Thread.sleep(150L);
+
+    Assert.assertEquals(0, resourceSet.size());
+
+    // assert that properties never get set on the resource
+    Assert.assertNull(resource.getPropertyValue("metrics/rpc/ReceivedBytes"));
+    Assert.assertNull(resource.getPropertyValue("metrics/jvm/HeapMemoryMax"));
+    Assert.assertNull(resource.getPropertyValue("metrics/jvm/HeapMemoryUsed"));
+    Assert.assertNull(resource.getPropertyValue("metrics/jvm/NonHeapMemoryMax"));
+    Assert.assertNull(resource.getPropertyValue("metrics/jvm/NonHeapMemoryUsed"));
+  }
+
   public static class TestJMXHostProvider implements JMXHostProvider {
     private final boolean unknownPort;
 


Mime
View raw message