From commits-return-23467-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Tue Oct 8 18:58:50 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 53CD2180654 for ; Tue, 8 Oct 2019 20:58:50 +0200 (CEST) Received: (qmail 64732 invoked by uid 500); 8 Oct 2019 18:58:49 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 64723 invoked by uid 99); 8 Oct 2019 18:58:49 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Oct 2019 18:58:49 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8B6F1890A2; Tue, 8 Oct 2019 18:58:49 +0000 (UTC) Date: Tue, 08 Oct 2019 18:58:49 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] branch master updated: Update gc metrics reporting to use hadoop metrics2 (#1381) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157056112945.1602.13113352057414382471@gitbox.apache.org> From: edcoleman@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 1fd427a90923de36e4d351d2fd81dd2ec2c3a136 X-Git-Newrev: 8f867bd642c7558f4183de22bea9945d9e6a4d09 X-Git-Rev: 8f867bd642c7558f4183de22bea9945d9e6a4d09 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
edcoleman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/master by this push: new 8f867bd Update gc metrics reporting to use hadoop metrics2 (#1381) 8f867bd is described below commit 8f867bd642c7558f4183de22bea9945d9e6a4d09 Author: EdColeman AuthorDate: Tue Oct 8 14:58:44 2019 -0400 Update gc metrics reporting to use hadoop metrics2 (#1381) * Update gc metrics reporting to use hadoop metrics2 - Publish current gc metrics to hadoop 2 metrics reporting system. - Add gc run counter that increments on each gc cycle run - Add metric to track time required for gc post op (compact, flush, none) * address pull request comments * address review comments --- .../org/apache/accumulo/core/conf/Property.java | 7 +- .../apache/accumulo/server/metrics/Metrics.java | 14 ++ .../apache/accumulo/gc/SimpleGarbageCollector.java | 21 ++ .../apache/accumulo/gc/metrics/GcCycleMetrics.java | 121 ++++++++++ .../org/apache/accumulo/gc/metrics/GcMetrics.java | 128 +++++++++++ .../accumulo/gc/metrics/GcMetricsFactory.java | 60 +++++ .../accumulo/test/functional/GcMetricsIT.java | 220 ++++++++++++++++++ .../accumulo/test/metrics/MetricsFileTailer.java | 255 +++++++++++++++++++++ .../test/metrics/MetricsTestSinkProperties.java | 28 +++ .../test/metrics/MetricsFileTailerTest.java | 106 +++++++++ .../resources/hadoop-metrics2-accumulo.properties | 11 +- 11 files changed, 961 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 7be7355..8243134 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -260,7 +260,7 @@ public enum Property { MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation", "org.apache.accumulo.server.master.recovery.HadoopLogCloser", 
PropertyType.CLASSNAME, "A class that implements a mechanism to steal write access to a write-ahead log"), - MASTER_FATE_METRICS_ENABLED("master.fate.metrics.enabled", "false", PropertyType.BOOLEAN, + MASTER_FATE_METRICS_ENABLED("master.fate.metrics.enabled", "true", PropertyType.BOOLEAN, "Enable reporting of FATE metrics in JMX (and logging with Hadoop Metrics2"), MASTER_FATE_METRICS_MIN_UPDATE_INTERVAL("master.fate.metrics.min.update.interval", "60s", PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper to update interval"), @@ -529,8 +529,9 @@ public enum Property { "When the gc runs it can make a lot of changes to the metadata, on completion, " + " to force the changes to be written to disk, the metadata and root tables can be flushed" + " and possibly compacted. Legal values are: compact - which both flushes and compacts the" - + " metadata; flush - which flushes only (compactions may be triggered if required); or none." - + " Since 2.0, the default is flush. Previously the default action was a full compaction."), + + " metadata; flush - which flushes only (compactions may be triggered if required); or none"), + GC_METRICS_ENABLED("gc.metrics.enabled", "true", PropertyType.BOOLEAN, + "Enable detailed gc metrics reporting with hadoop metrics."), // properties that are specific to the monitor server behavior MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java index eccaaf5..c1ed9ed 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java @@ -29,6 +29,20 @@ import org.apache.hadoop.metrics2.source.JvmMetricsInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Accumulo will search for a file named hadoop-metrics-accumulo.properties on the Accumulo + * 
classpath to configure the hadoop metrics2 system. The hadoop metrics system publishes to jmx and + * can be configured, via a configuration file, to publish to other metric collection systems + * (files,...) + *

+ * A note on naming: The naming for jmx vs the hadoop metrics system is slightly different. Hadoop + * metrics records will start with CONTEXT.RECORD, for example, accgc.AccGcCycleMetrics. The context + * parameter value is also used by the configuration file for sink configuration. + *

+ * In JMX, the hierarchy is: Hadoop..Accumulo..[jmxName]..[processName]..attributes..[name] + *

+ * For jvm metrics, the hierarchy is Hadoop..Accumulo..JvmMetrics..attributes..[name] + */ public abstract class Metrics implements MetricsSource { private static String processName = "Unknown"; diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 62d9f62..16d4a78 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -71,6 +71,8 @@ import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; +import org.apache.accumulo.gc.metrics.GcCycleMetrics; +import org.apache.accumulo.gc.metrics.GcMetricsFactory; import org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.ServerConstants; @@ -117,6 +119,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { private GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats()); + private GcCycleMetrics gcCycleMetrics = new GcCycleMetrics(); + public static void main(String[] args) throws Exception { try (SimpleGarbageCollector gc = new SimpleGarbageCollector(new ServerOpts(), args)) { gc.runServer(); @@ -128,6 +132,14 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { final AccumuloConfiguration conf = getConfiguration(); + boolean gcMetricsRegistered = new GcMetricsFactory(conf).register(this); + + if (gcMetricsRegistered) { + log.info("gc metrics modules registered with metrics system"); + } else { + log.info("Failed to register gc metrics module"); + } + final long gcDelay = 
conf.getTimeInMillis(Property.GC_CYCLE_DELAY); final String useFullCompaction = conf.get(Property.GC_USE_FULL_COMPACTION); @@ -506,6 +518,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { status.current.finished = System.currentTimeMillis(); status.last = status.current; + gcCycleMetrics.setLastCollect(status.current); status.current = new GcCycleStats(); } catch (Exception e) { @@ -534,6 +547,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet, isUsingTrash()); log.info("Beginning garbage collection of write-ahead logs"); walogCollector.collect(status); + gcCycleMetrics.setLastWalCollect(status.lastLog); } catch (Exception e) { log.error("{}", e.getMessage(), e); } @@ -563,6 +577,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { final long actionComplete = System.nanoTime(); + gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); + log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); @@ -571,6 +587,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { } } try { + gcCycleMetrics.incrementRunCycleCount(); long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); log.debug("Sleeping for {} milliseconds", gcDelay); Thread.sleep(gcDelay); @@ -696,4 +713,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { public GCStatus getStatus(TInfo info, TCredentials credentials) { return status; } + + public GcCycleMetrics getGcCycleMetrics() { + return gcCycleMetrics; + } } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java new file mode 100644 index 0000000..4caf8c3 --- /dev/null +++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcCycleMetrics.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.gc.metrics; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.gc.thrift.GcCycleStats; + +/** + * Wrapper class for GcCycleStats so that underlying thrift code in GcCycleStats is not modified. + * Provides Thread safe access to the gc cycle stats for metrics reporting. + */ +public class GcCycleMetrics { + + private AtomicReference lastCollect = new AtomicReference<>(new GcCycleStats()); + private AtomicReference lastWalCollect = new AtomicReference<>(new GcCycleStats()); + + private AtomicLong postOpDurationNanos = new AtomicLong(0); + private AtomicLong runCycleCount = new AtomicLong(0); + + public GcCycleMetrics() {} + + /** + * Get the last gc run statistics. + * + * @return the statistics for the last gc run. + */ + GcCycleStats getLastCollect() { + return lastCollect.get(); + } + + /** + * Set the last gc run statistics. Makes a defensive deep copy so that if the gc implementation + * modifies the values. 
+ * + * @param lastCollect + * the last gc run statistics. + */ + public void setLastCollect(final GcCycleStats lastCollect) { + this.lastCollect.set(new GcCycleStats(lastCollect)); + } + + /** + * The statistics from the last wal collection. + * + * @return the last wal collection statistics. + */ + GcCycleStats getLastWalCollect() { + return lastWalCollect.get(); + } + + /** + * Set the lost wal collection statistics + * + * @param lastWalCollect + * last wal statistics + */ + public void setLastWalCollect(final GcCycleStats lastWalCollect) { + this.lastWalCollect.set(new GcCycleStats(lastWalCollect)); + } + + /** + * Duration of post operation (compact, flush, none) in nanoseconds. + * + * @return duration in nanoseconds. + */ + long getPostOpDurationNanos() { + return postOpDurationNanos.get(); + } + + /** + * Set the duration of post operation (compact, flush, none) in nanoseconds. + * + * @param postOpDurationNanos + * the duration, in nanoseconds. + */ + public void setPostOpDurationNanos(long postOpDurationNanos) { + this.postOpDurationNanos.set(postOpDurationNanos); + } + + /** + * The number of gc cycles that have completed since initialization at process start. + * + * @return current run cycle count. + */ + long getRunCycleCount() { + return runCycleCount.get(); + } + + /** + * Increment the gc run cycle count by one. 
+ */ + public void incrementRunCycleCount() { + this.runCycleCount.incrementAndGet(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("GcMetricsValues{"); + sb.append("lastCollect=").append(lastCollect.get()); + sb.append(", lastWalCollect=").append(lastWalCollect.get()); + sb.append(", postOpDuration=").append(postOpDurationNanos.get()); + sb.append('}'); + return sb.toString(); + } +} diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java new file mode 100644 index 0000000..b1b9020 --- /dev/null +++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetrics.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.gc.metrics; + +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.gc.thrift.GcCycleStats; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +/** + * Expected to be instantiated with GcMetricsFactory. 
This will configure both jmx and the hadoop + * metrics systems. The naming convention, in hadoop metrics2, the records will appear as + * CONTEXT.RECORD (accgc.AccGcCycleMetrics). The value for context is also used by the configuration + * file for sink configuration. + */ +public class GcMetrics extends Metrics { + + // use common prefix, different that just gc, to prevent confusion with jvm gc metrics. + public static final String GC_METRIC_PREFIX = "AccGc"; + + private static final String jmxName = "GarbageCollector"; + private static final String description = "Accumulo garbage collection metrics"; + private static final String record = "AccGcCycleMetrics"; + + private final SimpleGarbageCollector gc; + + // metrics gauges / counters. + private final MutableGaugeLong gcStarted; + private final MutableGaugeLong gcFinished; + private final MutableGaugeLong gcCandidates; + private final MutableGaugeLong gcInUse; + private final MutableGaugeLong gcDeleted; + private final MutableGaugeLong gcErrors; + + private final MutableGaugeLong walStarted; + private final MutableGaugeLong walFinished; + private final MutableGaugeLong walCandidates; + private final MutableGaugeLong walInUse; + private final MutableGaugeLong walDeleted; + private final MutableGaugeLong walErrors; + + private final MutableGaugeLong postOpDuration; + private final MutableGaugeLong runCycleCount; + + GcMetrics(final SimpleGarbageCollector gc) { + super(jmxName + ",sub=" + gc.getClass().getSimpleName(), description, "accgc", record); + this.gc = gc; + + MetricsRegistry registry = super.getRegistry(); + + gcStarted = registry.newGauge(GC_METRIC_PREFIX + "Started", + "Timestamp GC file collection cycle started", 0L); + gcFinished = registry.newGauge(GC_METRIC_PREFIX + "Finished", + "Timestamp GC file collect cycle finished", 0L); + gcCandidates = registry.newGauge(GC_METRIC_PREFIX + "Candidates", + "Number of files that are candidates for deletion", 0L); + gcInUse = + 
registry.newGauge(GC_METRIC_PREFIX + "InUse", "Number of candidate files still in use", 0L); + gcDeleted = + registry.newGauge(GC_METRIC_PREFIX + "Deleted", "Number of candidate files deleted", 0L); + gcErrors = + registry.newGauge(GC_METRIC_PREFIX + "Errors", "Number of candidate deletion errors", 0L); + + walStarted = registry.newGauge(GC_METRIC_PREFIX + "WalStarted", + "Timestamp GC WAL collection started", 0L); + walFinished = registry.newGauge(GC_METRIC_PREFIX + "WalFinished", + "Timestamp GC WAL collection finished", 0L); + walCandidates = registry.newGauge(GC_METRIC_PREFIX + "WalCandidates", + "Number of files that are candidates for deletion", 0L); + walInUse = registry.newGauge(GC_METRIC_PREFIX + "WalInUse", + "Number of wal file candidates that are still in use", 0L); + walDeleted = registry.newGauge(GC_METRIC_PREFIX + "WalDeleted", + "Number of candidate wal files deleted", 0L); + walErrors = registry.newGauge(GC_METRIC_PREFIX + "WalErrors", + "Number candidate wal file deletion errors", 0L); + + postOpDuration = registry.newGauge(GC_METRIC_PREFIX + "PostOpDuration", + "GC metadata table post operation duration in milliseconds", 0L); + + runCycleCount = registry.newGauge(GC_METRIC_PREFIX + "RunCycleCount", + "gauge incremented each gc cycle run, rest on process start", 0L); + + } + + @Override + protected void prepareMetrics() { + + GcCycleMetrics values = gc.getGcCycleMetrics(); + + GcCycleStats lastFileCollect = values.getLastCollect(); + + gcStarted.set(lastFileCollect.getStarted()); + gcFinished.set(lastFileCollect.getFinished()); + gcCandidates.set(lastFileCollect.getCandidates()); + gcInUse.set(lastFileCollect.getInUse()); + gcDeleted.set(lastFileCollect.getDeleted()); + gcErrors.set(lastFileCollect.getErrors()); + + GcCycleStats lastWalCollect = values.getLastWalCollect(); + + walStarted.set(lastWalCollect.getStarted()); + walFinished.set(lastWalCollect.getFinished()); + walCandidates.set(lastWalCollect.getCandidates()); + 
walInUse.set(lastWalCollect.getInUse()); + walDeleted.set(lastWalCollect.getDeleted()); + walErrors.set(lastWalCollect.getErrors()); + + postOpDuration.set(TimeUnit.NANOSECONDS.toMillis(values.getPostOpDurationNanos())); + runCycleCount.set(values.getRunCycleCount()); + } +} diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetricsFactory.java b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetricsFactory.java new file mode 100644 index 0000000..15a8ad3 --- /dev/null +++ b/server/gc/src/main/java/org/apache/accumulo/gc/metrics/GcMetricsFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.gc.metrics; + +import static java.util.Objects.requireNonNull; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GcMetricsFactory { + + private final static Logger log = LoggerFactory.getLogger(GcMetricsFactory.class); + + private boolean enableMetrics; + + public GcMetricsFactory(AccumuloConfiguration conf) { + requireNonNull(conf, "AccumuloConfiguration must not be null"); + enableMetrics = conf.getBoolean(Property.GC_METRICS_ENABLED); + } + + public boolean register(SimpleGarbageCollector gc) { + + if (!enableMetrics) { + log.info("Accumulo gc metrics are disabled. To enable, set {} in configuration", + Property.GC_METRICS_ENABLED); + return false; + } + + try { + + MetricsSystem metricsSystem = gc.getMetricsSystem(); + + new GcMetrics(gc).register(metricsSystem); + + return true; + + } catch (Exception ex) { + log.error("Failed to register accumulo gc metrics", ex); + return false; + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java new file mode 100644 index 0000000..eab1956 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/GcMetricsIT.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.gc.metrics.GcMetrics; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.metrics.MetricsFileTailer; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Functional test that uses a hadoop metrics 2 file sink to read published metrics for + * verification. 
+ */ +public class GcMetricsIT extends AccumuloClusterHarness { + + private static final Logger log = LoggerFactory.getLogger(GcMetricsIT.class); + + private AccumuloClient accumuloClient; + + private static final int NUM_TAIL_ATTEMPTS = 20; + private static final long TAIL_DELAY = 5_000; + + private static final String[] EXPECTED_METRIC_KEYS = new String[] {"AccGcCandidates", + "AccGcDeleted", "AccGcErrors", "AccGcFinished", "AccGcInUse", "AccGcPostOpDuration", + "AccGcRunCycleCount", "AccGcStarted", "AccGcWalCandidates", "AccGcWalDeleted", + "AccGcWalErrors", "AccGcWalFinished", "AccGcWalInUse", "AccGcWalStarted"}; + + @Before + public void setup() { + accumuloClient = Accumulo.newClient().from(getClientProps()).build(); + } + + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + @Test + public void gcMetricsPublished() { + + log.trace("Client started, properties:{}", accumuloClient.properties()); + + MetricsFileTailer gcTail = new MetricsFileTailer("accumulo.sink.file-gc"); + Thread t1 = new Thread(gcTail); + t1.start(); + + try { + + long testStart = System.currentTimeMillis(); + + LineUpdate firstUpdate = waitForUpdate(-1, gcTail); + + Map firstSeenMap = parseLine(firstUpdate.getLine()); + + log.trace("L:{}", firstUpdate.getLine()); + log.trace("M:{}", firstSeenMap); + + assertTrue(lookForExpectedKeys(firstSeenMap)); + sanity(testStart, firstSeenMap); + + LineUpdate nextUpdate = waitForUpdate(firstUpdate.getLastUpdate(), gcTail); + + Map updateSeenMap = parseLine(nextUpdate.getLine()); + + log.debug("Line received:{}", nextUpdate.getLine()); + log.trace("Mapped values:{}", updateSeenMap); + + assertTrue(lookForExpectedKeys(updateSeenMap)); + sanity(testStart, updateSeenMap); + + validate(firstSeenMap, updateSeenMap); + + } catch (Exception ex) { + log.debug("reads", ex); + } + } + + /** + * Validate metrics for consistency withing a run cycle. + * + * @param values + * map of values from one run cycle. 
+ */ + private void sanity(final long testStart, final Map values) { + + long start = values.get("AccGcStarted"); + long finished = values.get("AccGcFinished"); + assertTrue(start >= testStart); + assertTrue(finished >= start); + + start = values.get("AccGcWalStarted"); + finished = values.get("AccGcWalFinished"); + assertTrue(start >= testStart); + assertTrue(finished >= start); + + } + + /** + * A series of sanity checks for the metrics between different update cycles, some values should + * be at least different, and some of the checks can include ordering. + * + * @param firstSeen + * map of first metric update + * @param nextSeen + * map of a later metric update. + */ + private void validate(Map firstSeen, Map nextSeen) { + assertTrue(nextSeen.get("AccGcStarted") > firstSeen.get("AccGcStarted")); + assertTrue(nextSeen.get("AccGcFinished") > firstSeen.get("AccGcWalStarted")); + assertTrue(nextSeen.get("AccGcRunCycleCount") > firstSeen.get("AccGcRunCycleCount")); + } + + /** + * The hadoop metrics file sink published records as a line with comma separated key=value pairs. + * This method parses the line and extracts the key, value pair from metrics that start with AccGc + * and returns them in a sort map. + * + * @param line + * a line from the metrics system file sink. 
+ * @return a map of the metrics that start with AccGc + */ + private Map parseLine(final String line) { + + if (line == null) { + return Collections.emptyMap(); + } + + Map m = new TreeMap<>(); + + String[] csvTokens = line.split(","); + + for (String token : csvTokens) { + token = token.trim(); + if (token.startsWith(GcMetrics.GC_METRIC_PREFIX)) { + String[] parts = token.split("="); + m.put(parts[0], Long.parseLong(parts[1])); + } + } + return m; + } + + private static class LineUpdate { + private final long lastUpdate; + private final String line; + + LineUpdate(long lastUpdate, String line) { + this.lastUpdate = lastUpdate; + this.line = line; + } + + long getLastUpdate() { + return lastUpdate; + } + + String getLine() { + return line; + } + } + + private LineUpdate waitForUpdate(final long prevUpdate, final MetricsFileTailer tail) { + + for (int count = 0; count < NUM_TAIL_ATTEMPTS; count++) { + + String line = tail.getLast(); + long currUpdate = tail.getLastUpdate(); + + if (line != null && (currUpdate != prevUpdate)) { + return new LineUpdate(tail.getLastUpdate(), line); + } + + try { + Thread.sleep(TAIL_DELAY); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(ex); + } + } + // not found - throw exception. 
+ throw new IllegalStateException( + String.format("File source update not received after %d tries in %d sec", NUM_TAIL_ATTEMPTS, + TimeUnit.MILLISECONDS.toSeconds(TAIL_DELAY * NUM_TAIL_ATTEMPTS))); + } + + private boolean lookForExpectedKeys(final Map received) { + + for (String e : EXPECTED_METRIC_KEYS) { + if (!received.containsKey(e)) { + return false; + } + } + + return true; + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java new file mode 100644 index 0000000..7236e06 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsFileTailer.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.metrics; + +import java.io.File; +import java.io.RandomAccessFile; +import java.net.URL; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.FileBasedConfiguration; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; +import org.apache.commons.configuration2.builder.fluent.Parameters; +import org.apache.commons.configuration2.ex.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class allows testing of the publishing to the hadoop metrics system by processing a file for + * metric records (written as a line.) The file should be configured using the hadoop metrics + * properties as a file based sink with the prefix that is provided on instantiation of the + * instance. + * + * This class will simulate tail-ing a file and is intended to be run in a separate thread. When the + * underlying file has data written, the vaule returned by getLastUpdate will change, and the last + * line can be retrieved with getLast(). 
+ */
+public class MetricsFileTailer implements Runnable, AutoCloseable {
+
+  private static final Logger log = LoggerFactory.getLogger(MetricsFileTailer.class);
+
+  // number of most recent lines kept in the ring buffer
+  private static final int BUFFER_SIZE = 4;
+
+  private final String metricsPrefix;
+
+  // guards lineCounter and lineBuffer, which are written by run() and read by getLast()
+  private final Lock lock = new ReentrantLock();
+  private final AtomicBoolean running = new AtomicBoolean(Boolean.TRUE);
+
+  private final AtomicLong lastUpdate = new AtomicLong(0);
+  private final long startTime = System.nanoTime();
+
+  private int lineCounter = 0;
+  private final String[] lineBuffer = new String[BUFFER_SIZE];
+
+  private final String metricsFilename;
+
+  /**
+   * Create an instance that will tail a metrics file. The filename / path is determined by the
+   * hadoop-metrics2-accumulo.properties sink configuration for the metrics prefix that is
+   * provided. If the sink configuration has no "filename" key, an empty filename is used.
+   *
+   * @param metricsPrefix
+   *          the prefix in the metrics configuration.
+   * @throws IllegalStateException
+   *           if the metrics properties file cannot be found or loaded.
+   */
+  public MetricsFileTailer(final String metricsPrefix) {
+
+    this.metricsPrefix = metricsPrefix;
+
+    Configuration sub = loadMetricsConfig();
+
+    // dump the configuration keys received.
+    if (log.isTraceEnabled()) {
+      Iterator<String> keys = sub.getKeys();
+      while (keys.hasNext()) {
+        log.trace("configuration key:{}", keys.next());
+      }
+    }
+
+    if (sub.containsKey("filename")) {
+      metricsFilename = sub.getString("filename");
+    } else {
+      metricsFilename = "";
+    }
+
+  }
+
+  /**
+   * Create an instance by specifying a file directly instead of using the metrics configuration -
+   * mainly for testing.
+   *
+   * @param metricsPrefix
+   *          generally can be ignored.
+   * @param filename
+   *          the path / file to be monitored.
+   */
+  MetricsFileTailer(final String metricsPrefix, final String filename) {
+    this.metricsPrefix = metricsPrefix;
+    metricsFilename = filename;
+  }
+
+  /**
+   * Look for the accumulo metrics configuration file on the classpath and return the subset of
+   * properties that start with the metrics prefix given at construction.
+   *
+   * @return a configuration holding the properties of the sink selected by the metrics prefix.
+   * @throws IllegalStateException
+   *           if the properties file is not on the classpath or cannot be loaded.
+   */
+  private Configuration loadMetricsConfig() {
+    try {
+
+      final URL propUrl =
+          getClass().getClassLoader().getResource(MetricsTestSinkProperties.METRICS_PROP_FILENAME);
+
+      if (propUrl == null) {
+        throw new IllegalStateException(
+            "Could not find " + MetricsTestSinkProperties.METRICS_PROP_FILENAME + " on classpath");
+      }
+
+      String filename = propUrl.getFile();
+
+      Parameters params = new Parameters();
+      // Read data from this file
+      File propertiesFile = new File(filename);
+
+      FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
+          new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
+              .configure(params.fileBased().setFile(propertiesFile));
+
+      Configuration config = builder.getConfiguration();
+
+      final Configuration sub = config.subset(metricsPrefix);
+
+      if (log.isTraceEnabled()) {
+        log.trace("Config {}", config);
+        Iterator<String> iterator = sub.getKeys();
+        while (iterator.hasNext()) {
+          String key = iterator.next();
+          log.trace("'{}\'=\'{}\'", key, sub.getProperty(key));
+        }
+      }
+
+      return sub;
+
+    } catch (ConfigurationException ex) {
+      // keep the original exception as the cause so the real load failure is not lost
+      throw new IllegalStateException(
+          String.format("Could not find configuration file \'%s\' on classpath",
+              MetricsTestSinkProperties.METRICS_PROP_FILENAME),
+          ex);
+    }
+  }
+
+  /**
+   * Returns a marker value that is updated each time new data has been read from the file. Clients
+   * can use this to determine if a call to getLast() will return a new value. The marker is 0
+   * until the first read; afterwards it is the nanoseconds elapsed between creation of this
+   * instance and the most recent read.
+   *
+   * @return a marker value set when a line is available.
+   */
+  public long getLastUpdate() {
+    return lastUpdate.get();
+  }
+
+  /**
+   * Get the last line seen in the file.
+   *
+   * @return the last line from the file; null when no line has been read yet, or an empty string
+   *         right after a truncation of the file has been detected.
+   */
+  public String getLast() {
+    lock.lock();
+    try {
+
+      // lineCounter points at the next slot to write, so the newest line is one slot behind it
+      int last = (lineCounter % BUFFER_SIZE) - 1;
+      if (last < 0) {
+        last = BUFFER_SIZE - 1;
+      }
+      return lineBuffer[last];
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * A loop that polls for changes every 5 seconds and when the file changes, puts the new lines in
+   * a buffer so that the last one can be retrieved by clients using getLast(). The loop ends when
+   * close() is called or the thread is interrupted.
+   */
+  @Override
+  public void run() {
+
+    long filePos = 0;
+
+    File f = new File(metricsFilename);
+
+    while (running.get()) {
+
+      try {
+        Thread.sleep(5_000);
+      } catch (InterruptedException ex) {
+        running.set(Boolean.FALSE);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      long len = f.length();
+
+      try {
+
+        // file truncated? reset position
+        if (len < filePos) {
+          filePos = 0;
+          lock.lock();
+          try {
+            for (int i = 0; i < BUFFER_SIZE; i++) {
+              lineBuffer[i] = "";
+            }
+            lineCounter = 0;
+          } finally {
+            lock.unlock();
+          }
+        }
+
+        if (len > filePos) {
+          // File must have had something added to it!
+          // try-with-resources closes the file even when seek / readLine throws
+          try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
+            raf.seek(filePos);
+            String line;
+            lock.lock();
+            try {
+              while ((line = raf.readLine()) != null) {
+                lineBuffer[lineCounter++ % BUFFER_SIZE] = line;
+              }
+
+              lastUpdate.set(System.nanoTime() - startTime);
+
+            } finally {
+              lock.unlock();
+            }
+            // remember where this read stopped so the next poll only sees new data
+            filePos = raf.getFilePointer();
+          }
+        }
+      } catch (Exception ex) {
+        log.info("Error processing metrics file {}", metricsFilename, ex);
+      }
+    }
+  }
+
+  /**
+   * Signal the polling loop to stop; run() returns after its current sleep / read completes.
+   */
+  @Override
+  public void close() {
+    running.set(Boolean.FALSE);
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsTestSinkProperties.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsTestSinkProperties.java
new file mode 100644
index 0000000..5317755
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsTestSinkProperties.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.metrics;
+
+/**
+ * Common properties used with metrics configuration: the name of the metrics properties file and
+ * the sink prefixes used to select a sink configuration from it.
+ */
+public final class MetricsTestSinkProperties {
+
+  public static final String METRICS_PROP_FILENAME = "hadoop-metrics2-accumulo.properties";
+  public static final String ACC_GC_SINK_PREFIX = "accumulo.sink.file-gc";
+  public static final String ACC_MASTER_SINK_PREFIX = "accumulo.sink.file-master";
+
+  private MetricsTestSinkProperties() {
+    // constants holder - not meant to be instantiated
+  }
+
+}
diff --git a/test/src/test/java/org/apache/accumulo/test/metrics/MetricsFileTailerTest.java b/test/src/test/java/org/apache/accumulo/test/metrics/MetricsFileTailerTest.java
new file mode 100644
index 0000000..59359a2
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/metrics/MetricsFileTailerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.metrics;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises MetricsFileTailer against a plain file that the test itself appends to.
+ */
+public class MetricsFileTailerTest {
+
+  private static final Logger log = LoggerFactory.getLogger(MetricsFileTailerTest.class);
+
+  private static final String TEST_OUTFILE_NAME = "/tmp/testfile.txt";
+  private static final String SUCCESS = "success";
+
+  /**
+   * Remove the test output file; a failure to delete is only logged.
+   */
+  @AfterClass
+  public static void cleanup() {
+    try {
+      Files.deleteIfExists(FileSystems.getDefault().getPath(TEST_OUTFILE_NAME));
+    } catch (IOException ex) {
+      log.trace("Failed to clean-up test file " + TEST_OUTFILE_NAME, ex);
+    }
+  }
+
+  /**
+   * Create a file tailer and then write some lines and validate the tailer returns the last line.
+   */
+  @Test
+  public void fileUpdates() {
+
+    boolean passed = false;
+
+    // try-with-resources tells the tailer to stop even when the test body throws
+    try (MetricsFileTailer tailer = new MetricsFileTailer("foo", TEST_OUTFILE_NAME)) {
+
+      Thread t = new Thread(tailer);
+      // daemon, so a tailer that is still sleeping cannot keep the JVM alive after the test
+      t.setDaemon(true);
+      t.start();
+
+      long lastUpdate = tailer.getLastUpdate();
+
+      writeToFile();
+
+      int count = 0;
+      while (count++ < 5) {
+        long current = tailer.getLastUpdate();
+        if (lastUpdate != current) {
+          lastUpdate = current;
+          String last = tailer.getLast();
+          log.trace("{} - {}", current, last);
+          // equals is null-safe, unlike compareTo
+          if (SUCCESS.equals(last)) {
+            passed = true;
+            break;
+          }
+        } else {
+          log.trace("no change");
+        }
+        try {
+          Thread.sleep(5_000);
+        } catch (InterruptedException ex) {
+          // restore the interrupt flag and stop waiting; the assertion below reports the result
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+    assertTrue(passed);
+  }
+
+  /**
+   * Simulate write record(s) to the file. Lines are appended, and the line the test looks for is
+   * written last.
+   */
+  private void writeToFile() {
+    try (FileWriter writer = new FileWriter(TEST_OUTFILE_NAME, true);
+        PrintWriter printWriter = new PrintWriter(writer)) {
+      printWriter.println("foo");
+      // needs to be last line for test to pass
+      printWriter.println(SUCCESS);
+      printWriter.flush();
+    } catch (IOException ex) {
+      throw new IllegalStateException("failed to write data to test file", ex);
+    }
+  }
+}
diff --git a/test/src/test/resources/hadoop-metrics2-accumulo.properties b/test/src/test/resources/hadoop-metrics2-accumulo.properties
index e2eb761..e869144 100644
--- a/test/src/test/resources/hadoop-metrics2-accumulo.properties
+++ b/test/src/test/resources/hadoop-metrics2-accumulo.properties
@@ -31,13 +31,10 @@ accumulo.sink.file-all.class=org.apache.hadoop.metrics2.sink.FileSink accumulo.sink.file-all.filename=./target/it.all.metrics -accumulo.sink.test-sink.class=org.apache.accumulo.test.functional.util.Metrics2TestSink -accumulo.sink.test-sink.context=master -accumulo.sink.test-sink.filename=test.metrics -accumulo.sink.test-sink.period=7 - -# accumulo.sink.test-sink.context=* -# accumulo.sink.test-sink.period=5 +accumulo.sink.file-gc.class=org.apache.hadoop.metrics2.sink.FileSink +accumulo.sink.file-gc.context=accgc +accumulo.sink.file-gc.filename=./target/accgc.metrics +accumulo.sink.file-gc.period=5 # File sink for tserver metrics # accumulo.sink.file-tserver.class=org.apache.hadoop.metrics2.sink.FileSink