Author: llu
Date: Tue Jul 12 10:22:31 2011
New Revision: 1145523
URL: http://svn.apache.org/viewvc?rev=1145523&view=rev
Log:
HADOOP-7324. Ganglia support for metrics v2. (Priyo Mustafi via llu)
Added:
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
Modified:
hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties
hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java
Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Tue Jul 12 10:22:31 2011
@@ -9,6 +9,8 @@ Release 0.20.204.0 - unreleased
scripts for easy one node cluster configuration and user creation.
(Eric Yang via omalley)
+ HADOOP-7324. Ganglia plugins for metrics v2. (Priyo Mustafi via llu)
+
BUG FIXES
MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
Modified: hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties (original)
+++ hadoop/common/branches/branch-0.20-security-204/conf/hadoop-metrics2.properties Tue Jul
12 10:22:31 2011
@@ -14,3 +14,33 @@
#maptask.sink.file.filename=maptask-metrics.out
#reducetask.sink.file.filename=reducetask-metrics.out
+
+
+#
+# Below are for sending metrics to Ganglia
+#
+# for Ganglia 3.0 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30
+#
+# for Ganglia 3.1 support
+# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
+
+# *.sink.ganglia.period=10
+
+# default for supportsparse is false
+# *.sink.ganglia.supportsparse=true
+
+#*.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
+#*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
+
+#namenode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#datanode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#jobtracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#tasktracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#maptask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
+
+#reducetask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
Added: hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java?rev=1145523&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
(added)
+++ hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
Tue Jul 12 10:22:31 2011
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.util.Servers;
+import org.apache.hadoop.net.DNS;
+
+/**
+ * This the base class for Ganglia sink classes using metrics2. Lot of the code
+ * has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
+ * As per the documentation, sink implementations doesn't have to worry about
+ * thread safety. Hence the code wasn't written for thread safety and should
+ * be modified in case the above assumption changes in the future.
+ */
+public abstract class AbstractGangliaSink implements MetricsSink {
+
+ public final Log LOG = LogFactory.getLog(this.getClass());
+
+ /*
+ * Output of "gmetric --help" showing allowable values
+ * -t, --type=STRING
+ * Either string|int8|uint8|int16|uint16|int32|uint32|float|double
+ * -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celcius
+ * (default='')
+ * -s, --slope=STRING Either zero|positive|negative|both
+ * (default='both')
+ * -x, --tmax=INT The maximum time in seconds between gmetric calls
+ * (default='60')
+ */
+ public static final String DEFAULT_UNITS = "";
+ public static final int DEFAULT_TMAX = 60;
+ public static final int DEFAULT_DMAX = 0;
+ public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
+ public static final int DEFAULT_PORT = 8649;
+ public static final String SERVERS_PROPERTY = "servers";
+ public static final int BUFFER_SIZE = 1500; // as per libgmond.c
+ public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
+ public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
+ public static final String EQUAL = "=";
+
+ private String hostName = "UNKNOWN.example.com";
+ private DatagramSocket datagramSocket;
+ private List<? extends SocketAddress> metricsServers;
+ private byte[] buffer = new byte[BUFFER_SIZE];
+ private int offset;
+ private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
+
+ /**
+ * Used for visiting Metrics
+ */
+ protected final GangliaMetricVisitor gangliaMetricVisitor =
+ new GangliaMetricVisitor();
+
+ private SubsetConfiguration conf;
+ private Map<String, GangliaConf> gangliaConfMap;
+ private GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();
+
+ /**
+ * ganglia slope values which equal the ordinal
+ */
+ public enum GangliaSlope {
+ zero, // 0
+ positive, // 1
+ negative, // 2
+ both // 3
+ };
+
+ /**
+ * define enum for various type of conf
+ */
+ public enum GangliaConfType {
+ slope, units, dmax, tmax
+ };
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
+ * .SubsetConfiguration)
+ */
+ public void init(SubsetConfiguration conf) {
+ LOG.debug("Initializing the GangliaSink for Ganglia metrics.");
+
+ this.conf = conf;
+
+ // Take the hostname from the DNS class.
+ if (conf.getString("slave.host.name") != null) {
+ hostName = conf.getString("slave.host.name");
+ } else {
+ try {
+ hostName = DNS.getDefaultHost(
+ conf.getString("dfs.datanode.dns.interface", "default"),
+ conf.getString("dfs.datanode.dns.nameserver", "default"));
+ } catch (UnknownHostException uhe) {
+ LOG.error(uhe);
+ hostName = "UNKNOWN.example.com";
+ }
+ }
+
+ // load the gannglia servers from properties
+ metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
+ DEFAULT_PORT);
+
+ // extract the Ganglia conf per metrics
+ gangliaConfMap = new HashMap<String, GangliaConf>();
+ loadGangliaConf(GangliaConfType.units);
+ loadGangliaConf(GangliaConfType.tmax);
+ loadGangliaConf(GangliaConfType.dmax);
+ loadGangliaConf(GangliaConfType.slope);
+
+ try {
+ datagramSocket = new DatagramSocket();
+ } catch (SocketException se) {
+ LOG.error(se);
+ }
+
+ // see if sparseMetrics is supported. Default is false
+ supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
+ SUPPORT_SPARSE_METRICS_DEFAULT);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.metrics2.MetricsSink#flush()
+ */
+ public void flush() {
+ // nothing to do as we are not buffering data
+ }
+
+ /**
+ * Load the configurations for a conf type
+ *
+ * @param gtype Only load metrics for given type
+ */
+ private void loadGangliaConf(GangliaConfType gtype) {
+ String propertyarr[] = conf.getStringArray(gtype.name());
+ if (propertyarr != null && propertyarr.length > 0) {
+ for (String metricNValue : propertyarr) {
+ String metricNValueArr[] = metricNValue.split(EQUAL);
+ if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
+ LOG.error("Invalid propertylist for " + gtype.name());
+ }
+
+ String metricName = metricNValueArr[0].trim();
+ String metricValue = metricNValueArr[1].trim();
+ GangliaConf gconf = gangliaConfMap.get(metricName);
+ if (gconf == null) {
+ gconf = new GangliaConf();
+ gangliaConfMap.put(metricName, gconf);
+ }
+
+ switch (gtype) {
+ case units:
+ gconf.setUnits(metricValue);
+ break;
+ case dmax:
+ gconf.setDmax(Integer.parseInt(metricValue));
+ break;
+ case tmax:
+ gconf.setTmax(Integer.parseInt(metricValue));
+ break;
+ case slope:
+ gconf.setSlope(GangliaSlope.valueOf(metricValue));
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Lookup GangliaConf from cache. If not found, return default values
+ *
+ * @param metricName
+ * @return looked up GangliaConf
+ */
+ protected GangliaConf getGangliaConfForMetric(String metricName) {
+ GangliaConf gconf = gangliaConfMap.get(metricName);
+
+ return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
+ }
+
+ /**
+ * @return the hostName
+ */
+ protected String getHostName() {
+ return hostName;
+ }
+
+ /**
+ * Puts a string into the buffer by first writing the size of the string as an
+ * int, followed by the bytes of the string, padded if necessary to a multiple
+ * of 4.
+ * @param s the string to be written to buffer at offset location
+ */
+ protected void xdr_string(String s) {
+ byte[] bytes = s.getBytes();
+ int len = bytes.length;
+ xdr_int(len);
+ System.arraycopy(bytes, 0, buffer, offset, len);
+ offset += len;
+ pad();
+ }
+
+ /**
+ * Pads the buffer with zero bytes up to the nearest multiple of 4.
+ */
+ private void pad() {
+ int newOffset = ((offset + 3) / 4) * 4;
+ while (offset < newOffset) {
+ buffer[offset++] = 0;
+ }
+ }
+
+ /**
+ * Puts an integer into the buffer as 4 bytes, big-endian.
+ */
+ protected void xdr_int(int i) {
+ buffer[offset++] = (byte) ((i >> 24) & 0xff);
+ buffer[offset++] = (byte) ((i >> 16) & 0xff);
+ buffer[offset++] = (byte) ((i >> 8) & 0xff);
+ buffer[offset++] = (byte) (i & 0xff);
+ }
+
+ /**
+ * Sends Ganglia Metrics to the configured hosts
+ * @throws IOException
+ */
+ protected void emitToGangliaHosts() throws IOException {
+ try {
+ for (SocketAddress socketAddress : metricsServers) {
+ DatagramPacket packet =
+ new DatagramPacket(buffer, offset, socketAddress);
+ datagramSocket.send(packet);
+ }
+ } finally {
+ // reset the buffer for the next metric to be built
+ offset = 0;
+ }
+ }
+
+ /**
+ * @return whether sparse metrics are supported
+ */
+ protected boolean isSupportSparseMetrics() {
+ return supportSparseMetrics;
+ }
+
+ /**
+ * Used only by unit test
+ * @param datagramSocket the datagramSocket to set.
+ */
+ void setDatagramSocket(DatagramSocket datagramSocket) {
+ this.datagramSocket = datagramSocket;
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java?rev=1145523&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
(added)
+++ hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaConf.java
Tue Jul 12 10:22:31 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * class which is used to store ganglia properties
+ */
+class GangliaConf {
+ private String units = AbstractGangliaSink.DEFAULT_UNITS;
+ private GangliaSlope slope;
+ private int dmax = AbstractGangliaSink.DEFAULT_DMAX;
+ private int tmax = AbstractGangliaSink.DEFAULT_TMAX;
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("unit=").append(units).append(", slope=").append(slope)
+ .append(", dmax=").append(dmax).append(", tmax=").append(tmax);
+ return buf.toString();
+ }
+
+ /**
+ * @return the units
+ */
+ String getUnits() {
+ return units;
+ }
+
+ /**
+ * @param units the units to set
+ */
+ void setUnits(String units) {
+ this.units = units;
+ }
+
+ /**
+ * @return the slope
+ */
+ GangliaSlope getSlope() {
+ return slope;
+ }
+
+ /**
+ * @param slope the slope to set
+ */
+ void setSlope(GangliaSlope slope) {
+ this.slope = slope;
+ }
+
+ /**
+ * @return the dmax
+ */
+ int getDmax() {
+ return dmax;
+ }
+
+ /**
+ * @param dmax the dmax to set
+ */
+ void setDmax(int dmax) {
+ this.dmax = dmax;
+ }
+
+ /**
+ * @return the tmax
+ */
+ int getTmax() {
+ return tmax;
+ }
+
+ /**
+ * @param tmax the tmax to set
+ */
+ void setTmax(int tmax) {
+ this.tmax = tmax;
+ }
+}
\ No newline at end of file
Added: hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java?rev=1145523&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
(added)
+++ hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricVisitor.java
Tue Jul 12 10:22:31 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.hadoop.metrics2.MetricCounter;
+import org.apache.hadoop.metrics2.MetricGauge;
+import org.apache.hadoop.metrics2.MetricsVisitor;
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink.GangliaSlope;
+
+/**
+ * Since implementations of Metric are not public, hence use a visitor to
+ * figure out the type and slope of the metric. Counters have "positive"
+ * slope.
+ */
+class GangliaMetricVisitor implements MetricsVisitor {
+ private static final String INT32 = "int32";
+ private static final String FLOAT = "float";
+ private static final String DOUBLE = "double";
+
+ private String type;
+ private GangliaSlope slope;
+
+ /**
+ * @return the type of a visited metric
+ */
+ String getType() {
+ return type;
+ }
+
+ /**
+ * @return the slope of a visited metric. Slope is positive for counters
+ * and null for others
+ */
+ GangliaSlope getSlope() {
+ return slope;
+ }
+
+ @Override
+ public void gauge(MetricGauge<Integer> metric, int value) {
+ // MetricGaugeInt.class ==> "int32"
+ type = INT32;
+ slope = null; // set to null as cannot figure out from Metric
+ }
+
+ @Override
+ public void counter(MetricCounter<Integer> metric, int value) {
+ // MetricCounterInt.class ==> "int32"
+ type = INT32;
+
+ // counters have positive slope
+ slope = GangliaSlope.positive;
+ }
+
+ @Override
+ public void gauge(MetricGauge<Long> metric, long value) {
+ // MetricGaugeLong.class ==> "float"
+ type = FLOAT;
+ slope = null; // set to null as cannot figure out from Metric
+ }
+
+ @Override
+ public void counter(MetricCounter<Long> metric, long value) {
+ // MetricCounterLong.class ==> "float"
+ type = FLOAT;
+
+ // counters have positive slope
+ slope = GangliaSlope.positive;
+ }
+
+ @Override
+ public void gauge(MetricGauge<Float> metric, float value) {
+ // MetricGaugeFloat.class ==> "float"
+ type = FLOAT;
+ slope = null; // set to null as cannot figure out from Metric
+ }
+
+ @Override
+ public void gauge(MetricGauge<Double> metric, double value) {
+ // MetricGaugeDouble.class ==> "double"
+ type = DOUBLE;
+ slope = null; // set to null as cannot figure out from Metric
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java?rev=1145523&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
(added)
+++ hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink30.java
Tue Jul 12 10:22:31 2011
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.Metric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.util.MetricsCache;
+import org.apache.hadoop.metrics2.util.MetricsCache.Record;
+
+/**
+ * This code supports Ganglia 3.0
+ *
+ */
+public class GangliaSink30 extends AbstractGangliaSink {
+
+ public final Log LOG = LogFactory.getLog(this.getClass());
+
+ protected MetricsCache metricsCache = new MetricsCache();
+
+ /*
+ *
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.metrics2.MetricsSink#putMetrics(org.apache.hadoop.metrics2
+ * .MetricsRecord)
+ */
+ @Override
+ public void putMetrics(MetricsRecord record) {
+ // The method handles both cases whether Ganglia support dense publish of
+ // metrics of sparse (only on change) publish of metrics
+ try {
+ String recordName = record.name();
+ String contextName = record.context();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(contextName);
+ sb.append('.');
+ sb.append(recordName);
+
+ String groupName = sb.toString();
+ sb.append('.');
+ int sbBaseLen = sb.length();
+
+ String type = null;
+ GangliaSlope slopeFromMetric = null;
+ GangliaSlope calculatedSlope = null;
+ Record cachedMetrics = null;
+ if (!isSupportSparseMetrics()) {
+ // for sending dense metrics, update metrics cache
+ // and get the updated data
+ cachedMetrics = metricsCache.update(record);
+
+ if (cachedMetrics != null && cachedMetrics.metricsEntrySet() != null) {
+ for (Map.Entry<String, Metric> entry : cachedMetrics.metricsEntrySet()) {
+ Metric metric = entry.getValue();
+ sb.append(metric.name());
+ String name = sb.toString();
+
+ // visit the metric to identify the Ganglia type and slope
+ metric.visit(gangliaMetricVisitor);
+ type = gangliaMetricVisitor.getType();
+ slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+
+ GangliaConf gConf = getGangliaConfForMetric(name);
+ calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+ // send metric to Ganglia
+ emitMetric(groupName, name, type, metric.value().toString(),
+ gConf, calculatedSlope);
+
+ // reset the length of the buffer for next iteration
+ sb.setLength(sbBaseLen);
+ }
+ }
+ } else {
+ // we support sparse updates
+
+ Collection<Metric> metrics = (Collection<Metric>) record.metrics();
+ if (metrics.size() > 0) {
+ // we got metrics. so send the latest
+ for (Metric metric : record.metrics()) {
+ sb.append(metric.name());
+ String name = sb.toString();
+
+ // visit the metric to identify the Ganglia type and slope
+ metric.visit(gangliaMetricVisitor);
+ type = gangliaMetricVisitor.getType();
+ slopeFromMetric = gangliaMetricVisitor.getSlope();
+
+
+ GangliaConf gConf = getGangliaConfForMetric(name);
+ calculatedSlope = calculateSlope(gConf, slopeFromMetric);
+
+ // send metric to Ganglia
+ emitMetric(groupName, name, type, metric.value().toString(),
+ gConf, calculatedSlope);
+
+ // reset the length of the buffer for next iteration
+ sb.setLength(sbBaseLen);
+ }
+ }
+ }
+ } catch (IOException io) {
+ throw new MetricsException("Failed to putMetrics", io);
+ }
+ }
+
+
+ /**
+ * Calculate the slope from properties and metric
+ *
+ * @param gConf Pass
+ * @param slopeFromMetric
+ * @return
+ */
+ private GangliaSlope calculateSlope(GangliaConf gConf, GangliaSlope slopeFromMetric) {
+ if (gConf.getSlope() != null) {
+ // if slope has been specified in properties, use that
+ return gConf.getSlope();
+ } else if (slopeFromMetric != null) {
+ // slope not specified in properties, use derived from Metric
+ return slopeFromMetric;
+ } else {
+ return DEFAULT_SLOPE;
+ }
+ }
+
+ /**
+ * The method sends metrics to Ganglia servers. The method has been taken from
+ * org.apache.hadoop.metrics.ganglia.GangliaContext30 with minimal changes in
+ * order to keep it in sync.
+
+ * @param groupName The group name of the metric
+ * @param name The metric name
+ * @param type The type of the metric
+ * @param value The value of the metric
+ * @param gConf The GangliaConf for this metric
+ * @param gSlope The slope for this metric
+ * @throws IOException
+ */
+ protected void emitMetric(String groupName, String name, String type,
+ String value, GangliaConf gConf, GangliaSlope gSlope)
+ throws IOException {
+
+ if (name == null) {
+ LOG.warn("Metric was emitted with no name.");
+ return;
+ } else if (value == null) {
+ LOG.warn("Metric name " + name + " was emitted with a null value.");
+ return;
+ } else if (type == null) {
+ LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Emitting metric " + name + ", type " + type + ", value " + value
+ + ", slope " + gSlope.name()+ " from hostname " + getHostName());
+ }
+
+ xdr_int(0); // metric_user_defined
+ xdr_string(type);
+ xdr_string(name);
+ xdr_string(value);
+ xdr_string(gConf.getUnits());
+ xdr_int(gSlope.ordinal());
+ xdr_int(gConf.getTmax());
+ xdr_int(gConf.getDmax());
+
+ // send the metric to Ganglia hosts
+ emitToGangliaHosts();
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java?rev=1145523&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
(added)
+++ hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/sink/ganglia/GangliaSink31.java
Tue Jul 12 10:22:31 2011
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This code supports Ganglia 3.1
+ *
+ */
+public class GangliaSink31 extends GangliaSink30 {
+
+ public final Log LOG = LogFactory.getLog(this.getClass());
+
+ /**
+ * The method sends metrics to Ganglia servers. The method has been taken from
+ * org.apache.hadoop.metrics.ganglia.GangliaContext31 with minimal changes in
+ * order to keep it in sync.
+
+ * @param groupName The group name of the metric
+ * @param name The metric name
+ * @param type The type of the metric
+ * @param value The value of the metric
+ * @param gConf The GangliaConf for this metric
+ * @param gSlope The slope for this metric
+ * @throws IOException
+ */
+ protected void emitMetric(String groupName, String name, String type,
+ String value, GangliaConf gConf, GangliaSlope gSlope)
+ throws IOException {
+
+ if (name == null) {
+ LOG.warn("Metric was emitted with no name.");
+ return;
+ } else if (value == null) {
+ LOG.warn("Metric name " + name +" was emitted with a null value.");
+ return;
+ } else if (type == null) {
+ LOG.warn("Metric name " + name + ", value " + value + " has no type.");
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Emitting metric " + name + ", type " + type + ", value " + value
+ + ", slope " + gSlope.name()+ " from hostname " + getHostName());
+ }
+
+ // The following XDR recipe was done through a careful reading of
+ // gm_protocol.x in Ganglia 3.1 and carefully examining the output of
+ // the gmetric utility with strace.
+
+ // First we send out a metadata message
+ xdr_int(128); // metric_id = metadata_msg
+ xdr_string(getHostName()); // hostname
+ xdr_string(name); // metric name
+ xdr_int(0); // spoof = False
+ xdr_string(type); // metric type
+ xdr_string(name); // metric name
+ xdr_string(gConf.getUnits()); // units
+ xdr_int(gSlope.ordinal()); // slope
+ xdr_int(gConf.getTmax()); // tmax, the maximum time between metrics
+ xdr_int(gConf.getDmax()); // dmax, the maximum data value
+ xdr_int(1); /*Num of the entries in extra_value field for
+ Ganglia 3.1.x*/
+ xdr_string("GROUP"); /*Group attribute*/
+ xdr_string(groupName); /*Group value*/
+
+ // send the metric to Ganglia hosts
+ emitToGangliaHosts();
+
+ // Now we send out a message with the actual value.
+ // Technically, we only need to send out the metadata message once for
+ // each metric, but I don't want to have to record which metrics we did and
+ // did not send.
+ xdr_int(133); // we are sending a string value
+ xdr_string(getHostName()); // hostName
+ xdr_string(name); // metric name
+ xdr_int(0); // spoof = False
+ xdr_string("%s"); // format field
+ xdr_string(value); // metric value
+
+ // send the metric to Ganglia hosts
+ emitToGangliaHosts();
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
(original)
+++ hadoop/common/branches/branch-0.20-security-204/src/core/org/apache/hadoop/metrics2/util/MetricsCache.java
Tue Jul 12 10:22:31 2011
@@ -45,7 +45,7 @@ public class MetricsCache {
*/
public static class Record {
final Map<String, String> tags = new LinkedHashMap<String, String>();
- final Map<String, Number> metrics = new LinkedHashMap<String, Number>();
+ final Map<String, Metric> metrics = new LinkedHashMap<String, Metric>();
/**
* Get the tag value
@@ -62,6 +62,16 @@ public class MetricsCache {
* @return the metric value
*/
public Number getMetric(String key) {
+ Metric metric = metrics.get(key);
+ return metric != null ? metric.value() : null;
+ }
+
+ /**
+ * Get the metric value
+ * @param key name of the metric
+ * @return the metric value
+ */
+ public Metric getMetricInstance(String key) {
return metrics.get(key);
}
@@ -69,6 +79,18 @@ public class MetricsCache {
* @return entry set of metrics
*/
public Set<Map.Entry<String, Number>> metrics() {
+ Map<String, Number> map =
+ new LinkedHashMap<String,Number>(metrics.size());
+ for (Map.Entry<String, Metric> mapEntry : metrics.entrySet()) {
+ map.put(mapEntry.getKey(), mapEntry.getValue().value());
+ }
+ return map.entrySet();
+ }
+
+ /**
+ * @return entry set of metrics
+ */
+ public Set<Map.Entry<String, Metric>> metricsEntrySet() {
return metrics.entrySet();
}
}
@@ -93,7 +115,7 @@ public class MetricsCache {
recMap.put(tags, rec);
}
for (Metric m : mr.metrics()) {
- rec.metrics.put(m.name(), m.value());
+ rec.metrics.put(m.name(), m);
}
if (includingTags) {
// mostly for some sinks that include tags as part of a dense schema
@@ -119,5 +141,4 @@ public class MetricsCache {
if (tmap == null) return null;
return tmap.get(tags);
}
-
}
Added: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java?rev=1145523&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
(added)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestGangliaMetrics.java
Tue Jul 12 10:22:31 2011
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.lib.AbstractMetricsSource;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MetricMutableStat;
+import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31;
+import org.apache.hadoop.metrics2.sink.ganglia.GangliaMetricsTestHelper;
+import org.junit.Test;
+
+public class TestGangliaMetrics {
+ public static final Log LOG = LogFactory.getLog(TestMetricsSystemImpl.class);
+ private final String[] expectedMetrics =
+ {"test.s1rec.c1",
+ "test.s1rec.g1",
+ "test.s1rec.s1_num_ops",
+ "test.s1rec.s1_avg_time"};
+
+ @Test public void testGangliaMetrics2() throws Exception {
+ ConfigBuilder cb = new ConfigBuilder().add("default.period", 10)
+ .add("test.sink.gsink30.context", "test") // filter out only "test"
+ .add("test.sink.gsink31.context", "test") // filter out only "test"
+ .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
+
+ MetricsSystemImpl ms = new MetricsSystemImpl("Test");
+ ms.start();
+ TestSource s1 = ms.register("s1", "s1 desc", new TestSource("s1rec"));
+ s1.s1.add(0);
+
+ AbstractGangliaSink gsink30 = new GangliaSink30();
+ gsink30.init(cb.subset("test"));
+ MockDatagramSocket mockds30 = new MockDatagramSocket();
+ GangliaMetricsTestHelper.setDatagramSocket(gsink30, mockds30);
+
+ AbstractGangliaSink gsink31 = new GangliaSink31();
+ gsink31.init(cb.subset("test"));
+ MockDatagramSocket mockds31 = new MockDatagramSocket();
+ GangliaMetricsTestHelper.setDatagramSocket(gsink31, mockds31);
+
+ ms.register("gsink30", "gsink30 desc", gsink30);
+ ms.register("gsink31", "gsink31 desc", gsink31);
+ ms.onTimerEvent(); // trigger something interesting
+ ms.stop();
+
+ // check GanfliaSink30 data
+ checkMetrics(mockds30.getCapturedSend());
+
+ // check GanfliaSink31 data
+ checkMetrics(mockds31.getCapturedSend());
+ }
+
+
+ private void checkMetrics(List<byte[]> bytearrlist) {
+ boolean[] foundMetrics = new boolean[expectedMetrics.length];
+ for (byte[] bytes : bytearrlist) {
+ String binaryStr = new String(bytes);
+ for (int index = 0; index < expectedMetrics.length; index++) {
+ if (binaryStr.indexOf(expectedMetrics[index]) >= 0) {
+ foundMetrics[index] = true;
+ break;
+ }
+ }
+ }
+
+ for (int index = 0; index < foundMetrics.length; index++) {
+ if (!foundMetrics[index]) {
+ assertTrue("Missing metrics: " + expectedMetrics[index], false);
+ }
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private static class TestSource extends AbstractMetricsSource {
+ final MetricMutableCounterLong c1;
+ final MetricMutableGaugeLong g1;
+ final MetricMutableStat s1;
+
+ TestSource(String name) {
+ super(name);
+ registry.setContext("test");
+ c1 = registry.newCounter("c1", "c1 desc", 1L);
+ g1 = registry.newGauge("g1", "g1 desc", 2L);
+ s1 = registry.newStat("s1", "s1 desc", "ops", "time");
+ }
+ }
+
+ /**
+ * This class is used to capture data send to Ganglia servers.
+ *
+ * Initial attempt was to use mockito to mock and capture but
+ * while testing figured out that mockito is keeping the reference
+ * to the byte array and since the sink code reuses the byte array
+ * hence all the captured byte arrays were pointing to one instance.
+ */
+ private static class MockDatagramSocket extends DatagramSocket {
+ private ArrayList<byte[]> capture;
+
+ public MockDatagramSocket() throws SocketException {
+ capture = new ArrayList<byte[]>();
+ }
+ /* (non-Javadoc)
+ * @see java.net.DatagramSocket#send(java.net.DatagramPacket)
+ */
+ @Override
+ public void send(DatagramPacket p) throws IOException {
+ byte[] bytes = new byte[p.getLength()];
+ System.arraycopy(p.getData(), p.getOffset(), bytes, 0, p.getLength());
+ capture.add(bytes);
+ }
+
+ ArrayList<byte[]> getCapturedSend() {
+ return capture;
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java?rev=1145523&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
(added)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/sink/ganglia/GangliaMetricsTestHelper.java
Tue Jul 12 10:22:31 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import java.net.DatagramSocket;
+
+/**
+ * Helper class in the same package as ganglia sinks to be used by unit tests
+ */
+public class GangliaMetricsTestHelper {
+
+ /**
+ * Helper method to access package private method to set DatagramSocket
+ * needed for Unit test
+ * @param gangliaSink
+ * @param datagramSocket
+ */
+ public static void setDatagramSocket(AbstractGangliaSink gangliaSink,
+ DatagramSocket datagramSocket) {
+
+ gangliaSink.setDatagramSocket(datagramSocket);
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java?rev=1145523&r1=1145522&r2=1145523&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java
(original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/util/TestMetricsCache.java
Tue Jul 12 10:22:31 2011
@@ -47,6 +47,9 @@ public class TestMetricsCache {
assertEquals("same record size", cr.metrics.size(),
((Collection<Metric>)mr.metrics()).size());
assertEquals("same metric value", 0, cr.getMetric("m"));
+ assertNotNull("metric not null", cr.getMetricInstance("m"));
+ assertEquals("new metric value", 0, cr.getMetricInstance("m").value());
+
MetricsRecord mr2 = makeRecord("r",
Arrays.asList(makeTag("t", "tv")),
@@ -54,8 +57,16 @@ public class TestMetricsCache {
cr = cache.update(mr2);
assertEquals("contains 3 metric", 3, cr.metrics.size());
assertEquals("updated metric value", 2, cr.getMetric("m"));
+ assertNotNull("metric not null", cr.getMetricInstance("m"));
+ assertEquals("new metric value", 2, cr.getMetricInstance("m").value());
+
assertEquals("old metric value", 1, cr.getMetric("m1"));
+ assertNotNull("metric not null", cr.getMetricInstance("m1"));
+ assertEquals("new metric value", 1, cr.getMetricInstance("m1").value());
+
assertEquals("new metric value", 42, cr.getMetric("m2"));
+ assertNotNull("metric not null", cr.getMetricInstance("m2"));
+ assertEquals("new metric value", 42, cr.getMetricInstance("m2").value());
MetricsRecord mr3 = makeRecord("r",
Arrays.asList(makeTag("t", "tv3")), // different tag value
@@ -63,6 +74,9 @@ public class TestMetricsCache {
cr = cache.update(mr3); // should get a new record
assertEquals("contains 1 metric", 1, cr.metrics.size());
assertEquals("updated metric value", 3, cr.getMetric("m3"));
+ assertNotNull("metric not null", cr.getMetricInstance("m3"));
+ assertEquals("new metric value", 3, cr.getMetricInstance("m3").value());
+
// tags cache should be empty so far
assertEquals("no tags", 0, cr.tags.size());
// until now
@@ -70,6 +84,8 @@ public class TestMetricsCache {
assertEquals("Got 1 tag", 1, cr.tags.size());
assertEquals("Tag value", "tv3", cr.getTag("t"));
assertEquals("Metric value", 3, cr.getMetric("m3"));
+ assertNotNull("metric not null", cr.getMetricInstance("m3"));
+ assertEquals("new metric value", 3, cr.getMetricInstance("m3").value());
}
@Test public void testGet() {
@@ -85,6 +101,8 @@ public class TestMetricsCache {
assertNotNull("Got record", cr);
assertEquals("contains 1 metric", 1, cr.metrics.size());
assertEquals("new metric value", 1, cr.getMetric("m"));
+ assertNotNull("metric not null", cr.getMetricInstance("m"));
+ assertEquals("new metric value", 1, cr.getMetricInstance("m").value());
}
private MetricsRecord makeRecord(String name, Collection<MetricsTag> tags,
|