ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject ambari git commit: AMBARI-9509 Metrics Sinks writing exceptions to HDFS logs (mpapirkovskyy via dsen)
Date Fri, 06 Feb 2015 18:24:03 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk 621303369 -> f6fd6e542


AMBARI-9509 Metrics Sinks writing exceptions to HDFS logs (mpapirkovskyy via dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f6fd6e54
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f6fd6e54
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f6fd6e54

Branch: refs/heads/trunk
Commit: f6fd6e5423ebf48407cc1dbf2c48174edee01c48
Parents: 6213033
Author: Dmytro Sen <dsen@apache.org>
Authored: Fri Feb 6 20:14:14 2015 +0200
Committer: Dmytro Sen <dsen@apache.org>
Committed: Fri Feb 6 20:23:55 2015 +0200

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-common/pom.xml    |  5 ++
 .../timeline/AbstractTimelineMetricsSink.java   | 37 +++++----
 .../sink/timeline/UnableToConnectException.java | 46 +++++++++++
 .../cache/HandleConnectExceptionTest.java       | 80 ++++++++++++++++++++
 .../sink/flume/FlumeTimelineMetricsSink.java    |  3 +
 .../timeline/HadoopTimelineMetricsSink.java     |  2 +
 .../ambari-metrics-kafka-sink/pom.xml           |  1 -
 .../sink/storm/StormTimelineMetricsSink.java    |  4 +
 .../ambari-metrics-timelineservice/pom.xml      |  1 -
 ambari-metrics/pom.xml                          |  9 +++
 10 files changed, 170 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index ea8ecb3..bd840eb 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -56,5 +56,10 @@
       <scope>test</scope>
       <version>4.10</version>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 11663f9..17560ac 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -17,20 +17,20 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketAddress;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.StringRequestEntity;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-
 public abstract class AbstractTimelineMetricsSink {
   public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
   public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
@@ -56,21 +56,26 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   protected void emitMetrics(TimelineMetrics metrics) throws IOException {
-    String jsonData = mapper.writeValueAsString(metrics);
-
-    SocketAddress socketAddress = getServerSocketAddress();
+    String connectUrl = getCollectorUri();
+    try {
+      String jsonData = mapper.writeValueAsString(metrics);
 
-    if (socketAddress != null) {
-      StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json",
"UTF-8");
+      SocketAddress socketAddress = getServerSocketAddress();
 
-      PostMethod postMethod = new PostMethod(getCollectorUri());
-      postMethod.setRequestEntity(requestEntity);
-      int statusCode = httpClient.executeMethod(postMethod);
-      if (statusCode != 200) {
-        LOG.info("Unable to POST metrics to collector, " + getCollectorUri());
-      } else {
-        LOG.debug("Metrics posted to Collector " + getCollectorUri());
+      if (socketAddress != null) {
+        StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json",
"UTF-8");
+        
+        PostMethod postMethod = new PostMethod(connectUrl);
+        postMethod.setRequestEntity(requestEntity);
+        int statusCode = httpClient.executeMethod(postMethod);
+        if (statusCode != 200) {
+          LOG.info("Unable to POST metrics to collector, " + connectUrl);
+        } else {
+          LOG.debug("Metrics posted to Collector " + connectUrl);
+        }
       }
+    } catch (ConnectException e) {
+      throw new UnableToConnectException(e).setConnectUrl(connectUrl);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java
new file mode 100644
index 0000000..797924f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+public class UnableToConnectException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  private String connectUrl;
+
+  public UnableToConnectException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public UnableToConnectException(String message) {
+    super(message);
+  }
+
+  public UnableToConnectException(Throwable cause) {
+    super(cause);
+  }
+
+  public UnableToConnectException setConnectUrl(String connectUrl) {
+    this.connectUrl = connectUrl;
+    return this;
+  }
+
+  public String getConnectUrl() {
+    return connectUrl;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
new file mode 100644
index 0000000..450906a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.cache;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HandleConnectExceptionTest {
+  private static final String COLLECTOR_URL = "collector";
+  @Mock private HttpClient client;
+  private TestTimelineMetricsSink sink;
+  
+  @Before public void init(){
+    sink = new TestTimelineMetricsSink();
+    sink.setHttpClient(client);
+    
+    try {
+      Mockito.when(client.executeMethod(Mockito.<HttpMethod>any())).thenThrow(new ConnectException());
+    } catch (IOException e) {
+      //no-op
+    }
+  } 
+  
+  @Test
+  public void handleTest(){
+    try{
+      sink.emitMetrics(new TimelineMetrics());
+      Assert.fail();
+    }catch(UnableToConnectException e){
+      Assert.assertEquals(COLLECTOR_URL, e.getConnectUrl());
+    }catch(Exception e){
+      Assert.fail(e.getMessage());
+    }
+  }
+  class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
+    @Override
+    protected SocketAddress getServerSocketAddress() {
+      return new InetSocketAddress("host", 13);
+    }
+    @Override
+    protected String getCollectorUri() {
+      return COLLECTOR_URL;
+    }
+    @Override
+    public void emitMetrics(TimelineMetrics metrics) throws IOException {
+      super.emitMetrics(metrics);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 4207426..d9378fe 100644
--- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -27,6 +27,7 @@ import org.apache.flume.instrumentation.util.JMXPollUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 import org.apache.hadoop.metrics2.util.Servers;
@@ -129,6 +130,8 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
           LOG.info("Attributes for component " + component);
           processComponentAttributes(currentTimeMillis, component, attributeMap);
         }
+      } catch (UnableToConnectException uce) {
+        LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
       } catch (Exception e) {
         LOG.error("Unexpected error", e);
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index c0fa373..4271392 100644
--- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -182,6 +182,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink
imple
       if (!metricList.isEmpty()) {
         emitMetrics(timelineMetrics);
       }
+    } catch (UnableToConnectException uce) {
+      LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
     } catch (IOException io) {
       throw new MetricsException("Failed to putMetrics", io);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
index 9a1dc06..22a3772 100644
--- a/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
+++ b/ambari-metrics/ambari-metrics-kafka-sink/pom.xml
@@ -156,7 +156,6 @@ limitations under the License.
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 2b75c97..ffcc6ed 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -21,11 +21,13 @@ package org.apache.hadoop.metrics2.sink.storm;
 import backtype.storm.metric.api.IMetricsConsumer;
 import backtype.storm.task.IErrorReporter;
 import backtype.storm.task.TopologyContext;
+
 import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.math.NumberUtils;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 import org.apache.hadoop.metrics2.util.Servers;
@@ -103,6 +105,8 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink
implem
       timelineMetrics.setMetrics(metricList);
       try {
         emitMetrics(timelineMetrics);
+      } catch (UnableToConnectException uce) {
+        LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
       } catch (IOException e) {
         LOG.error("Unexpected error", e);
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index 59261ab..81c9a75 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -323,7 +323,6 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.8.5</version>
       <scope>test</scope>
     </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->

http://git-wip-us.apache.org/repos/asf/ambari/blob/f6fd6e54/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 772c52c..efccf63 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -55,6 +55,15 @@
       <url>http://54.235.92.15/nexus/content/groups/public/</url>
     </repository>
   </repositories>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-all</artifactId>
+        <version>1.9.5</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
   <build>
     <plugins>
       <plugin>


Mime
View raw message