Return-Path: X-Original-To: apmail-phoenix-commits-archive@minotaur.apache.org Delivered-To: apmail-phoenix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B30618373 for ; Fri, 26 Jun 2015 23:40:57 +0000 (UTC) Received: (qmail 60068 invoked by uid 500); 26 Jun 2015 23:40:55 -0000 Delivered-To: apmail-phoenix-commits-archive@phoenix.apache.org Received: (qmail 59801 invoked by uid 500); 26 Jun 2015 23:40:55 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 59692 invoked by uid 99); 26 Jun 2015 23:40:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Jun 2015 23:40:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C7CE2DFE1C; Fri, 26 Jun 2015 23:40:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samarth@apache.org To: commits@phoenix.apache.org Date: Fri, 26 Jun 2015 23:40:55 -0000 Message-Id: <1e6accfc42cc42c6a13d212d27a417f4@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/4] phoenix git commit: PHOENIX-1819 Build a framework to capture and report phoenix client side request level metrics http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java index 5270277..bb4054b 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -57,6 +57,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.job.JobManager; +import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -255,12 +256,9 @@ public class CsvBulkLoadTool extends Configured implements Tool { } List> runningJobs = new ArrayList>(); - boolean useInstrumentedPool = conn - .unwrap(PhoenixConnection.class) - .getQueryServices() - .getProps() - .getBoolean(QueryServices.METRICS_ENABLED, - QueryServicesOptions.DEFAULT_IS_METRICS_ENABLED); + boolean useInstrumentedPool = GlobalClientMetrics.isMetricsEnabled() + || conn.unwrap(PhoenixConnection.class).isRequestLevelMetricsEnabled(); + ExecutorService executor = JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20, useInstrumentedPool); try{ http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index eb6dc3d..b500a25 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.mapreduce; +import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; + import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -32,6 +34,7 @@ 
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.LookAheadResultIterator; import org.apache.phoenix.iterate.PeekingResultIterator; @@ -40,6 +43,7 @@ import org.apache.phoenix.iterate.RoundRobinResultIterator; import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.monitoring.ReadMetricQueue; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -100,8 +104,12 @@ public class PhoenixRecordReader extends RecordReader scans = pSplit.getScans(); try { List iterators = Lists.newArrayListWithExpectedSize(scans.size()); + StatementContext ctx = queryPlan.getContext(); + ReadMetricQueue readMetrics = ctx.getReadMetricsQueue(); + String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString(); for (Scan scan : scans) { - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(), scan); + final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), + queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName)); PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); iterators.add(peekingResultIterator); } @@ -112,7 +120,7 @@ public class PhoenixRecordReader extends RecordReader getMetrics() { + List metrics = new ArrayList<>(); + for (GlobalClientMetrics m : GlobalClientMetrics.values()) { + metrics.add(m.metric); + } + return metrics; + } + + public static boolean isMetricsEnabled() { + return isGlobalMetricsEnabled; + } + +} 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java new file mode 100644 index 0000000..f3b562f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetric.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +/** + * Class that exposes the various internal phoenix metrics collected + * at the JVM level. Because metrics are dynamic in nature, it is not guaranteed that the + * state exposed will always be in sync with each other. One should use + * these metrics primarily for monitoring and debugging purposes. + */ +public interface GlobalMetric extends Metric { + + /** + * @return Number of samples collected since the last {@link #reset()} call. + */ + public long getNumberOfSamples(); + + /** + * @return Sum of the values of the metric sampled since the last {@link #reset()} call. 
+ */ + public long getTotalSum(); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java new file mode 100644 index 0000000..26a16e1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import java.util.concurrent.atomic.AtomicLong; + +public class GlobalMetricImpl implements GlobalMetric { + + private AtomicLong numberOfSamples = new AtomicLong(0); + private Metric metric; + + public GlobalMetricImpl(MetricType type) { + this.metric = new AtomicMetric(type); + } + + /** + * Reset the internal state. Typically called after metric information has been collected and a new phase of + * collection is being requested for the next interval. 
+ */ + @Override + public void reset() { + metric.reset(); + numberOfSamples.set(0); + } + + @Override + public long getNumberOfSamples() { + return numberOfSamples.get(); + } + + @Override + public long getTotalSum() { + return metric.getValue(); + } + + @Override + public void change(long delta) { + metric.change(delta); + numberOfSamples.incrementAndGet(); + } + + @Override + public void increment() { + metric.increment(); + numberOfSamples.incrementAndGet(); + } + + @Override + public String getName() { + return metric.getName(); + } + + @Override + public String getDescription() { + return metric.getDescription(); + } + + @Override + public long getValue() { + return metric.getValue(); + } + + @Override + public String getCurrentMetricState() { + return metric.getCurrentMetricState() + ", Number of samples: " + numberOfSamples.get(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java new file mode 100644 index 0000000..0e82ce4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MemoryMetricsHolder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES; +import static org.apache.phoenix.monitoring.MetricType.MEMORY_WAIT_TIME; + +/** + * Class that encapsulates the metrics regarding memory resources needed for servicing a request. + */ +public class MemoryMetricsHolder { + private final CombinableMetric memoryChunkSizeMetric; + private final CombinableMetric memoryWaitTimeMetric; + public static final MemoryMetricsHolder NO_OP_INSTANCE = new MemoryMetricsHolder(new ReadMetricQueue(false), null); + + public MemoryMetricsHolder(ReadMetricQueue readMetrics, String tableName) { + this.memoryChunkSizeMetric = readMetrics.allotMetric(MEMORY_CHUNK_BYTES, tableName); + this.memoryWaitTimeMetric = readMetrics.allotMetric(MEMORY_WAIT_TIME, tableName); + } + + public CombinableMetric getMemoryChunkSizeMetric() { + return memoryChunkSizeMetric; + } + + public CombinableMetric getMemoryWaitTimeMetric() { + return memoryWaitTimeMetric; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java index aef792c..1ad1c7a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/Metric.java @@ -18,47 +18,46 @@ package 
org.apache.phoenix.monitoring; /** - * Interface that exposes the various internal phoenix metrics collected. - * Because metrics are dynamic in nature, it is not guaranteed that the - * state exposed will always be in sync with each other. One should use - * these metrics primarily for monitoring and debugging purposes. + * Interface that represents phoenix-internal metric. */ public interface Metric { - /** - * * @return Name of the metric */ public String getName(); - + /** - * * @return Description of the metric */ public String getDescription(); - + /** - * Reset the internal state. Typically called after - * metric information has been collected and a new - * phase of collection is being requested for the next - * interval. + * @return Current value of the metric */ - public void reset(); - + public long getValue(); + /** + * Change the metric by the specified amount * - * @return String that represents the current state of the metric. - * Typically used to log the current state. + * @param delta + * amount by which the metric value should be changed */ - public String getCurrentMetricState(); - + public void change(long delta); + + /** + * Change the value of metric by 1 + */ + public void increment(); + /** - * @return Number of samples collected since the last {@link #reset()} call. + * @return String that represents the current state of the metric. Typically used for logging or reporting purposes. */ - public long getNumberOfSamples(); + public String getCurrentMetricState(); /** - * @return Sum of the values of the metric sampled since the last {@link #reset()} call. 
+ * Reset the metric */ - public long getTotalSum(); + public void reset(); + } + http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java new file mode 100644 index 0000000..a0c2a4a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.monitoring; + +public enum MetricType { + + MUTATION_BATCH_SIZE("Batch sizes of mutations"), + MUTATION_BYTES("Size of mutations in bytes"), + MUTATION_COMMIT_TIME("Time it took to commit mutations"), + QUERY_TIME("Query times"), + NUM_PARALLEL_SCANS("Number of scans that were executed in parallel"), + SCAN_BYTES("Number of bytes read by scans"), + MEMORY_CHUNK_BYTES("Number of bytes allocated by the memory manager"), + MEMORY_WAIT_TIME("Number of milliseconds threads needed to wait for memory to be allocated through memory manager"), + MUTATION_SQL_COUNTER("Counter for number of mutation sql statements"), + SELECT_SQL_COUNTER("Counter for number of sql queries"), + TASK_QUEUE_WAIT_TIME("Time in milliseconds tasks had to wait in the queue of the thread pool executor"), + TASK_END_TO_END_TIME("Time in milliseconds spent by tasks from creation to completion"), + TASK_EXECUTION_TIME("Time in milliseconds tasks took to execute"), + TASK_EXECUTED_COUNTER("Counter for number of tasks submitted to the thread pool executor"), + TASK_REJECTED_COUNTER("Counter for number of tasks that were rejected by the thread pool executor"), + QUERY_TIMEOUT_COUNTER("Number of times query timed out"), + QUERY_FAILED_COUNTER("Number of times query failed"), + SPOOL_FILE_SIZE("Size of spool files created in bytes"), + SPOOL_FILE_COUNTER("Number of spool files created"), + CACHE_REFRESH_SPLITS_COUNTER("Number of times cache was refreshed because of splits"), + WALL_CLOCK_TIME_MS("Wall clock time elapsed for the overall query execution"), + RESULT_SET_TIME_MS("Wall clock time elapsed for reading all records using resultSet.next()"); + + private final String description; + + private MetricType(String description) { + this.description = description; + } + + public String description() { + return description; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java new file mode 100644 index 0000000..bffb9ad --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricsStopWatch.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import com.google.common.base.Stopwatch; + +/** + * + * Stop watch that is cognizant of the fact whether or not metrics is enabled. + * If metrics isn't enabled it doesn't do anything. Otherwise, it delegates + * calls to a {@code Stopwatch}. 
+ * + */ +final class MetricsStopWatch { + + private final boolean isMetricsEnabled; + private final Stopwatch stopwatch; + + MetricsStopWatch(boolean isMetricsEnabled) { + this.isMetricsEnabled = isMetricsEnabled; + this.stopwatch = new Stopwatch(); + } + + void start() { + if (isMetricsEnabled) { + stopwatch.start(); + } + } + + void stop() { + if (isMetricsEnabled) { + if (stopwatch.isRunning()) { + stopwatch.stop(); + } + } + } + + long getElapsedTimeInMs() { + if (isMetricsEnabled) { + return stopwatch.elapsedMillis(); + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java new file mode 100644 index 0000000..e90da46 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.monitoring; + +import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE; +import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES; +import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Queue that tracks various writes/mutations related phoenix request metrics. + */ +public class MutationMetricQueue { + + // Map of table name -> mutation metric + private Map tableMutationMetric = new HashMap<>(); + + public void addMetricsForTable(String tableName, MutationMetric metric) { + MutationMetric tableMetric = tableMutationMetric.get(tableName); + if (tableMetric == null) { + tableMutationMetric.put(tableName, metric); + } else { + tableMetric.combineMetric(metric); + } + } + + public void combineMetricQueues(MutationMetricQueue other) { + Map tableMetricMap = other.tableMutationMetric; + for (Entry entry : tableMetricMap.entrySet()) { + addMetricsForTable(entry.getKey(), entry.getValue()); + } + } + + /** + * Publish the metrics to wherever you want them published. The internal state is cleared out after every publish. 
+ * @return map of table name -> list of pair of (metric name, metric value) + */ + public Map> aggregate() { + Map> publishedMetrics = new HashMap<>(); + for (Entry entry : tableMutationMetric.entrySet()) { + String tableName = entry.getKey(); + MutationMetric metric = entry.getValue(); + Map publishedMetricsForTable = publishedMetrics.get(tableName); + if (publishedMetricsForTable == null) { + publishedMetricsForTable = new HashMap<>(); + publishedMetrics.put(tableName, publishedMetricsForTable); + } + publishedMetricsForTable.put(metric.getNumMutations().getName(), metric.getNumMutations().getValue()); + publishedMetricsForTable.put(metric.getMutationsSizeBytes().getName(), metric.getMutationsSizeBytes().getValue()); + publishedMetricsForTable.put(metric.getCommitTimeForMutations().getName(), metric.getCommitTimeForMutations().getValue()); + } + return publishedMetrics; + } + + public void clearMetrics() { + tableMutationMetric.clear(); // help gc + } + + /** + * Class that holds together the various metrics associated with mutations. 
+ */ + public static class MutationMetric { + private final CombinableMetric numMutations = new CombinableMetricImpl(MUTATION_BATCH_SIZE); + private final CombinableMetric mutationsSizeBytes = new CombinableMetricImpl(MUTATION_BYTES); + private final CombinableMetric totalCommitTimeForMutations = new CombinableMetricImpl(MUTATION_COMMIT_TIME); + + public MutationMetric(long numMutations, long mutationsSizeBytes, long commitTimeForMutations) { + this.numMutations.change(numMutations); + this.mutationsSizeBytes.change(mutationsSizeBytes); + this.totalCommitTimeForMutations.change(commitTimeForMutations); + } + + public CombinableMetric getCommitTimeForMutations() { + return totalCommitTimeForMutations; + } + + public CombinableMetric getNumMutations() { + return numMutations; + } + + public CombinableMetric getMutationsSizeBytes() { + return mutationsSizeBytes; + } + + public void combineMetric(MutationMetric other) { + this.numMutations.combine(other.numMutations); + this.mutationsSizeBytes.combine(other.mutationsSizeBytes); + this.totalCommitTimeForMutations.combine(other.totalCommitTimeForMutations); + } + + } + + /** + * Class to represent a no-op mutation metric. Used in places where request level metric tracking for mutations is not + * needed or desired. 
+ */ + public static class NoOpMutationMetricsQueue extends MutationMetricQueue { + + public static final NoOpMutationMetricsQueue NO_OP_MUTATION_METRICS_QUEUE = new NoOpMutationMetricsQueue(); + + private NoOpMutationMetricsQueue() {} + + @Override + public void addMetricsForTable(String tableName, MutationMetric metric) {} + + @Override + public Map> aggregate() { return Collections.emptyMap(); } + + + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java new file mode 100644 index 0000000..2d92116 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NonAtomicMetric.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.monitoring; + +/** + * Version of {@link Metric} that can be used when the metric isn't getting concurrently modified/accessed by multiple + * threads and the memory consistency effects of happen-before can be established. For example - phoenix client side + * metrics are modified/accessed by only one thread at a time. Further, the actions of threads in the phoenix client + * thread pool happen-before the actions of the thread that performs the aggregation of metrics. This makes + * {@link NonAtomicMetric} a good fit for storing Phoenix's client side request level metrics. + */ +class NonAtomicMetric implements Metric { + + private final MetricType type; + private long value; + + public NonAtomicMetric(MetricType type) { + this.type = type; + } + + @Override + public String getName() { + return type.name(); + } + + @Override + public String getDescription() { + return type.description(); + } + + @Override + public long getValue() { + return value; + } + + @Override + public void change(long delta) { + value += delta; + } + + @Override + public void increment() { + value++; + } + + @Override + public String getCurrentMetricState() { + return getName() + ": " + value; + } + + @Override + public void reset() { + value = 0; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java new file mode 100644 index 0000000..1f71542 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/OverAllQueryMetrics.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import static org.apache.phoenix.monitoring.MetricType.CACHE_REFRESH_SPLITS_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS; +import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.RESULT_SET_TIME_MS; +import static org.apache.phoenix.monitoring.MetricType.WALL_CLOCK_TIME_MS; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; + +/** + * Class that represents the overall metrics associated with a query being executed by the phoenix. 
+ */ +public class OverAllQueryMetrics { + private final MetricsStopWatch queryWatch; + private final MetricsStopWatch resultSetWatch; + private final CombinableMetric numParallelScans; + private final CombinableMetric wallClockTimeMS; + private final CombinableMetric resultSetTimeMS; + private final CombinableMetric queryTimedOut; + private final CombinableMetric queryFailed; + private final CombinableMetric cacheRefreshedDueToSplits; + + public OverAllQueryMetrics(boolean isMetricsEnabled) { + queryWatch = new MetricsStopWatch(isMetricsEnabled); + resultSetWatch = new MetricsStopWatch(isMetricsEnabled); + numParallelScans = isMetricsEnabled ? new CombinableMetricImpl(NUM_PARALLEL_SCANS) : NoOpRequestMetric.INSTANCE; + wallClockTimeMS = isMetricsEnabled ? new CombinableMetricImpl(WALL_CLOCK_TIME_MS) : NoOpRequestMetric.INSTANCE; + resultSetTimeMS = isMetricsEnabled ? new CombinableMetricImpl(RESULT_SET_TIME_MS) : NoOpRequestMetric.INSTANCE; + queryTimedOut = isMetricsEnabled ? new CombinableMetricImpl(QUERY_TIMEOUT_COUNTER) : NoOpRequestMetric.INSTANCE; + queryFailed = isMetricsEnabled ? new CombinableMetricImpl(QUERY_FAILED_COUNTER) : NoOpRequestMetric.INSTANCE; + cacheRefreshedDueToSplits = isMetricsEnabled ? 
new CombinableMetricImpl(CACHE_REFRESH_SPLITS_COUNTER) + : NoOpRequestMetric.INSTANCE; + } + + public void updateNumParallelScans(long numParallelScans) { + this.numParallelScans.change(numParallelScans); + } + + public void queryTimedOut() { + queryTimedOut.increment(); + } + + public void queryFailed() { + queryFailed.increment(); + } + + public void cacheRefreshedDueToSplits() { + cacheRefreshedDueToSplits.increment(); + } + + public void startQuery() { + queryWatch.start(); + } + + public void endQuery() { + queryWatch.stop(); + wallClockTimeMS.change(queryWatch.getElapsedTimeInMs()); + } + + public void startResultSetWatch() { + resultSetWatch.start(); + } + + public void stopResultSetWatch() { + resultSetWatch.stop(); + resultSetTimeMS.change(resultSetWatch.getElapsedTimeInMs()); + } + + public Map<String, Long> publish() { + Map<String, Long> metricsForPublish = new HashMap<>(); + metricsForPublish.put(numParallelScans.getName(), numParallelScans.getValue()); + metricsForPublish.put(wallClockTimeMS.getName(), wallClockTimeMS.getValue()); + metricsForPublish.put(resultSetTimeMS.getName(), resultSetTimeMS.getValue()); + metricsForPublish.put(queryTimedOut.getName(), queryTimedOut.getValue()); + metricsForPublish.put(queryFailed.getName(), queryFailed.getValue()); + metricsForPublish.put(cacheRefreshedDueToSplits.getName(), cacheRefreshedDueToSplits.getValue()); + return metricsForPublish; + } + + public void reset() { + numParallelScans.reset(); + wallClockTimeMS.reset(); + resultSetTimeMS.reset(); + queryTimedOut.reset(); + queryFailed.reset(); + cacheRefreshedDueToSplits.reset(); + queryWatch.stop(); + resultSetWatch.stop(); + } + + public OverAllQueryMetrics combine(OverAllQueryMetrics metric) { + cacheRefreshedDueToSplits.combine(metric.cacheRefreshedDueToSplits); + queryFailed.combine(metric.queryFailed); + queryTimedOut.combine(metric.queryTimedOut); + numParallelScans.combine(metric.numParallelScans); + return this; + } + +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java deleted file mode 100644 index 28e2f2e..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/PhoenixMetrics.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.monitoring; - -/** - * Central place where we keep track of all the internal - * phoenix metrics that we track. 
- * - */ -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.phoenix.query.QueryServicesOptions; - -public class PhoenixMetrics { - private static final boolean isMetricsEnabled = QueryServicesOptions.withDefaults().isMetricsEnabled(); - - public static boolean isMetricsEnabled() { - return isMetricsEnabled; - } - - public enum SizeMetric { - MUTATION_BATCH_SIZE("CumulativeBatchSizesOfMutations", "Cumulative batch sizes of mutations"), - MUTATION_BYTES("CumulativeMutationSize", "Cumulative size of mutations in bytes"), - MUTATION_COMMIT_TIME("CumulativeMutationTime", "Cumulative time it took to send mutations"), - QUERY_TIME("QueryTime", "Cumulative query times"), - PARALLEL_SCANS("CumulativeNumberOfParallelScans", "Cumulative number of scans executed that were executed in parallel"), - SCAN_BYTES("CumulativeScanBytesSize", "Cumulative number of bytes read by scans"), - SPOOL_FILE_SIZE("CumulativeSpoolFilesSize", "Cumulative size of spool files created in bytes"), - MEMORY_MANAGER_BYTES("CumulativeBytesAllocated", "Cumulative number of bytes allocated by the memory manager"), - MEMORY_WAIT_TIME("CumulativeMemoryWaitTime", "Cumulative number of milliseconds threads needed to wait for memory to be allocated through memory manager"), - TASK_QUEUE_WAIT_TIME("CumulativeTaskQueueWaitTime", "Cumulative time in milliseconds tasks had to wait in the queue of the thread pool executor"), - TASK_END_TO_END_TIME("CumulativeTaskEndToEndTime", "Cumulative time in milliseconds spent by tasks from creation to completion"), - TASK_EXECUTION_TIME("CumulativeTaskExecutionTime", "Cumulative time in milliseconds tasks took to execute"); - - private final SizeStatistic metric; - - private SizeMetric(String metricName, String metricDescription) { - metric = new SizeStatistic(metricName, metricDescription); - } - - public void update(long value) { - if (isMetricsEnabled) { - metric.add(value); - } - } - - // exposed for testing. 
- public Metric getMetric() { - return metric; - } - - @Override - public String toString() { - return metric.toString(); - } - } - - public enum CountMetric { - MUTATION_COUNT("NumMutationCounter", "Counter for number of mutation statements"), - QUERY_COUNT("NumQueryCounter", "Counter for number of queries"), - TASK_COUNT("NumberOfTasksCounter", "Counter for number of tasks submitted to the thread pool executor"), - REJECTED_TASK_COUNT("RejectedTasksCounter", "Counter for number of tasks that were rejected by the thread pool executor"), - QUERY_TIMEOUT("QueryTimeoutCounter", "Number of times query timed out"), - FAILED_QUERY("QueryFailureCounter", "Number of times query failed"), - NUM_SPOOL_FILE("NumSpoolFilesCounter", "Number of spool files created"); - - private final Counter metric; - - private CountMetric(String metricName, String metricDescription) { - metric = new Counter(metricName, metricDescription); - } - - public void increment() { - if (isMetricsEnabled) { - metric.increment(); - } - } - - // exposed for testing. 
- public Metric getMetric() { - return metric; - } - - @Override - public String toString() { - return metric.toString(); - } - } - - public static Collection<Metric> getMetrics() { - List<Metric> metrics = new ArrayList<>(); - for (SizeMetric s : SizeMetric.values()) { - metrics.add(s.metric); - } - for (CountMetric s : CountMetric.values()) { - metrics.add(s.metric); - } - return metrics; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java new file mode 100644 index 0000000..e6c6be2 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ReadMetricQueue.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.monitoring; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.annotation.Nonnull; + +import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Queue of all metrics associated with performing reads from the cluster. + */ +public class ReadMetricQueue { + + private static final int MAX_QUEUE_SIZE = 20000; // TODO: should this be configurable? + + private final ConcurrentMap<MetricKey, Queue<CombinableMetric>> metricsMap = new ConcurrentHashMap<>(); + + private final boolean isRequestMetricsEnabled; + + public ReadMetricQueue(boolean isRequestMetricsEnabled) { + this.isRequestMetricsEnabled = isRequestMetricsEnabled; + } + + public CombinableMetric allotMetric(MetricType type, String tableName) { + if (!isRequestMetricsEnabled) { return NoOpRequestMetric.INSTANCE; } + MetricKey key = new MetricKey(type, tableName); + Queue<CombinableMetric> q = getMetricQueue(key); + CombinableMetric metric = getMetric(type); + q.offer(metric); + return metric; + } + + @VisibleForTesting + public CombinableMetric getMetric(MetricType type) { + CombinableMetric metric = new CombinableMetricImpl(type); + return metric; + } + + /** + * @return map of table name -> list of pair of (metric name, metric value) + */ + public Map<String, Map<String, Long>> aggregate() { + Map<String, Map<String, Long>> publishedMetrics = new HashMap<>(); + for (Entry<MetricKey, Queue<CombinableMetric>> entry : metricsMap.entrySet()) { + String tableNameToPublish = entry.getKey().tableName; + Collection<CombinableMetric> metrics = entry.getValue(); + if (metrics.size() > 0) { + CombinableMetric m = combine(metrics); + Map<String, Long> map = publishedMetrics.get(tableNameToPublish); + if (map == null) { + map = new HashMap<>(); +
publishedMetrics.put(tableNameToPublish, map); + } + map.put(m.getName(), m.getValue()); + } + } + return publishedMetrics; + } + + public void clearMetrics() { + metricsMap.clear(); // help gc + } + + private static CombinableMetric combine(Collection<CombinableMetric> metrics) { + int size = metrics.size(); + if (size == 0) { throw new IllegalArgumentException("Metrics collection needs to have at least one element"); } + Iterator<CombinableMetric> itr = metrics.iterator(); + CombinableMetric combinedMetric = itr.next(); + while (itr.hasNext()) { + combinedMetric = combinedMetric.combine(itr.next()); + } + return combinedMetric; + } + + /** + * Combine the metrics. This method should only be called in a single threaded manner when the two metric holders + * are not getting modified. + */ + public ReadMetricQueue combineReadMetrics(ReadMetricQueue other) { + ConcurrentMap<MetricKey, Queue<CombinableMetric>> otherMetricsMap = other.metricsMap; + for (Entry<MetricKey, Queue<CombinableMetric>> entry : otherMetricsMap.entrySet()) { + MetricKey key = entry.getKey(); + Queue<CombinableMetric> otherQueue = entry.getValue(); + CombinableMetric combinedMetric = null; + // combine the metrics corresponding to this metric key before putting it in the queue. + for (CombinableMetric m : otherQueue) { + if (combinedMetric == null) { + combinedMetric = m; + } else { + combinedMetric.combine(m); + } + } + if (combinedMetric != null) { + Queue<CombinableMetric> thisQueue = getMetricQueue(key); + thisQueue.offer(combinedMetric); + } + } + return this; + } + + /** + * Inner class whose instances are used as keys in the metrics map.
+ */ + private static class MetricKey { + @Nonnull + private final MetricType type; + + @Nonnull + private final String tableName; + + MetricKey(MetricType type, String tableName) { + checkNotNull(type); + checkNotNull(tableName); + this.type = type; + this.tableName = tableName; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + tableName.hashCode(); + result = prime * result + type.hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + MetricKey other = (MetricKey)obj; + if (tableName.equals(other.tableName) && type == other.type) return true; + return false; + } + + } + + private Queue<CombinableMetric> getMetricQueue(MetricKey key) { + Queue<CombinableMetric> q = metricsMap.get(key); + if (q == null) { + q = new LinkedBlockingQueue<CombinableMetric>(MAX_QUEUE_SIZE); + Queue<CombinableMetric> curQ = metricsMap.putIfAbsent(key, q); + if (curQ != null) { + q = curQ; + } + } + return q; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java deleted file mode 100644 index 9eca754..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SizeStatistic.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.monitoring; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * - * Statistic that keeps track of the sum of long values that - * could be used to represent a phoenix metric. For performance - * reasons the internal state in this metric is not strictly covariant - * and hence should only be used for monitoring and debugging purposes. - */ -class SizeStatistic implements Metric { - - private final AtomicLong total = new AtomicLong(0); - private final AtomicLong numSamples = new AtomicLong(0); - private final String name; - private final String description; - - public SizeStatistic(String name, String description) { - this.name = name; - this.description = description; - } - - @Override - public String getName() { - return name; - } - - @Override - public String getDescription() { - return description; - } - - @Override - public void reset() { - total.set(0); - numSamples.set(0); - } - - @Override - public String getCurrentMetricState() { - return "Name:" + description + ", Total: " + total.get() + ", Number of samples: " + numSamples.get(); - } - - @Override - public long getNumberOfSamples() { - return numSamples.get(); - } - - @Override - public long getTotalSum() { - return total.get(); - } - - public long add(long value) { - // there is a race condition here but what the heck. 
- numSamples.incrementAndGet(); - return total.addAndGet(value); - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java new file mode 100644 index 0000000..4373887 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/SpoolingMetricsHolder.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + + +/** + * Class that encapsulates the various metrics associated with the spooling done by phoenix as part of servicing a + * request. 
+ */ +public class SpoolingMetricsHolder { + + private final CombinableMetric spoolFileSizeMetric; + private final CombinableMetric numSpoolFileMetric; + public static final SpoolingMetricsHolder NO_OP_INSTANCE = new SpoolingMetricsHolder(new ReadMetricQueue(false), ""); + + public SpoolingMetricsHolder(ReadMetricQueue readMetrics, String tableName) { + this.spoolFileSizeMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_SIZE, tableName); + this.numSpoolFileMetric = readMetrics.allotMetric(MetricType.SPOOL_FILE_COUNTER, tableName); + } + + public CombinableMetric getSpoolFileSizeMetric() { + return spoolFileSizeMetric; + } + + public CombinableMetric getNumSpoolFileMetric() { + return numSpoolFileMetric; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java new file mode 100644 index 0000000..98ff57c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/TaskExecutionMetricsHolder.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME; +import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; +import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; +import static org.apache.phoenix.monitoring.MetricType.TASK_QUEUE_WAIT_TIME; +import static org.apache.phoenix.monitoring.MetricType.TASK_REJECTED_COUNTER; + + +/** + * Class to encapsulate the various metrics associated with submitting and executing a task to the phoenix client + * thread pool. + */ +public class TaskExecutionMetricsHolder { + + private final CombinableMetric taskQueueWaitTime; + private final CombinableMetric taskEndToEndTime; + private final CombinableMetric taskExecutionTime; + private final CombinableMetric numTasks; + private final CombinableMetric numRejectedTasks; + public static final TaskExecutionMetricsHolder NO_OP_INSTANCE = new TaskExecutionMetricsHolder(new ReadMetricQueue(false), ""); + + public TaskExecutionMetricsHolder(ReadMetricQueue readMetrics, String tableName) { + taskQueueWaitTime = readMetrics.allotMetric(TASK_QUEUE_WAIT_TIME, tableName); + taskEndToEndTime = readMetrics.allotMetric(TASK_END_TO_END_TIME, tableName); + taskExecutionTime = readMetrics.allotMetric(TASK_EXECUTION_TIME, tableName); + numTasks = readMetrics.allotMetric(TASK_EXECUTED_COUNTER, tableName); + numRejectedTasks = readMetrics.allotMetric(TASK_REJECTED_COUNTER, tableName); + } + + public CombinableMetric getTaskQueueWaitTime() { + return taskQueueWaitTime; + } + + 
public CombinableMetric getTaskEndToEndTime() { + return taskEndToEndTime; + } + + public CombinableMetric getTaskExecutionTime() { + return taskExecutionTime; + } + + public CombinableMetric getNumTasks() { + return numTasks; + } + + public CombinableMetric getNumRejectedTasks() { + return numRejectedTasks; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java index 898a919..c16b86d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/BaseQueryServicesImpl.java @@ -45,7 +45,7 @@ public abstract class BaseQueryServicesImpl implements QueryServices { options.getKeepAliveMs(), options.getThreadPoolSize(), options.getQueueSize(), - options.isMetricsEnabled()); + options.isGlobalMetricsEnabled()); this.memoryManager = new GlobalMemoryManager( Runtime.getRuntime().maxMemory() * options.getMaxMemoryPerc() / 100, options.getMaxMemoryWaitMs()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 9183a70..62b080c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -159,7 +159,7 @@ public interface QueryServices extends SQLCloseable { public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = 
"phoenix.schema.change.delay"; public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells"; public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls"; - public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled"; + public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled"; // rpc queue configs public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count"; @@ -167,6 +167,7 @@ public interface QueryServices extends SQLCloseable { public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder"; public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions"; + public static final String COLLECT_REQUEST_LEVEL_METRICS = "phoenix.query.request.metrics.enabled"; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 972bf26..3efd79f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -18,15 +18,16 @@ package org.apache.phoenix.query; import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE; -import static org.apache.phoenix.query.QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; +import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; import static 
org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB; import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK; import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB; import static org.apache.phoenix.query.QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB; +import static org.apache.phoenix.query.QueryServices.GLOBAL_METRICS_ENABLED; import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB; @@ -43,7 +44,6 @@ import static org.apache.phoenix.query.QueryServices.MAX_SERVER_CACHE_TIME_TO_LI import static org.apache.phoenix.query.QueryServices.MAX_SERVER_METADATA_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB; -import static org.apache.phoenix.query.QueryServices.METRICS_ENABLED; import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; @@ -188,7 +188,7 @@ public class QueryServicesOptions { // TODO Change this to true as part of PHOENIX-1543 public static final boolean DEFAULT_AUTO_COMMIT = false; - public static final boolean DEFAULT_IS_METRICS_ENABLED = true; + public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true; private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName(); @@ -197,6 +197,7 @@ public class QueryServicesOptions { public static final boolean DEFAULT_USE_BYTE_BASED_REGEX = 
false; public static final boolean DEFAULT_FORCE_ROW_KEY_ORDER = false; public static final boolean DEFAULT_ALLOW_USER_DEFINED_FUNCTIONS = false; + public static final boolean DEFAULT_REQUEST_LEVEL_METRICS_ENABLED = false; private final Configuration config; @@ -249,10 +250,11 @@ public class QueryServicesOptions { .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE) .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK) .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK) - .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED) + .setIfUnset(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED) .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY) .setIfUnset(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX) - .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER); + .setIfUnset(FORCE_ROW_KEY_ORDER_ATTRIB, DEFAULT_FORCE_ROW_KEY_ORDER) + .setIfUnset(COLLECT_REQUEST_LEVEL_METRICS, DEFAULT_REQUEST_LEVEL_METRICS_ENABLED) ; // HBase sets this to 1, so we reset it to something more appropriate. 
// Hopefully HBase will change this, because we can't know if a user set @@ -448,10 +450,10 @@ public class QueryServicesOptions { return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES); } - public boolean isMetricsEnabled() { - return config.getBoolean(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED); + public boolean isGlobalMetricsEnabled() { + return config.getBoolean(GLOBAL_METRICS_ENABLED, DEFAULT_IS_GLOBAL_METRICS_ENABLED); } - + public boolean isUseByteBasedRegex() { return config.getBoolean(USE_BYTE_BASED_REGEX_ATTRIB, DEFAULT_USE_BYTE_BASED_REGEX); } @@ -530,11 +532,7 @@ public class QueryServicesOptions { return this; } - public QueryServicesOptions setMetricsEnabled(boolean flag) { - config.setBoolean(METRICS_ENABLED, flag); - return this; - } - + public QueryServicesOptions setUseByteBasedRegex(boolean flag) { config.setBoolean(USE_BYTE_BASED_REGEX_ATTRIB, flag); return this; @@ -544,5 +542,4 @@ public class QueryServicesOptions { config.setBoolean(FORCE_ROW_KEY_ORDER_ATTRIB, forceRowKeyOrder); return this; } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java index 265fc78..159e0c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java @@ -17,12 +17,23 @@ */ package org.apache.phoenix.trace; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; -import org.apache.commons.configuration.Configuration; +import static 
org.apache.phoenix.metrics.MetricInfo.ANNOTATION; +import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION; +import static org.apache.phoenix.metrics.MetricInfo.END; +import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; +import static org.apache.phoenix.metrics.MetricInfo.PARENT; +import static org.apache.phoenix.metrics.MetricInfo.SPAN; +import static org.apache.phoenix.metrics.MetricInfo.START; +import static org.apache.phoenix.metrics.MetricInfo.TAG; +import static org.apache.phoenix.metrics.MetricInfo.TRACE; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,20 +42,15 @@ import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSink; import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.phoenix.metrics.*; +import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.metrics.Metrics; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.QueryUtil; -import javax.annotation.Nullable; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.*; - -import static org.apache.phoenix.metrics.MetricInfo.*; -import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; /** * Write the metrics to a phoenix table. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/047b8ca6/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java index ddd9753..aede947 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java @@ -161,7 +161,6 @@ public class JDBCUtil { } return Boolean.valueOf(autoCommit); } - /** * Retrieve the value of the optional consistency read setting from JDBC url or connection * properties. @@ -182,4 +181,9 @@ public class JDBCUtil { return Consistency.STRONG; } + + public static boolean isCollectingRequestLevelMetricsEnabled(String url, Properties overrideProps, ReadOnlyProps queryServicesProps) throws SQLException { + String batchSizeStr = findProperty(url, overrideProps, PhoenixRuntime.REQUEST_METRIC_ATTRIB); + return (batchSizeStr == null ? queryServicesProps.getBoolean(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, QueryServicesOptions.DEFAULT_REQUEST_LEVEL_METRICS_ENABLED) : Boolean.parseBoolean(batchSizeStr)); + } }