phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From karanmeht...@apache.org
Subject phoenix git commit: PHOENIX-4835 LoggingPhoenixConnection should log metrics upon connection close
Date Fri, 17 Aug 2018 19:22:57 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 bbc69da80 -> e2cd16bcd


PHOENIX-4835 LoggingPhoenixConnection should log metrics upon connection close


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2cd16bc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2cd16bc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2cd16bc

Branch: refs/heads/4.x-HBase-1.2
Commit: e2cd16bcdac97c4408fc4a5002991abf7bc056ac
Parents: bbc69da
Author: Karan Mehta <karanmehta93@gmail.com>
Authored: Thu Aug 16 15:08:12 2018 -0700
Committer: Karan Mehta <karanmehta93@gmail.com>
Committed: Fri Aug 17 12:10:10 2018 -0700

----------------------------------------------------------------------
 .../monitoring/BasePhoenixMetricsIT.java        | 128 +++++++++++++
 .../monitoring/PhoenixLoggingMetricsIT.java     | 181 +++++++++++++++++++
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 170 ++---------------
 .../phoenix/jdbc/LoggingPhoenixConnection.java  |  12 +-
 4 files changed, 332 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cd16bc/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
new file mode 100644
index 0000000..5c016f6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class BasePhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
+
+    static final int MAX_RETRIES = 5;
+
+    static final List<MetricType> mutationMetricsToSkip =
+    Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME);
+    static final List<MetricType> readMetricsToSkip =
+    Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME,
+            MetricType.TASK_EXECUTION_TIME, MetricType.TASK_END_TO_END_TIME,
+            MetricType.COUNT_MILLS_BETWEEN_NEXTS);
+    static final String CUSTOM_URL_STRING = "SESSION";
+    static final AtomicInteger numConnections = new AtomicInteger(0);
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Phoenix Global client metrics are enabled by default
+        // Enable request metric collection at the driver level
+        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
+        // disable renewing leases as this will force spooling to happen.
+        props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        // need the non-test driver for some tests that check number of hconnections, etc.
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+
+    }
+
+    Connection insertRowsInTable(String tableName, long numRows) throws SQLException {
+        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+        Connection conn = DriverManager.getConnection(getUrl());
+        PreparedStatement stmt = conn.prepareStatement(dml);
+        for (int i = 1; i <= numRows; i++) {
+            stmt.setString(1, "key" + i);
+            stmt.setString(2, "value" + i);
+            stmt.executeUpdate();
+        }
+        conn.commit();
+        return conn;
+    }
+
+    void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
+                                                 Map<String, Map<MetricType, Long>> readMetrics) {
+        assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
+        int numTables = 0;
+        for (Map.Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for read metrics", tableName, t);
+            numTables++;
+            Map<MetricType, Long> p = entry.getValue();
+            assertTrue("No read metrics present when there should have been", p.size() > 0);
+            for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
+                MetricType metricType = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricType.equals(TASK_EXECUTED_COUNTER)) {
+                    assertEquals(tableSaltBuckets, metricValue);
+                } else if (metricType.equals(SCAN_BYTES)) {
+                    assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
+                }
+            }
+        }
+        assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
+    }
+
+    void assertMutationMetrics(String tableName, int numRows, Map<String, Map<MetricType, Long>> mutationMetrics) {
+        assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
+        for (Map.Entry<String, Map<MetricType, Long>> entry : mutationMetrics.entrySet()) {
+            String t = entry.getKey();
+            assertEquals("Table name didn't match for mutation metrics", tableName, t);
+            Map<MetricType, Long> p = entry.getValue();
+            assertEquals("There should have been four metrics", 4, p.size());
+            for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
+                MetricType metricType = metric.getKey();
+                long metricValue = metric.getValue();
+                if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) {
+                    assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
+                } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) {
+                    assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
+                } else if (metricType.equals(MetricType.MUTATION_BYTES)) {
+                    assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
+                } else if (metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) {
+                    assertEquals("Zero failed mutations expected", 0, metricValue);
+                }
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cd16bc/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java
new file mode 100644
index 0000000..02640e7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixLoggingMetricsIT.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixMetricsLog;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class PhoenixLoggingMetricsIT extends BasePhoenixMetricsIT {
+
+    private static final int NUM_ROWS = 10;
+
+    private final Map<MetricType, Long> overAllQueryMetricsMap = Maps.newHashMap();
+    private final Map<String, Map<MetricType, Long>> requestReadMetricsMap = Maps.newHashMap();
+    private final Map<String, Map<MetricType, Long>> mutationWriteMetricsMap = Maps.newHashMap();
+    private final Map<String, Map<MetricType, Long>> mutationReadMetricsMap = Maps.newHashMap();
+
+    private String tableName1;
+    private String tableName2;
+    private LoggingPhoenixConnection loggedConn;
+
+    @Before
+    public void beforeTest() throws Exception {
+        clearAllTestMetricMaps();
+        tableName1 = generateUniqueName();
+        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        Connection setupConn = DriverManager.getConnection(getUrl());
+        setupConn.createStatement().execute(ddl);
+        setupConn.close();
+        insertRowsInTable(tableName1, NUM_ROWS);
+
+        tableName2 = generateUniqueName();
+        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
+        setupConn = DriverManager.getConnection(getUrl());
+        setupConn.createStatement().execute(ddl);
+        setupConn.close();
+
+        Connection testConn = DriverManager.getConnection(getUrl());
+        loggedConn = getLoggingPhoenixConnection(testConn);
+    }
+
+    @Test
+    public void testPhoenixMetricsLoggedOnCommit() throws Exception {
+        // run SELECT to verify read metrics are logged
+        String query = "SELECT * FROM " + tableName1;
+        Statement stmt = loggedConn.createStatement();
+        ResultSet rs = stmt.executeQuery(query);
+        while (rs.next()) {
+        }
+        rs.close();
+        assertTrue("Read metrics for not found for " + tableName1,
+                requestReadMetricsMap.get(tableName1).size() > 0);
+        assertTrue("Overall read metrics for not found ", overAllQueryMetricsMap.size() > 0);
+
+        // run UPSERT SELECT to verify mutation metrics are logged
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+        loggedConn.createStatement().executeUpdate(upsertSelect);
+
+        // Assert that metrics are logged upon commit
+        loggedConn.commit();
+        assertTrue("Mutation write metrics for not found for " + tableName2,
+                mutationWriteMetricsMap.get(tableName2).size() > 0);
+        assertMutationMetrics(tableName2, NUM_ROWS, mutationWriteMetricsMap);
+        assertTrue("Mutation read metrics for not found for " + tableName1,
+                mutationReadMetricsMap.get(tableName1).size() > 0);
+        assertReadMetricsForMutatingSql(tableName1, 1, mutationReadMetricsMap);
+
+        clearAllTestMetricMaps();
+
+        // Assert that metrics logging happens only once
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged again.",
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics are not logged again.",
+                mutationReadMetricsMap.size() == 0);
+
+        clearAllTestMetricMaps();
+
+        // Assert that metrics logging happens only once again
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged again.",
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics are not logged again.",
+                mutationReadMetricsMap.size() == 0);
+    }
+
+    @Test
+    public void testPhoenixMetricsLoggedOnClose() throws Exception {
+        // run SELECT to verify read metrics are logged
+        String query = "SELECT * FROM " + tableName1;
+        Statement stmt = loggedConn.createStatement();
+        ResultSet rs = stmt.executeQuery(query);
+        while (rs.next()) {
+        }
+        rs.close();
+        assertTrue("Read metrics for not found for " + tableName1,
+                requestReadMetricsMap.get(tableName1).size() > 0);
+        assertTrue("Overall read metrics for not found ", overAllQueryMetricsMap.size() > 0);
+
+        // run UPSERT SELECT to verify mutation metrics are logged
+        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
+        loggedConn.createStatement().executeUpdate(upsertSelect);
+
+        // Autocommit is turned off by default
+        // Hence mutation metrics are not expected during connection close
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged for " + tableName2,
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics for not found for " + tableName1,
+                mutationReadMetricsMap.get(tableName1).size() > 0);
+        assertReadMetricsForMutatingSql(tableName1, 1, mutationReadMetricsMap);
+
+        clearAllTestMetricMaps();
+
+        loggedConn.close();
+        assertTrue("Mutation write metrics are not logged again.",
+                mutationWriteMetricsMap.size() == 0);
+        assertTrue("Mutation read metrics are not logged again.",
+                mutationReadMetricsMap.size() == 0);
+    }
+
+    void clearAllTestMetricMaps() {
+        overAllQueryMetricsMap.clear();
+        requestReadMetricsMap.clear();
+        mutationWriteMetricsMap.clear();
+        mutationReadMetricsMap.clear();
+    }
+
+    LoggingPhoenixConnection getLoggingPhoenixConnection(Connection conn) {
+        return new LoggingPhoenixConnection(conn, new PhoenixMetricsLog() {
+            @Override
+            public void logOverAllReadRequestMetrics(
+                    Map<MetricType, Long> overAllQueryMetrics) {
+                overAllQueryMetricsMap.putAll(overAllQueryMetrics);
+            }
+
+            @Override
+            public void logRequestReadMetrics(
+                    Map<String, Map<MetricType, Long>> requestReadMetrics) {
+                requestReadMetricsMap.putAll(requestReadMetrics);
+            }
+
+            @Override
+            public void logWriteMetricsfoForMutations(
+                    Map<String, Map<MetricType, Long>> mutationWriteMetrics) {
+                mutationWriteMetricsMap.putAll(mutationWriteMetrics);
+            }
+
+            @Override
+            public void logReadMetricInfoForMutationsSinceLastReset(
+                    Map<String, Map<MetricType, Long>> mutationReadMetrics) {
+                mutationReadMetricsMap.putAll(mutationReadMetrics);
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cd16bc/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 4c5c592..0882cec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -34,7 +34,6 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FIL
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
-import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
@@ -59,51 +58,34 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.LoggingPhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.jdbc.PhoenixMetricsLog;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
-
-    private static final List<MetricType> mutationMetricsToSkip =
-            Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME);
-    private static final List<MetricType> readMetricsToSkip =
-            Lists.newArrayList(MetricType.TASK_QUEUE_WAIT_TIME,
-                MetricType.TASK_EXECUTION_TIME, MetricType.TASK_END_TO_END_TIME,
-                MetricType.COUNT_MILLS_BETWEEN_NEXTS);
-    private static final String CUSTOM_URL_STRING = "SESSION";
-    private static final AtomicInteger numConnections = new AtomicInteger(0);
-
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
-        // Enable request metric collection at the driver level
-        props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true));
-        // disable renewing leases as this will force spooling to happen.
-        props.put(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-        // need the non-test driver for some tests that check number of hconnections, etc.
-        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
-    }
+/**
+ * Tests that
+ * 1. Phoenix Global metrics are exposed via
+ *   a. PhoenixRuntime b. Hadoop-Metrics2 defined sinks
+ * 2. Phoenix Request level metrics are exposed via
+ *   a. PhoenixRuntime
+ */
+public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixMetricsIT.class);
 
     @Test
     public void testResetGlobalPhoenixMetrics() {
@@ -740,19 +722,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
                 expectedTableNames.size() == 0);
     }
 
