ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aonis...@apache.org
Subject [3/3] git commit: AMBARI-4881. Clean up JMXPropertyProvider hacks for STORM metrics (aonishuk)
Date Thu, 02 Oct 2014 12:03:23 GMT
AMBARI-4881. Clean up JMXPropertyProvider hacks for STORM metrics (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cc4dfe35
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cc4dfe35
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cc4dfe35

Branch: refs/heads/trunk
Commit: cc4dfe351f3ece0082cfbbe4a3657202cf93967b
Parents: ef1d698
Author: Andrew Onishuk <aonishuk@hortonworks.com>
Authored: Thu Oct 2 15:03:11 2014 +0300
Committer: Andrew Onishuk <aonishuk@hortonworks.com>
Committed: Thu Oct 2 15:03:11 2014 +0300

----------------------------------------------------------------------
 .../internal/AbstractProviderModule.java        |   24 +-
 .../controller/internal/ResourceImpl.java       |   19 +-
 .../internal/StackDefinedPropertyProvider.java  |  227 ++--
 .../server/controller/jmx/JMXHostProvider.java  |   13 -
 .../controller/jmx/JMXPropertyProvider.java     |  274 +----
 .../controller/metrics/MetricsHostProvider.java |   38 +
 .../controller/metrics/MetricsProvider.java     |  302 +++++
 .../metrics/RestMetricsPropertyProvider.java    |  448 ++++++++
 .../stacks/HDP/2.1/services/STORM/metrics.json  |   55 +-
 .../stacks/HDP/2.2/services/STORM/metrics.json  | 1079 ++++++++++++++++++
 .../StackDefinedPropertyProviderTest.java       |  357 +++---
 .../controller/jmx/JMXPropertyProviderTest.java |  513 ---------
 .../metrics/JMXPropertyProviderTest.java        |  522 +++++++++
 .../HDP/2.1.1/services/STORM/metrics.json       |  209 ++--
 14 files changed, 2925 insertions(+), 1155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
index fc60210..b5164fe 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
@@ -33,6 +33,7 @@ import org.apache.ambari.server.controller.ganglia.GangliaReportPropertyProvider
 import org.apache.ambari.server.controller.ganglia.GangliaHostProvider;
 import org.apache.ambari.server.controller.jmx.JMXHostProvider;
 import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
+import org.apache.ambari.server.controller.metrics.MetricsHostProvider;
 import org.apache.ambari.server.controller.nagios.NagiosPropertyProvider;
 import org.apache.ambari.server.controller.spi.*;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
@@ -57,7 +58,7 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * An abstract provider module implementation.
  */
-public abstract class AbstractProviderModule implements ProviderModule, ResourceProviderObserver, JMXHostProvider, GangliaHostProvider {
+public abstract class AbstractProviderModule implements ProviderModule, ResourceProviderObserver, JMXHostProvider, GangliaHostProvider, MetricsHostProvider {
 
   private static final int PROPERTY_REQUEST_CONNECT_TIMEOUT = 5000;
   private static final int PROPERTY_REQUEST_READ_TIMEOUT    = 10000;
@@ -204,7 +205,7 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource
   }
 
 
-  // ----- JMXHostProvider ---------------------------------------------------
+  // ----- MetricsHostProvider ---------------------------------------------------
 
   @Override
   public String getHostName(String clusterName, String componentName) throws SystemException {
@@ -225,6 +226,8 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource
     return hosts;
   }
 
+  // ----- JMXHostProvider ---------------------------------------------------
+
   @Override
   public String getPort(String clusterName, String componentName) throws SystemException {
     // Parent map need not be synchronized
@@ -453,11 +456,11 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource
               type,
               streamProvider,
               this,
+              this,
               PropertyHelper.getPropertyId("ServiceComponentInfo", "cluster_name"),
               null,
               PropertyHelper.getPropertyId("ServiceComponentInfo", "component_name"),
-              PropertyHelper.getPropertyId("ServiceComponentInfo", "state"),
-              Collections.singleton("STARTED"));
+              PropertyHelper.getPropertyId("ServiceComponentInfo", "state"));
 
           PropertyProvider gpp = createGangliaComponentPropertyProvider(
               type,
@@ -471,6 +474,7 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource
               type,
               this,
               this,
+              this,
               streamProvider,
               PropertyHelper.getPropertyId("ServiceComponentInfo", "cluster_name"),
               null,
@@ -486,11 +490,11 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource
               type,
               streamProvider,
               this,
+              this,
               PropertyHelper.getPropertyId("HostRoles", "cluster_name"),
               PropertyHelper.getPropertyId("HostRoles", "host_name"),
               PropertyHelper.getPropertyId("HostRoles", "component_name"),
-              PropertyHelper.getPropertyId("HostRoles", "state"),
-              Collections.singleton("STARTED"));
+              PropertyHelper.getPropertyId("HostRoles", "state"));
 
           PropertyProvider gpp = createGangliaHostComponentPropertyProvider(
               type,
@@ -505,6 +509,7 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource
               type,
               this,
               this,
+              this,
               streamProvider,
               PropertyHelper.getPropertyId("HostRoles", "cluster_name"),
               PropertyHelper.getPropertyId("HostRoles", "host_name"),
@@ -728,14 +733,15 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource
    */
   private PropertyProvider createJMXPropertyProvider(Resource.Type type, StreamProvider streamProvider,
                                                      JMXHostProvider jmxHostProvider,
+                                                     MetricsHostProvider metricsHostProvider,
                                                      String clusterNamePropertyId,
                                                      String hostNamePropertyId,
                                                      String componentNamePropertyId,
-                                                     String statePropertyId,
-                                                     Set<String> healthyStates) {
+                                                     String statePropertyId) {
     
     return new JMXPropertyProvider(PropertyHelper.getJMXPropertyIds(type), streamProvider,
-          jmxHostProvider, clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId, statePropertyId, healthyStates);
+        jmxHostProvider, metricsHostProvider, clusterNamePropertyId, hostNamePropertyId,
+                    componentNamePropertyId, statePropertyId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java
index 15fb961..6f963c9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java
@@ -21,10 +21,8 @@ package org.apache.ambari.server.controller.internal;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Simple resource implementation.
@@ -39,14 +37,15 @@ public class ResourceImpl implements Resource {
   /**
    * The map of property maps keyed by property category.
    */
-  private final Map<String, Map<String, Object>> propertiesMap = new TreeMap<String, Map<String, Object>>();
+  private final Map<String, Map<String, Object>> propertiesMap =
+      Collections.synchronizedMap(new TreeMap<String, Map<String, Object>>());
 
   // ----- Constructors ------------------------------------------------------
 
   /**
    * Create a resource of the given type.
    *
-   * @param type  the resource type
+   * @param type the resource type
    */
   public ResourceImpl(Type type) {
     this.type = type;
@@ -55,7 +54,7 @@ public class ResourceImpl implements Resource {
   /**
    * Copy constructor
    *
-   * @param resource  the resource to copy
+   * @param resource the resource to copy
    */
   public ResourceImpl(Resource resource) {
     this(resource, null);
@@ -65,8 +64,8 @@ public class ResourceImpl implements Resource {
    * Construct a resource from the given resource, setting only the properties
    * that are found in the given set of property and category ids.
    *
-   * @param resource     the resource to copy
-   * @param propertyIds  the set of requested property and category ids
+   * @param resource    the resource to copy
+   * @param propertyIds the set of requested property and category ids
    */
   public ResourceImpl(Resource resource, Set<String> propertyIds) {
     this.type = resource.getType();
@@ -106,7 +105,7 @@ public class ResourceImpl implements Resource {
 
     Map<String, Object> properties = propertiesMap.get(categoryKey);
     if (properties == null) {
-      properties = new TreeMap<String, Object>();
+      properties = Collections.synchronizedMap(new TreeMap<String, Object>());
       propertiesMap.put(categoryKey, properties);
     }
     properties.put(PropertyHelper.getPropertyName(id), value);

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
index 51c7565..32411e9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java
@@ -35,6 +35,7 @@ import org.apache.ambari.server.controller.ganglia.GangliaHostProvider;
 import org.apache.ambari.server.controller.ganglia.GangliaPropertyProvider;
 import org.apache.ambari.server.controller.jmx.JMXHostProvider;
 import org.apache.ambari.server.controller.jmx.JMXPropertyProvider;
+import org.apache.ambari.server.controller.metrics.MetricsHostProvider;
 import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.PropertyProvider;
 import org.apache.ambari.server.controller.spi.Request;
@@ -56,57 +57,71 @@ import com.google.inject.Injector;
  * This class analyzes a service's metrics to determine if additional
  * metrics should be fetched.  It's okay to maintain state here since these
  * are done per-request.
- *
  */
 public class StackDefinedPropertyProvider implements PropertyProvider {
   private static final Logger LOG = LoggerFactory.getLogger(StackDefinedPropertyProvider.class);
-  
+
   @Inject
   private static Clusters clusters = null;
   @Inject
   private static AmbariMetaInfo metaInfo = null;
-  
+  @Inject
+  private static Injector injector = null;
+
+
   private Resource.Type type = null;
   private String clusterNamePropertyId = null;
   private String hostNamePropertyId = null;
   private String componentNamePropertyId = null;
-  private String jmxStatePropertyId = null;
+  private String resourceStatePropertyId = null;
   private ComponentSSLConfiguration sslConfig = null;
   private StreamProvider streamProvider = null;
   private JMXHostProvider jmxHostProvider;
   private GangliaHostProvider gangliaHostProvider;
   private PropertyProvider defaultJmx = null;
   private PropertyProvider defaultGanglia = null;
-  
+
+  private final MetricsHostProvider metricsHostProvider;
+
+  /**
+   * PropertyHelper/AbstractPropertyProvider expect map of maps,
+   * that's why we wrap metrics into map
+   */
+  public static final String WRAPPED_METRICS_KEY = "WRAPPED_METRICS_KEY";
+
   @Inject
   public static void init(Injector injector) {
     clusters = injector.getInstance(Clusters.class);
     metaInfo = injector.getInstance(AmbariMetaInfo.class);
+    StackDefinedPropertyProvider.injector = injector;
   }
-  
+
   public StackDefinedPropertyProvider(Resource.Type type,
       JMXHostProvider jmxHostProvider,
       GangliaHostProvider gangliaHostProvider,
+      MetricsHostProvider metricsHostProvider,
       StreamProvider streamProvider,
       String clusterPropertyId,
       String hostPropertyId,
       String componentPropertyId,
-      String jmxStatePropertyId,
+      String resourceStatePropertyId,
       PropertyProvider defaultJmxPropertyProvider,
       PropertyProvider defaultGangliaPropertyProvider
-      ) {
-    
+  ) {
+
+    this.metricsHostProvider = metricsHostProvider;
+
     if (null == clusterPropertyId)
       throw new NullPointerException("Cluster name property id cannot be null");
     if (null == componentPropertyId)
       throw new NullPointerException("Component name property id cannot be null");
-    
+
     this.type = type;
-    
+
     clusterNamePropertyId = clusterPropertyId;
     hostNamePropertyId = hostPropertyId;
     componentNamePropertyId = componentPropertyId;
-    this.jmxStatePropertyId = jmxStatePropertyId;
+    this.resourceStatePropertyId = resourceStatePropertyId;
     this.jmxHostProvider = jmxHostProvider;
     this.gangliaHostProvider = gangliaHostProvider;
     sslConfig = ComponentSSLConfiguration.instance();
@@ -114,34 +129,34 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
     defaultJmx = defaultJmxPropertyProvider;
     defaultGanglia = defaultGangliaPropertyProvider;
   }
-      
-  
+
+
   @Override
   public Set<Resource> populateResources(Set<Resource> resources,
       Request request, Predicate predicate) throws SystemException {
 
     // only arrange for one instance of Ganglia and JMX instantiation
-    Map<String, Map<String, PropertyInfo>> gangliaMap = new HashMap<String, Map<String,PropertyInfo>>();
+    Map<String, Map<String, PropertyInfo>> gangliaMap = new HashMap<String, Map<String, PropertyInfo>>();
     Map<String, Map<String, PropertyInfo>> jmxMap = new HashMap<String, Map<String, PropertyInfo>>();
 
     List<PropertyProvider> additional = new ArrayList<PropertyProvider>();
-    
+
     try {
       for (Resource r : resources) {
         String clusterName = r.getPropertyValue(clusterNamePropertyId).toString();
         String componentName = r.getPropertyValue(componentNamePropertyId).toString();
-        
+
         Cluster cluster = clusters.getCluster(clusterName);
         StackId stack = cluster.getDesiredStackVersion();
         String svc = metaInfo.getComponentToService(stack.getStackName(),
             stack.getStackVersion(), componentName);
-        
+
         List<MetricDefinition> defs = metaInfo.getMetrics(
             stack.getStackName(), stack.getStackVersion(), svc, componentName, type.name());
-        
+
         if (null == defs || 0 == defs.size())
           continue;
-        
+
         for (MetricDefinition m : defs) {
           if (m.getType().equals("ganglia")) {
             gangliaMap.put(componentName, getPropertyInfo(m));
@@ -149,12 +164,20 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
             jmxMap.put(componentName, getPropertyInfo(m));
           } else {
             PropertyProvider pp = getDelegate(m);
-            if (null != pp)
+            if(pp == null) {
+              pp = getDelegate(m,
+                  streamProvider, metricsHostProvider,
+                  clusterNamePropertyId, hostNamePropertyId,
+                  componentNamePropertyId, resourceStatePropertyId);
+            }
+            if(pp != null) {
               additional.add(pp);
+            }
+
           }
         }
       }
-        
+
       if (gangliaMap.size() > 0) {
         GangliaPropertyProvider gpp = type.equals (Resource.Type.Component) ?
           new GangliaComponentPropertyProvider(gangliaMap,
@@ -163,22 +186,23 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
           new GangliaHostComponentPropertyProvider(gangliaMap,
               streamProvider, sslConfig, gangliaHostProvider,
               clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId);
-          
+
           gpp.populateResources(resources, request, predicate);
       } else {
         defaultGanglia.populateResources(resources, request, predicate);
       }
-      
+
       if (jmxMap.size() > 0) {
         JMXPropertyProvider jpp = new JMXPropertyProvider(jmxMap, streamProvider,
-            jmxHostProvider, clusterNamePropertyId, hostNamePropertyId,
-            componentNamePropertyId, jmxStatePropertyId, Collections.singleton("STARTED"));
-        
+            jmxHostProvider, metricsHostProvider,
+            clusterNamePropertyId, hostNamePropertyId,
+            componentNamePropertyId, resourceStatePropertyId);
+
         jpp.populateResources(resources, request, predicate);
       } else {
         defaultJmx.populateResources(resources, request, predicate);
       }
-      
+
       for (PropertyProvider pp : additional) {
         pp.populateResources(resources, request, predicate);
       }
@@ -187,7 +211,7 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
       e.printStackTrace();
       throw new SystemException("Error loading deferred resources", e);
     }
-    
+
     return resources;
   }
 
@@ -195,59 +219,134 @@ public class StackDefinedPropertyProvider implements PropertyProvider {
   public Set<String> checkPropertyIds(Set<String> propertyIds) {
     return Collections.emptySet();
   }
-  
+
   /**
    * @param def the metric definition
-   * @return the converted Map required for JMX or Ganglia execution
+   * @return the converted Map required for JMX or Ganglia execution.
+   * Format: <metric name, property info>
    */
-  private  Map<String, PropertyInfo> getPropertyInfo(MetricDefinition def) {
+  private Map<String, PropertyInfo> getPropertyInfo(MetricDefinition def) {
     Map<String, PropertyInfo> defs = new HashMap<String, PropertyInfo>();
-    
-    for (Entry<String,Metric> entry : def.getMetrics().entrySet()) {
+
+    for (Entry<String, Metric> entry : def.getMetrics().entrySet()) {
       Metric metric = entry.getValue();
       defs.put(entry.getKey(), new PropertyInfo(
           metric.getName(), metric.isTemporal(), metric.isPointInTime()));
     }
-    
+
     return defs;
   }
-  
+
   /**
-   * @param the metric definition for a component and resource type combination
+   * @param definition metric definition for a component and resource type combination
    * @return the custom property provider
    */
   private PropertyProvider getDelegate(MetricDefinition definition) {
+    try {
+      Class<?> clz = Class.forName(definition.getType());
+
+      // singleton/factory
       try {
-        Class<?> clz = Class.forName(definition.getType());
-
-        // singleton/factory
-        try {
-          Method m = clz.getMethod("getInstance", Map.class, Map.class);
-          Object o = m.invoke(null, definition.getProperties(), definition.getMetrics());
-          return PropertyProvider.class.cast(o);
-        } catch (Exception e) {
-          LOG.info("Could not load singleton or factory method for type '" +
-              definition.getType());
-        }
-        
-        // try maps constructor        
-        try {
-          Constructor<?> ct = clz.getConstructor(Map.class, Map.class);
-          Object o = ct.newInstance(definition.getProperties(), definition.getMetrics());
-          return PropertyProvider.class.cast(o);
-        } catch (Exception e) {
-          LOG.info("Could not find contructor for type '" +
-              definition.getType());
-        }
-        
-        // just new instance
-        return PropertyProvider.class.cast(clz.newInstance());
+        Method m = clz.getMethod("getInstance", Map.class, Map.class);
+        Object o = m.invoke(null, definition.getProperties(), definition.getMetrics());
+        return PropertyProvider.class.cast(o);
+      } catch (Exception e) {
+        LOG.info("Could not load singleton or factory method for type '" +
+            definition.getType());
+      }
+
+      // try maps constructor
+      try {
+        Constructor<?> ct = clz.getConstructor(Map.class, Map.class);
+        Object o = ct.newInstance(definition.getProperties(), definition.getMetrics());
+        return PropertyProvider.class.cast(o);
+      } catch (Exception e) {
+        LOG.info("Could not find contructor for type '" +
+            definition.getType());
+      }
+
+      // just new instance
+      return PropertyProvider.class.cast(clz.newInstance());
+
+    } catch (Exception e) {
+      LOG.error("Could not load class " + definition.getType());
+      return null;
+    }
+  }
+
+  /**
+   *
+   * @param definition the metric definition for a component
+   * @param streamProvider the stream provider
+   * @param metricsHostProvider the metrics host provider
+   * @param clusterNamePropertyId the cluster name property id
+   * @param hostNamePropertyId the host name property id
+   * @param componentNamePropertyId the component name property id
+   * @param statePropertyId the state property id
+   * @return the custom property provider
+   */
+
+  private PropertyProvider getDelegate(MetricDefinition definition,
+                                       StreamProvider streamProvider,
+                                       MetricsHostProvider metricsHostProvider,
+                                       String clusterNamePropertyId,
+                                       String hostNamePropertyId,
+                                       String componentNamePropertyId,
+                                       String statePropertyId) {
+    Map<String, PropertyInfo> metrics = getPropertyInfo(definition);
+    HashMap<String, Map<String, PropertyInfo>> componentMetrics =
+        new HashMap<String, Map<String, PropertyInfo>>();
+    componentMetrics.put(WRAPPED_METRICS_KEY, metrics);
+
+    try {
+      Class<?> clz = Class.forName(definition.getType());
+      // singleton/factory
+      try {
+                /*
+         * Interface for singleton/factory method invocation TBD
+         * when implementing the first real use
+         */
+        Method m = clz.getMethod("getInstance", Map.class, Map.class);
+        Object o = m.invoke(
+            definition.getProperties(), componentMetrics,
+            streamProvider, clusterNamePropertyId, hostNamePropertyId,
+            componentNamePropertyId, statePropertyId);
+        return PropertyProvider.class.cast(o);
+      } catch (Exception e) {
+        LOG.info("Could not load singleton or factory method for type '" +
+            definition.getType());
+      }
 
+      // try maps constructor
+      try {
+                /*
+         * Warning: this branch is already used, that's why please adjust
+         * all implementations when modifying constructor interface
+         */
+        Constructor<?> ct = clz.getConstructor(Injector.class, Map.class,
+            Map.class, StreamProvider.class, MetricsHostProvider.class,
+            String.class, String.class, String.class, String.class);
+        Object o = ct.newInstance(
+            injector,
+            definition.getProperties(), componentMetrics,
+            streamProvider, metricsHostProvider,
+            clusterNamePropertyId, hostNamePropertyId,
+            componentNamePropertyId, statePropertyId);
+        return PropertyProvider.class.cast(o);
       } catch (Exception e) {
-        LOG.error("Could not load class " + definition.getType());
-        return null;
+        LOG.info("Could not find contructor for type '" +
+            definition.getType());
       }
+
+      // just new instance
+      return PropertyProvider.class.cast(clz.newInstance());
+
+    } catch (Exception e) {
+      LOG.error("Could not load class " + definition.getType());
+      return null;
+    }
+
+
   }
-  
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
index 12f3725..65f7be7 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java
@@ -27,19 +27,6 @@ import java.util.Set;
 public interface JMXHostProvider {
 
   /**
-   * Get the JMX host name for the given cluster name and component name.
-   *
-   * @param clusterName    the cluster name
-   * @param componentName  the component name
-   *
-   * @return the JMX host name
-   *
-   * @throws SystemException if unable to get the JMX host name
-   */
-  public String getHostName(String clusterName, String componentName)
-      throws SystemException;
-
-  /**
    * Get the JMX host names for the given cluster name and component name.
    *
    * @param clusterName    the cluster name

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
index ca016f5..975a479 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java
@@ -27,20 +27,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.metrics.MetricsHostProvider;
+import org.apache.ambari.server.controller.metrics.MetricsProvider;
 import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
@@ -57,39 +49,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Property provider implementation for JMX sources.
  */
-public class JMXPropertyProvider extends AbstractPropertyProvider {
+public class JMXPropertyProvider extends MetricsProvider {
 
   private static final String NAME_KEY = "name";
   private static final String PORT_KEY = "tag.port";
   private static final String DOT_REPLACEMENT_CHAR = "#";
-  private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 12000L;
-
-  public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics.";
-  public static final String STORM_REST_API = "STORM_REST_API";
-
-  /**
-   * Thread pool
-   */
-  private static final ExecutorService EXECUTOR_SERVICE;
-  private static final int THREAD_POOL_CORE_SIZE = 20;
-  private static final int THREAD_POOL_MAX_SIZE = 100;
-  private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L;
-
-  static {
-    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // unlimited Queue
-
-    ThreadPoolExecutor threadPoolExecutor =
-        new ThreadPoolExecutor(
-            THREAD_POOL_CORE_SIZE,
-            THREAD_POOL_MAX_SIZE,
-            THREAD_POOL_TIMEOUT_MILLIS,
-            TimeUnit.MILLISECONDS,
-            queue);
-
-    threadPoolExecutor.allowCoreThreadTimeOut(true);
-
-    EXECUTOR_SERVICE = threadPoolExecutor;
-  }
 
   private final static ObjectReader jmxObjectReader;
   private final static ObjectReader stormObjectReader;
@@ -138,16 +102,6 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
 
   private final String statePropertyId;
 
-  private final Set<String> healthyStates;
-
-  /**
-   * The amount of time that this provider will wait for JMX metric values to be
-   * returned from the JMX sources.  If no results are returned for this amount of
-   * time then the request to populate the resources will fail.
-   */
-  protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS;
-
-
   // ----- Constructors ------------------------------------------------------
 
   /**
@@ -155,23 +109,23 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
    *
    * @param componentMetrics         the map of supported metrics
    * @param streamProvider           the stream provider
-   * @param jmxHostProvider          the host mapping
+   * @param jmxHostProvider          the JMX host mapping
+   * @param metricsHostProvider      the host mapping
    * @param clusterNamePropertyId    the cluster name property id
    * @param hostNamePropertyId       the host name property id
    * @param componentNamePropertyId  the component name property id
    * @param statePropertyId          the state property id
-   * @param healthyStates            the set of healthy state values
    */
   public JMXPropertyProvider(Map<String, Map<String, PropertyInfo>> componentMetrics,
                              StreamProvider streamProvider,
                              JMXHostProvider jmxHostProvider,
+                             MetricsHostProvider metricsHostProvider,
                              String clusterNamePropertyId,
                              String hostNamePropertyId,
                              String componentNamePropertyId,
-                             String statePropertyId,
-                             Set<String> healthyStates) {
+                             String statePropertyId) {
 
-    super(componentMetrics);
+    super(componentMetrics, hostNamePropertyId, metricsHostProvider);
 
     this.streamProvider           = streamProvider;
     this.jmxHostProvider          = jmxHostProvider;
@@ -179,116 +133,11 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
     this.hostNamePropertyId       = hostNamePropertyId;
     this.componentNamePropertyId  = componentNamePropertyId;
     this.statePropertyId          = statePropertyId;
-    this.healthyStates            = healthyStates;
   }
-  
-  // ----- PropertyProvider --------------------------------------------------
-
-  @Override
-  public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
-      throws SystemException {
-
-    // Get a valid ticket for the request.
-    Ticket ticket = new Ticket();
-
-    CompletionService<Resource> completionService =
-        new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
-
-    // In a large cluster we could have thousands of resources to populate here.
-    // Distribute the work across multiple threads.
-    for (Resource resource : resources) {
-      completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket));
-    }
-
-    Set<Resource> keepers = new HashSet<Resource>();
-    try {
-      for (int i = 0; i < resources.size(); ++ i) {
-        Future<Resource> resourceFuture =
-            completionService.poll(populateTimeout, TimeUnit.MILLISECONDS);
-
-        if (resourceFuture == null) {
-          // its been more than the populateTimeout since the last callable completed ...
-          // invalidate the ticket to abort the threads and don't wait any longer
-          ticket.invalidate();
-          LOG.error(TIMED_OUT_MSG);
-          break;
-        } else {
-          // future should already be completed... no need to wait on get
-          Resource resource = resourceFuture.get();
-          if (resource != null) {
-            keepers.add(resource);
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      logException(e);
-    } catch (ExecutionException e) {
-      rethrowSystemException(e.getCause());
-    }
-    return keepers;
-  }
-
 
   // ----- helper methods ----------------------------------------------------
 
   /**
-   * Set the populate timeout value for this provider.
-   *
-   * @param populateTimeout  the populate timeout value
-   */
-  protected void setPopulateTimeout(long populateTimeout) {
-    this.populateTimeout = populateTimeout;
-  }
-
-  /**
-   * Get the spec to locate the JMX stream from the given host and port
-   *
-   * @param protocol  the protocol, one of http or https
-   * @param hostName  the host name
-   * @param port      the port
-   *
-   * @return the spec
-   */
-  protected String getSpec(String protocol, String hostName,
-                           String port, String componentName) {
-      if (null == componentName || !componentName.equals(STORM_REST_API))
-        return protocol + "://" + hostName + ":" + port + "/jmx";
-      else
-        return protocol + "://" + hostName + ":" + port + "/api/cluster/summary";
-  }
-
-  /**
-   * Get the spec to locate the JMX stream from the given host and port
-   *
-   * @param hostName  the host name
-   * @param port      the port
-   *
-   * @return the spec
-   */
-  protected String getSpec(String hostName, String port) {
-      return getSpec("http", hostName, port, null);
-  }
-  
-  /**
-   * Get a callable that can be used to populate the given resource.
-   *
-   * @param resource  the resource to be populated
-   * @param request   the request
-   * @param predicate the predicate
-   * @param ticket    a valid ticket
-   *
-   * @return a callable that can be used to populate the given resource
-   */
-  private Callable<Resource> getPopulateResourceCallable(
-      final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) {
-    return new Callable<Resource>() {
-      public Resource call() throws SystemException {
-        return populateResource(resource, request, predicate, ticket);
-      }
-    };
-  }
-
-  /**
    * Populate a resource by obtaining the requested JMX properties.
    *
    * @param resource  the resource to be populated
@@ -298,7 +147,8 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
    *
    * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate
    */
-  private Resource populateResource(Resource resource, Request request, Predicate predicate, Ticket ticket)
+  @Override
+  protected Resource populateResource(Resource resource, Request request, Predicate predicate, MetricsProvider.Ticket ticket)
       throws SystemException {
 
     Set<String> ids = getRequestPropertyIds(request, predicate);
@@ -354,16 +204,14 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
       try {
         for (String hostName : hostNames) {
           try {
-            in = streamProvider.readFrom(getSpec(protocol, hostName, port, componentName));
+            in = streamProvider.readFrom(getSpec(protocol, hostName, port, "/jmx"));
             // if the ticket becomes invalid (timeout) then bail out
             if (!ticket.isValid()) {
               return resource;
             }
-            if (null == componentName || !componentName.equals(STORM_REST_API)) {
-              getHadoopMetricValue(in, ids, resource, request, ticket);
-            } else {
-              getStormMetricValue(in, ids, resource, ticket);
-            }
+
+            getHadoopMetricValue(in, ids, resource, request, ticket);
+
           } catch (IOException e) {
             logException(e);
           }
@@ -470,31 +318,6 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
     }
   }
 
-  /**
-   * TODO: Refactor
-   * Storm-specific metrics fetching
-   */
-  private void getStormMetricValue(InputStream in, Set<String> ids,
-                                   Resource resource, Ticket ticket) throws IOException {
-    HashMap<String, Object> metricHolder = stormObjectReader.readValue(in);
-    for (String category : ids) {
-      Map<String, PropertyInfo> defProps = getComponentMetrics().get(STORM_REST_API);
-      for (Map.Entry<String, PropertyInfo> depEntry : defProps.entrySet()) {
-        if (depEntry.getKey().startsWith(category)) {
-          PropertyInfo propInfo = depEntry.getValue();
-          String propName = propInfo.getPropertyId();
-          Object propertyValue = metricHolder.get(propName);
-          String absId = PropertyHelper.getPropertyId(category, propName);
-          if (!ticket.isValid()) {
-            return;
-          }
-          // TODO: Maybe cast to int
-          resource.setProperty(absId, propertyValue);
-        }
-      }
-    }
-  }
-
   private void setResourceValue(Resource resource, Map<String, Map<String, Object>> categories, String propertyId,
                                 String category, String property, List<String> keyList) {
     Map<String, Object> properties = categories.get(category);
@@ -546,73 +369,4 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
     }
     return null;
   }
-
-  /**
-   * Determine whether or not the given property id was requested.
-   */
-  private static boolean isRequestedPropertyId(String propertyId, String requestedPropertyId, Request request) {
-    return request.getPropertyIds().isEmpty() || propertyId.startsWith(requestedPropertyId);
-  }
-
-  /**
-   * Log an error for the given exception.
-   *
-   * @param throwable  the caught exception
-   *
-   * @return the error message that was logged
-   */
-  private static String logException(Throwable throwable) {
-    String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage();
-
-    LOG.debug(msg, throwable);
-
-    return msg;
-  }
-
-  /**
-   * Rethrow the given exception as a System exception and log the message.
-   *
-   * @param throwable  the caught exception
-   *
-   * @throws org.apache.ambari.server.controller.spi.SystemException always around the given exception
-   */
-  private static void rethrowSystemException(Throwable throwable) throws SystemException {
-    String msg = logException(throwable);
-
-    if (throwable instanceof SystemException) {
-      throw (SystemException) throwable;
-    }
-    throw new SystemException (msg, throwable);
-  }
-
-
-  // ----- inner class : Ticket ----------------------------------------------
-
-  /**
-   * Ticket used to cancel provider threads.  The provider threads should
-   * monitor the validity of the passed in ticket and bail out if it becomes
-   * invalid (as in a timeout).
-   */
-  private static class Ticket {
-    /**
-     * Indicate whether or not the ticket is valid.
-     */
-    private volatile boolean valid = true;
-
-    /**
-     * Invalidate the ticket.
-     */
-    public void invalidate() {
-      valid = false;
-    }
-
-    /**
-     * Determine whether or not this ticket is valid.
-     *
-     * @return true if the ticket is valid
-     */
-    public boolean isValid() {
-      return valid;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java
new file mode 100644
index 0000000..1a96829
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.metrics;
+
+
+import org.apache.ambari.server.controller.spi.SystemException;
+
+public interface MetricsHostProvider {
+
+  /**
+   * Get the host name for the given cluster name and component name.
+   *
+   * @param clusterName   the cluster name
+   * @param componentName the component name
+   * @return the host name
+   * @throws org.apache.ambari.server.controller.spi.SystemException
+   *          if unable to get the host name
+   */
+  public String getHostName(String clusterName, String componentName)
+      throws SystemException;
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java
new file mode 100644
index 0000000..27c55f4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.metrics;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+
+/**
+ * Unites common functionality for multithreaded metrics providers
+ * (JMX and REST as of now). Shares the same pool of executor threads.
+ */
+public abstract class MetricsProvider extends AbstractPropertyProvider {
+
+  /**
+   * Host states in which metrics collection is available
+   */
+  public static final Set<String> healthyStates = Collections.singleton("STARTED");
+  protected final String hostNamePropertyId;
+  private final MetricsHostProvider metricsHostProvider;
+
+  /**
+   * Executor service is shared between all subclasses of this class
+   */
+  private static final ExecutorService EXECUTOR_SERVICE = initExecutorService();
+  private static final int THREAD_POOL_CORE_SIZE = 20;
+  private static final int THREAD_POOL_MAX_SIZE = 100;
+  private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L;
+
+  private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
+  /**
+   * The amount of time that this provider will wait for metric values to be
+   * returned from the metric sources.  If no result arrives within this amount of
+   * time, population stops and only the resources gathered so far are returned.
+   */
+  protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS;
+  public static final String TIMED_OUT_MSG = "Timed out waiting for metrics.";
+
+  // ----- Constructors ------------------------------------------------------
+
+  /**
+   * Construct a provider.
+   *
+   * @param componentMetrics map of metrics for this provider
+   */
+  public MetricsProvider(Map<String, Map<String, PropertyInfo>> componentMetrics,
+                         String hostNamePropertyId,
+                         MetricsHostProvider metricsHostProvider) {
+    super(componentMetrics);
+    this.hostNamePropertyId = hostNamePropertyId;
+    this.metricsHostProvider = metricsHostProvider;
+  }
+
+  // ----- Thread pool -------------------------------------------------------
+
+  /**
+   * Generates thread pool with default parameters
+   */
+
+
+  private static ExecutorService initExecutorService() {
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // unlimited Queue
+
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            THREAD_POOL_CORE_SIZE,
+            THREAD_POOL_MAX_SIZE,
+            THREAD_POOL_TIMEOUT_MILLIS,
+            TimeUnit.MILLISECONDS,
+            queue);
+
+    threadPoolExecutor.allowCoreThreadTimeOut(true);
+
+    return threadPoolExecutor;
+  }
+
+  public static ExecutorService getExecutorService() {
+    return EXECUTOR_SERVICE;
+  }
+
+  // ----- Common PropertyProvider implementation details --------------------
+
+  @Override
+  public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
+      throws SystemException {
+
+    // Get a valid ticket for the request.
+    Ticket ticket = new Ticket();
+
+    CompletionService<Resource> completionService =
+        new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
+
+    // In a large cluster we could have thousands of resources to populate here.
+    // Distribute the work across multiple threads.
+    for (Resource resource : resources) {
+      completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket));
+    }
+
+    Set<Resource> keepers = new HashSet<Resource>();
+    try {
+      for (int i = 0; i < resources.size(); ++ i) {
+        Future<Resource> resourceFuture =
+            completionService.poll(populateTimeout, TimeUnit.MILLISECONDS);
+
+        if (resourceFuture == null) {
+          // it's been more than the populateTimeout since the last callable completed ...
+          // invalidate the ticket to abort the threads and don't wait any longer
+          ticket.invalidate();
+          LOG.error(TIMED_OUT_MSG);
+          break;
+        } else {
+          // future should already be completed... no need to wait on get
+          Resource resource = resourceFuture.get();
+          if (resource != null) {
+            keepers.add(resource);
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      logException(e);
+    } catch (ExecutionException e) {
+      rethrowSystemException(e.getCause());
+    }
+    return keepers;
+  }
+
+  /**
+   * Get a callable that can be used to populate the given resource.
+   *
+   * @param resource  the resource to be populated
+   * @param request   the request
+   * @param predicate the predicate
+   * @param ticket    a valid ticket
+   *
+   * @return a callable that can be used to populate the given resource
+   */
+  private Callable<Resource> getPopulateResourceCallable(
+      final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) {
+    return new Callable<Resource>() {
+      public Resource call() throws SystemException {
+        return populateResource(resource, request, predicate, ticket);
+      }
+    };
+  }
+
+
+  /**
+   * Populate a resource by obtaining the requested metric properties.
+   *
+   * @param resource  the resource to be populated
+   * @param request   the request
+   * @param predicate the predicate
+   * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate
+   */
+
+
+  protected abstract Resource populateResource(Resource resource,
+                                               Request request, Predicate predicate, Ticket ticket)
+
+      throws SystemException;
+
+  /**
+   * Set the populate timeout value for this provider.
+   *
+   * @param populateTimeout the populate timeout value
+   */
+
+
+  protected void setPopulateTimeout(long populateTimeout) {
+    this.populateTimeout = populateTimeout;
+
+  }
+
+
+  // ----- helper methods ----------------------------------------------------
+
+  /**
+   * Determine whether or not the given property id was requested.
+   */
+  protected static boolean isRequestedPropertyId(String propertyId, String requestedPropertyId, Request request) {
+    return request.getPropertyIds().isEmpty() || propertyId.startsWith(requestedPropertyId);
+  }
+
+  /**
+   * Log an error for the given exception.
+   *
+   * @param throwable  the caught exception
+   *
+   * @return the error message that was logged
+   */
+  protected static String logException(Throwable throwable) {
+    String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage();
+
+    LOG.debug(msg, throwable);
+
+    return msg;
+  }
+
+  /**
+   * Rethrow the given exception as a System exception and log the message.
+   *
+   * @param throwable  the caught exception
+   *
+   * @throws org.apache.ambari.server.controller.spi.SystemException always around the given exception
+   */
+  protected static void rethrowSystemException(Throwable throwable) throws SystemException {
+    String msg = logException(throwable);
+
+    if (throwable instanceof SystemException) {
+      throw (SystemException) throwable;
+    }
+    throw new SystemException (msg, throwable);
+  }
+
+  /**
+   * Returns a hostname for component
+   */
+
+
+  public String getHost(Resource resource, String clusterName, String componentName) throws SystemException {
+    return hostNamePropertyId == null ?
+        metricsHostProvider.getHostName(clusterName, componentName) :
+        (String) resource.getPropertyValue(hostNamePropertyId);
+
+  }
+
+
+  /**
+   * Get complete URL from parts
+   */
+
+  protected String getSpec(String protocol, String hostName,
+                           String port, String url) {
+    return protocol + "://" + hostName + ":" + port + url;
+
+  }
+
+  // ----- inner class : Ticket ----------------------------------------------
+
+  /**
+   * Ticket used to cancel provider threads.  The provider threads should
+   * monitor the validity of the passed in ticket and bail out if it becomes
+   * invalid (as in a timeout).
+   */
+  protected static class Ticket {
+    /**
+     * Indicate whether or not the ticket is valid.
+     */
+    private volatile boolean valid = true;
+
+    /**
+     * Invalidate the ticket.
+     */
+    public void invalidate() {
+      valid = false;
+    }
+
+    /**
+     * Determine whether or not this ticket is valid.
+     *
+     * @return true if the ticket is valid
+     */
+    public boolean isValid() {
+      return valid;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
new file mode 100644
index 0000000..48d06b8
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.ambari.server.controller.metrics;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.internal.PropertyInfo;
+import org.apache.ambari.server.controller.internal.StackDefinedPropertyProvider;
+import org.apache.ambari.server.controller.spi.Predicate;
+import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.Resource;
+import org.apache.ambari.server.controller.spi.SystemException;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * WARNING: Class should be thread-safe!
+ * <p/>
+ * Resolves metrics like api/cluster/summary/nimbus.uptime
+ * For every metric, finds a relevant JSON value and returns it as
+ * a resource property.
+ */
+public class RestMetricsPropertyProvider extends MetricsProvider {
+
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(RestMetricsPropertyProvider.class);
+
+  private static Map<String, RestMetricsPropertyProvider> instances =
+      new Hashtable<String, RestMetricsPropertyProvider>();
+
+  @Inject
+  private AmbariManagementController amc;
+
+  @Inject
+  private Clusters clusters;
+
+  private final Map<String, String> metricsProperties;
+  private final StreamProvider streamProvider;
+  private final String clusterNamePropertyId;
+  private final String componentNamePropertyId;
+  private final String statePropertyId;
+  private MetricsHostProvider metricsHostProvider;
+
+  private static final String DEFAULT_PORT_PROPERTY = "default_port";
+  private static final String PORT_CONFIG_TYPE_PROPERTY = "port_config_type";
+  private static final String PORT_PROPERTY_NAME_PROPERTY = "port_property_name";
+
+  /**
+   * Protocol to use when connecting
+   */
+  private static final String PROTOCOL_OVERRIDE_PROPERTY = "protocol";
+  private static final String HTTP_PROTOCOL = "http";
+  private static final String HTTPS_PROTOCOL = "https";
+  private static final String DEFAULT_PROTOCOL = HTTP_PROTOCOL;
+
+
+  /**
+   * String that separates JSON URL from path inside JSON in metrics path
+   */
+  public static final String URL_PATH_SEPARATOR = "##";
+
+  /**
+   * Symbol that separates names of nested JSON sections in metrics path
+   */
+  public static final String DOCUMENT_PATH_SEPARATOR = "#";
+
+
+  /**
+   * Create a REST property provider.
+   *
+   * @param metricsProperties       the map of per-component metrics properties
+   * @param componentMetrics        the map of supported metrics for component
+   * @param streamProvider          the stream provider
+   * @param metricsHostProvider     metricsHostProvider instance
+   * @param clusterNamePropertyId   the cluster name property id
+   * @param hostNamePropertyId      the host name property id
+   * @param componentNamePropertyId the component name property id
+   * @param statePropertyId         the state property id
+   */
+  public RestMetricsPropertyProvider(
+      Injector injector,
+      Map<String, String> metricsProperties,
+      Map<String, Map<String, PropertyInfo>> componentMetrics,
+      StreamProvider streamProvider,
+      MetricsHostProvider metricsHostProvider,
+      String clusterNamePropertyId,
+      String hostNamePropertyId,
+      String componentNamePropertyId,
+      String statePropertyId) {
+
+    super(componentMetrics, hostNamePropertyId, metricsHostProvider);
+    this.metricsProperties = metricsProperties;
+    this.streamProvider = streamProvider;
+    this.clusterNamePropertyId = clusterNamePropertyId;
+    this.componentNamePropertyId = componentNamePropertyId;
+    this.statePropertyId = statePropertyId;
+    this.metricsHostProvider = metricsHostProvider;
+    injector.injectMembers(this);
+  }
+
+  // ----- MetricsProvider implementation ------------------------------------
+
+
+  /**
+   * Populate a resource by obtaining the requested REST properties.
+   *
+   * @param resource  the resource to be populated
+   * @param request   the request
+   * @param predicate the predicate
+   * @return the populated resource; null if the resource should NOT be
+   *         part of the result set for the given predicate
+   */
+  @Override
+  protected Resource populateResource(Resource resource,
+                                      Request request, Predicate predicate, Ticket ticket)
+      throws SystemException {
+
+    // Remove request properties that request temporal information
+    Set<String> ids = getRequestPropertyIds(request, predicate);
+    Set<String> temporalIds = new HashSet<String>();
+    for (String id : ids) {
+      if (request.getTemporalInfo(id) != null) {
+        temporalIds.add(id);
+      }
+    }
+    ids.removeAll(temporalIds);
+
+    if (ids.isEmpty()) {
+      // no properties requested
+      return resource;
+    }
+
+    // Don't attempt to get REST properties if the resource is in
+    // an unhealthy state
+    if (statePropertyId != null) {
+      String state = (String) resource.getPropertyValue(statePropertyId);
+      if (state != null && !healthyStates.contains(state)) {
+        return resource;
+      }
+    }
+
+    String componentName = (String) resource.getPropertyValue(componentNamePropertyId);
+
+    Map<String, PropertyInfo> propertyInfos =
+        getComponentMetrics().get(StackDefinedPropertyProvider.WRAPPED_METRICS_KEY);
+    if (propertyInfos == null) {
+      // If there are no metrics defined for the given component then there is nothing to do.
+      return resource;
+    }
+    String protocol = resolveProtocol();
+    String port = "-1";
+    String hostname = null;
+    try {
+      String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
+      Cluster cluster = clusters.getCluster(clusterName);
+      hostname = getHost(resource, clusterName, componentName);
+      if (hostname == null) {
+        String msg = String.format("Unable to get component REST metrics. " +
+            "No host name for %s.", componentName);
+        LOG.warn(msg);
+        return resource;
+      }
+      port = resolvePort(cluster, hostname, componentName, metricsProperties);
+    } catch (Exception e) {
+      rethrowSystemException(e);
+    }
+
+    Set<String> resultIds = new HashSet<String>();
+    for (String id : ids){
+      for (String metricId : propertyInfos.keySet()){
+        if (metricId.startsWith(id)){
+          resultIds.add(metricId);
+        }
+      }
+    }
+
+    // Extract set of URLs for metrics
+    HashMap<String, Set<String>> urls = extractPropertyURLs(resultIds, propertyInfos);
+
+    for (String url : urls.keySet()) {
+      try {
+        InputStream in = streamProvider.readFrom(getSpec(protocol, hostname, port, url));
+        if (!ticket.isValid()) {
+          return resource;
+        }
+        try {
+          extractValuesFromJSON(in, urls.get(url), resource, propertyInfos);
+        } finally {
+          in.close();
+        }
+      } catch (IOException e) {
+        logException(e);
+      }
+    }
+    return resource;
+  }
+
+  @Override
+  public Set<String> checkPropertyIds(Set<String> propertyIds) {
+    Set<String> unsupported = new HashSet<String>();
+    for (String propertyId : propertyIds) {
+      if (!getComponentMetrics().
+          get(StackDefinedPropertyProvider.WRAPPED_METRICS_KEY).
+          containsKey(propertyId)) {
+        unsupported.add(propertyId);
+      }
+    }
+    return unsupported;
+  }
+
+  // ----- helper methods ----------------------------------------------------
+
+  /**
+   * Uses port_config_type, port_property_name, default_port parameters from
+   * metricsProperties to find out right port value for service
+   *
+   * @return the REST port for the service
+   */
+  private String resolvePort(Cluster cluster, String hostname, String componentName,
+                          Map<String, String> metricsProperties)
+      throws AmbariException {
+    String portConfigType = null;
+    String portPropertyName = null;
+    if (metricsProperties.containsKey(PORT_CONFIG_TYPE_PROPERTY) &&
+        metricsProperties.containsKey(PORT_PROPERTY_NAME_PROPERTY)) {
+      portConfigType = metricsProperties.get(PORT_CONFIG_TYPE_PROPERTY);
+      portPropertyName = metricsProperties.get(PORT_PROPERTY_NAME_PROPERTY);
+    }
+    String portStr = null;
+    if (portConfigType != null && portPropertyName != null) {
+      try {
+        Map<String, Map<String, String>> configTags =
+            amc.findConfigurationTagsWithOverrides(cluster, hostname);
+        if (configTags.containsKey(portConfigType)) {
+          Map<String, String> config = configTags.get(portConfigType);
+          if (config.containsKey(portPropertyName)) {
+            portStr = config.get(portPropertyName);
+          }
+        }
+      } catch (AmbariException e) {
+        String message = String.format("Can not extract config tags for " +
+            "cluster = %s, hostname = %s", componentName, hostname);
+        LOG.warn(message);
+      }
+      if (portStr == null) {
+        String message = String.format(
+            "Can not extract REST port for " +
+                "component %s from configurations. " +
+                "Config tag = %s, config key name = %s, " +
+                "hostname = %s. Probably metrics.json file for " +
+                "service is misspelled. Trying default port",
+            componentName, portConfigType,
+            portPropertyName, hostname);
+        LOG.warn(message);
+      }
+    }
+    if (portStr == null && metricsProperties.containsKey(DEFAULT_PORT_PROPERTY)) {
+      if (metricsProperties.containsKey(DEFAULT_PORT_PROPERTY)) {
+        portStr = metricsProperties.get(DEFAULT_PORT_PROPERTY);
+      } else {
+        String message = String.format("Can not determine REST port for " +
+            "component %s. " +
+            "Default REST port property %s is not defined at metrics.json " +
+            "file for service, and there is no any other available ways " +
+            "to determine port information.",
+            componentName, DEFAULT_PORT_PROPERTY);
+        throw new AmbariException(message);
+      }
+    }
+      return portStr;
+  }
+
+
+  /**
+   * Picks the protocol to use for REST requests. The lower-cased value of
+   * the protocol override entry of metricsProperties is returned when it is
+   * one of the two supported protocols; in every other case the default
+   * protocol is returned (with a warning when the override is unsupported).
+   *
+   * @return protocol to use
+   */
+  private String resolveProtocol() {
+    if (!metricsProperties.containsKey(PROTOCOL_OVERRIDE_PROPERTY)) {
+      return DEFAULT_PROTOCOL;
+    }
+    String requested =
+        metricsProperties.get(PROTOCOL_OVERRIDE_PROPERTY).toLowerCase();
+    if (requested.equals(HTTP_PROTOCOL) || requested.equals(HTTPS_PROTOCOL)) {
+      return requested;
+    }
+    LOG.warn(String.format(
+        "Unsupported protocol type %s, falling back to %s",
+        requested, DEFAULT_PROTOCOL));
+    return DEFAULT_PROTOCOL;
+  }
+
+
+  /**
+   * Returns the JSON URL, i.e. the first of the two parts of metricsPath.
+   *
+   * @param metricsPath metrics path to take the URL from
+   * @throws IllegalArgumentException if metricsPath is invalid
+   */
+  private String extractMetricsURL(String metricsPath)
+      throws IllegalArgumentException {
+    String[] urlAndDocumentPath = validateAndExtractPathParts(metricsPath);
+    return urlAndDocumentPath[0];
+  }
+
+  /**
+   * Returns the second of the two parts of metricsPath: the path through
+   * nested JSON sections.
+   *
+   * @param metricsPath metrics path to take the document path from
+   * @throws IllegalArgumentException if metricsPath is invalid
+   */
+  private String extractDocumentPath(String metricsPath)
+      throws IllegalArgumentException {
+    String[] urlAndDocumentPath = validateAndExtractPathParts(metricsPath);
+    return urlAndDocumentPath[1];
+  }
+
+  /**
+   * Splits metricsPath on URL_PATH_SEPARATOR and returns the result as
+   * [MetricsURL, DocumentPath].
+   *
+   * @param metricsPath metrics path to split
+   * @return two-element array: metrics URL, then document path
+   * @throws IllegalArgumentException if the split does not yield exactly
+   *                                  two parts
+   */
+  private String[] validateAndExtractPathParts(String metricsPath)
+      throws IllegalArgumentException {
+    String[] pathParts = metricsPath.split(URL_PATH_SEPARATOR);
+    if (pathParts.length == 2) {
+      return pathParts;
+    }
+    // This error is expected to occur only on development phase
+    String message = String.format(
+        "Metrics path %s does not contain or contains " +
+            "more than one %s sequence. That probably " +
+            "means that the mentioned metrics path is misspelled. " +
+            "Please check the relevant metrics.json file",
+        metricsPath, URL_PATH_SEPARATOR);
+    throw new IllegalArgumentException(message);
+  }
+
+
+  /**
+   * Groups the requested property IDs by the document URL they are read
+   * from, producing a map <document_url, requested_property_ids>. With this
+   * grouping each document needs to be fetched only once, even when several
+   * properties come from it.
+   *
+   * @param ids           set of property IDs that should be fetched
+   * @param propertyInfos property infos keyed by property ID; the property
+   *                      id of each info is used as the metrics path
+   */
+  private HashMap<String, Set<String>> extractPropertyURLs(Set<String> ids,
+                                                           Map<String, PropertyInfo> propertyInfos) {
+    HashMap<String, Set<String>> idsByUrl = new HashMap<String, Set<String>>();
+    for (String propertyId : ids) {
+      String url = extractMetricsURL(propertyInfos.get(propertyId).getPropertyId());
+      Set<String> idsForUrl = idsByUrl.get(url);
+      if (idsForUrl == null) {
+        // First property seen for this URL
+        idsForUrl = new HashSet<String>();
+        idsByUrl.put(url, idsForUrl);
+      }
+      idsForUrl.add(propertyId);
+    }
+    return idsByUrl;
+  }
+
+
+  /**
+   * Extracts requested properties from a given JSON input stream into
+   * resource.
+   * <p>
+   * For every requested property the document path part of its metrics path
+   * is split on DOCUMENT_PATH_SEPARATOR and followed through the parsed
+   * document one element at a time; the value found at the last element is
+   * stored in the resource under the requested property ID.
+   *
+   * @param jsonStream           input stream that contains JSON
+   * @param requestedPropertyIds a set of property IDs
+   *                             that should be fetched for this URL
+   * @param resource             all extracted values are placed into resource
+   * @param propertyInfos        property infos keyed by property ID; the
+   *                             property id of each info is used as the
+   *                             metrics path
+   * @throws IOException if an element of a document path is missing from the
+   *                     document, or an intermediate element is not a JSON
+   *                     object
+   */
+  private void extractValuesFromJSON(InputStream jsonStream,
+                                     Set<String> requestedPropertyIds,
+                                     Resource resource,
+                                     Map<String, PropertyInfo> propertyInfos)
+      throws IOException {
+    Gson gson = new Gson();
+    Type type = new TypeToken<Map<Object, Object>>() {
+    }.getType();
+    JsonReader jsonReader = new JsonReader(
+        new BufferedReader(new InputStreamReader(jsonStream)));
+    Map<Object, Object> jsonMap = gson.fromJson(jsonReader, type);
+    for (String requestedPropertyId : requestedPropertyIds) {
+      PropertyInfo propertyInfo = propertyInfos.get(requestedPropertyId);
+      String metricsPath = propertyInfo.getPropertyId();
+      String documentPath = extractDocumentPath(metricsPath);
+      String[] docPath = documentPath.split(DOCUMENT_PATH_SEPARATOR);
+      // Every property starts navigating from the document root
+      Map<?, ?> subMap = jsonMap;
+      for (int i = 0; i < docPath.length; i++) {
+        String pathElement = docPath[i];
+        // A null map (e.g. an empty document) is reported like a missing key
+        if (subMap == null || !subMap.containsKey(pathElement)) {
+          String message = String.format(
+              "Can not fetch %dth element of document path (%s) " +
+                  "from json. Wrong metrics path: %s",
+              i, pathElement, metricsPath);
+          throw new IOException(message);
+        }
+        // Look the element up in the current section, not in the root
+        Object jsonSubElement = subMap.get(pathElement);
+        if (i == docPath.length - 1) { // Reached target document section
+          // Extract property value
+          resource.setProperty(requestedPropertyId, jsonSubElement);
+        } else if (jsonSubElement instanceof Map) {
+          // Navigate to relevant document section; nested JSON objects
+          // were already parsed into maps together with the root
+          subMap = (Map<?, ?>) jsonSubElement;
+        } else {
+          String message = String.format(
+              "%dth element of document path (%s) is not a JSON object " +
+                  "and can not be navigated into. Wrong metrics path: %s",
+              i, pathElement, metricsPath);
+          throw new IOException(message);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json
index 83e27d1..c2776ab 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json
+++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json
@@ -2,45 +2,52 @@
   "STORM_REST_API": {
     "Component": [
       {
-        "type": "jmx",
+        "type": "org.apache.ambari.server.controller.metrics.RestMetricsPropertyProvider",
+        "properties" : {
+          "default_port": "8745",
+          "port_config_type": "storm-site",
+          "port_property_name": "storm.port",
+          "protocol": "http"
+        },
         "metrics": {
-          "metrics/api/cluster/summary/tasks.total": {
-            "metric": "tasks.total",
+          "metrics/api/cluster/summary/tasks.total":
+          {
+            "metric": "/api/cluster/summary##tasks.total",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/slots.total": {
-            "metric": "slots.total",
+            "metric": "/api/cluster/summary##slots.total",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/slots.free": {
-            "metric": "slots.free",
+            "metric": "/api/cluster/summary##slots.free",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/supervisors": {
-            "metric": "supervisors",
+            "metric": "/api/cluster/summary##supervisors",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/executors.total": {
-            "metric": "executors.total",
+            "metric": "/api/cluster/summary##executors.total",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/slots.used": {
-            "metric": "slots.used",
+            "metric": "/api/cluster/summary##slots.used",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/topologies": {
-            "metric": "topologies",
+            "metric": "/api/cluster/summary##topologies",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/nimbus.uptime": {
-            "metric": "nimbus.uptime",
+            "metric": "/api/cluster/summary##nimbus.uptime",
             "pointInTime": true,
             "temporal": false
           }
@@ -49,51 +56,57 @@
     ],
     "HostComponent": [
       {
-        "type": "jmx",
+        "type": "org.apache.ambari.server.controller.metrics.RestMetricsPropertyProvider",
+        "properties" : {
+          "default_port": "8745",
+          "port_config_type": "storm-site",
+          "port_property_name": "storm.port",
+          "protocol": "http"
+        },
         "metrics": {
-          "metrics/api/cluster/summary/tasks.total": {
-            "metric": "tasks.total",
+          "metrics/api/cluster/summary/tasks.total":
+          {
+            "metric": "/api/cluster/summary##tasks.total",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/slots.total": {
-            "metric": "slots.total",
+            "metric": "/api/cluster/summary##slots.total",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/slots.free": {
-            "metric": "slots.free",
+            "metric": "/api/cluster/summary##slots.free",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/supervisors": {
-            "metric": "supervisors",
+            "metric": "/api/cluster/summary##supervisors",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/executors.total": {
-            "metric": "executors.total",
+            "metric": "/api/cluster/summary##executors.total",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/slots.used": {
-            "metric": "slots.used",
+            "metric": "/api/cluster/summary##slots.used",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/topologies": {
-            "metric": "topologies",
+            "metric": "/api/cluster/summary##topologies",
             "pointInTime": true,
             "temporal": false
           },
           "metrics/api/cluster/summary/nimbus.uptime": {
-            "metric": "nimbus.uptime",
+            "metric": "/api/cluster/summary##nimbus.uptime",
             "pointInTime": true,
             "temporal": false
           }
         }
       }
-
     ]
   },
   "NIMBUS": {


Mime
View raw message