-    private Connection insertRowsInTable(String tableName, long numRows) throws SQLException {
-        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
-        Connection conn = DriverManager.getConnection(getUrl());
-        PreparedStatement stmt = conn.prepareStatement(dml);
-        for (int i = 1; i <= numRows; i++) {
-            stmt.setString(1, "key" + i);
-            stmt.setString(2, "value" + i);
-            stmt.executeUpdate();
-        }
-        conn.commit();
-        return conn;
-    }
-
     // number of records read should be number of bytes at the end
     public static final long SCAN_BYTES_DELTA = 1;
 
@@ -800,52 +769,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
-            Map<String, Map<MetricType, Long>> readMetrics) {
-        assertTrue("No read metrics present when there should have been!", readMetrics.size() > 0);
-        int numTables = 0;
-        for (Entry<String, Map<MetricType, Long>> entry : readMetrics.entrySet()) {
-            String t = entry.getKey();
-            assertEquals("Table name didn't match for read metrics", tableName, t);
-            numTables++;
-            Map<MetricType, Long> p = entry.getValue();
-            assertTrue("No read metrics present when there should have been", p.size() > 0);
-            for (Entry<MetricType, Long> metric : p.entrySet()) {
-            	MetricType metricType = metric.getKey();
-                long metricValue = metric.getValue();
-                if (metricType.equals(TASK_EXECUTED_COUNTER)) {
-                    assertEquals(tableSaltBuckets, metricValue);
-                } else if (metricType.equals(SCAN_BYTES)) {
-                    assertTrue("Scan bytes read should be greater than zero", metricValue > 0);
-                }
-            }
-        }
-        assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
-    }
-
-    private void assertMutationMetrics(String tableName, int numRows, Map<String, Map<MetricType, Long>> mutationMetrics) {
-        assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
-        for (Entry<String, Map<MetricType, Long>> entry : mutationMetrics.entrySet()) {
-            String t = entry.getKey();
-            assertEquals("Table name didn't match for mutation metrics", tableName, t);
-            Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been four metrics", 4, p.size());
-            for (Entry<MetricType, Long> metric : p.entrySet()) {
-            	MetricType metricType = metric.getKey();
-                long metricValue = metric.getValue();
-                if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) {
-                    assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
-                } else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) {
-                    assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
-                } else if (metricType.equals(MetricType.MUTATION_BYTES)) {
-                    assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
-                } else if (metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) {
-                    assertEquals("Zero failed mutations expected", 0, metricValue);
-                }
-            }
-        }
-    }
-
     @Test
     public void testGetConnectionsForSameUrlConcurrently()  throws Exception {
         // establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
@@ -1020,74 +943,5 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
             return c;
         }
     }
-    
-    @Test
-    public void testPhoenixMetricsLogged() throws Exception {
-        final Map<MetricType, Long> overAllQueryMetricsMap = Maps.newHashMap();
-        final Map<String, Map<MetricType, Long>> requestReadMetricsMap = Maps.newHashMap();
-        final Map<String, Map<MetricType, Long>> mutationWriteMetricsMap = Maps.newHashMap();
-        final Map<String, Map<MetricType, Long>> mutationReadMetricsMap = Maps.newHashMap();
-
-        String tableName1 = generateUniqueName();
-        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
-        Connection ddlConn = DriverManager.getConnection(getUrl());
-        ddlConn.createStatement().execute(ddl);
-        ddlConn.close();
-        insertRowsInTable(tableName1, 10);
-
-        String tableName2 = generateUniqueName();
-        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)";
-        ddlConn = DriverManager.getConnection(getUrl());
-        ddlConn.createStatement().execute(ddl);
-        ddlConn.close();
 
-        Connection conn = DriverManager.getConnection(getUrl());
-        LoggingPhoenixConnection protectedConn =
-                new LoggingPhoenixConnection(conn, new PhoenixMetricsLog() {
-                    @Override
-                    public void logOverAllReadRequestMetrics(
-                            Map<MetricType, Long> overAllQueryMetrics) {
-                        overAllQueryMetricsMap.putAll(overAllQueryMetrics);
-                    }
-
-                    @Override
-                    public void logRequestReadMetrics(
-                            Map<String, Map<MetricType, Long>> requestReadMetrics) {
-                        requestReadMetricsMap.putAll(requestReadMetrics);
-                    }
-
-                    @Override
-                    public void logWriteMetricsfoForMutations(
-                            Map<String, Map<MetricType, Long>> mutationWriteMetrics) {
-                        mutationWriteMetricsMap.putAll(mutationWriteMetrics);
-                    }
-
-                    @Override
-                    public void logReadMetricInfoForMutationsSinceLastReset(
-                            Map<String, Map<MetricType, Long>> mutationReadMetrics) {
-                        mutationReadMetricsMap.putAll(mutationReadMetrics);
-                    }
-                });
-        
-        // run SELECT to verify read metrics are logged
-        String query = "SELECT * FROM " + tableName1;
-        Statement stmt = protectedConn.createStatement();
-        ResultSet rs = stmt.executeQuery(query);
-        while (rs.next()) {
-        }
-        rs.close();
-        assertTrue("Read metrics for not found for " + tableName1,
-            requestReadMetricsMap.get(tableName1).size() > 0);
-        assertTrue("Overall read metrics for not found ", overAllQueryMetricsMap.size() > 0);
-
-        // run UPSERT SELECT to verify mutation metrics are logged
-        String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
-        protectedConn.createStatement().executeUpdate(upsertSelect);
-        protectedConn.commit();
-        assertTrue("Mutation write metrics for not found for " + tableName2,
-            mutationWriteMetricsMap.get(tableName2).size() > 0);
-        assertTrue("Mutation read metrics for not found for " + tableName1,
-            mutationReadMetricsMap.get(tableName1).size() > 0);
-        protectedConn.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cd16bc/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
index d98da83..9a2e00f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/LoggingPhoenixConnection.java
@@ -98,7 +98,7 @@ public class LoggingPhoenixConnection extends DelegateConnection {
         return new LoggingPhoenixPreparedStatement(super.prepareStatement(sql, columnNames),
                 phoenixMetricsLog);
     }
-    
+
     @Override
     public void commit() throws SQLException {
         super.commit();
@@ -107,4 +107,14 @@ public class LoggingPhoenixConnection extends DelegateConnection {
         PhoenixRuntime.resetMetrics(conn);
     }
 
+    @Override
+    public void close() throws SQLException {
+        try {
+            phoenixMetricsLog.logWriteMetricsfoForMutations(PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn));
+            phoenixMetricsLog.logReadMetricInfoForMutationsSinceLastReset(PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(conn));
+            PhoenixRuntime.resetMetrics(conn);
+        } finally {
+            super.close();
+        }
+    }
 }


Mime
View raw message