phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/3] phoenix git commit: PHOENIX-2558 Fix server-side cache memory leaks
Date Mon, 04 Jan 2016 17:36:07 GMT
PHOENIX-2558 Fix server-side cache memory leaks


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/45d75aac
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/45d75aac
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/45d75aac

Branch: refs/heads/4.x-HBase-0.98
Commit: 45d75aac7ab153365e25c5e52004bf7e5bc75cae
Parents: 6d0f25c
Author: James Taylor <jtaylor@salesforce.com>
Authored: Sun Jan 3 17:08:31 2016 -0800
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Mon Jan 4 09:34:00 2016 -0800

----------------------------------------------------------------------
 .../end2end/BaseTenantSpecificViewIndexIT.java  |  2 --
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 14 +++++++++++++
 .../apache/phoenix/end2end/HashJoinMoreIT.java  | 15 +++++++++++++
 .../end2end/QueryDatabaseMetaDataIT.java        | 18 +++++++---------
 .../apache/phoenix/end2end/QueryTimeoutIT.java  | 14 +++++++++++++
 .../apache/phoenix/cache/ServerCacheClient.java | 13 +++++++++++-
 .../apache/phoenix/execute/MutationState.java   | 16 +++++++-------
 .../phoenix/iterate/BaseResultIterators.java    | 22 +++++++++++++++++++-
 .../MaterializedComparableResultIterator.java   |  3 +++
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |  3 +++
 10 files changed, 98 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
index b450643..c10afa6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseTenantSpecificViewIndexIT.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.end2end;
 
 import static com.google.common.collect.Sets.newHashSet;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -178,6 +177,5 @@ public class BaseTenantSpecificViewIndexIT extends BaseHBaseManagedTimeIT {
             Arrays.<Object>asList(1,7, valuePrefix + "v2-1"),
             Arrays.<Object>asList(1,9, valuePrefix + "v2-1"));
         assertValuesEqualsResultSet(rs,expectedResultsA);
-        assertFalse(rs.next());
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 3d8b006..dd7f6ba 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -50,12 +50,14 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -78,6 +80,18 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
         this.plans = plans;
     }
     
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals(0,unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
+    
     @BeforeClass
     @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
index a8bb977..98264f0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinMoreIT.java
@@ -27,14 +27,17 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -101,6 +104,17 @@ public class HashJoinMoreIT extends BaseHBaseManagedTimeIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals(0,unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
     @Test
     public void testJoinOverSaltedTables() throws Exception {
         String tempTableNoSalting = "TEMP_TABLE_NO_SALTING";
@@ -555,6 +569,7 @@ public class HashJoinMoreIT extends BaseHBaseManagedTimeIT {
             assertTrue(rs.next());
             assertEquals(rs.getInt(1), 5);
             assertFalse(rs.next());
+            rs.close();
             rs = conn.createStatement().executeQuery(
                     "SELECT * FROM INVENTORY RIGHT JOIN PRODUCT_IDS ON (PRODUCT_ID = INVENTORY.ID)");
             assertTrue(rs.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index 2fdccf6..ba83e6a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -48,13 +48,11 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
@@ -69,15 +67,15 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.types.PDecimal;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -151,17 +149,15 @@ public class QueryDatabaseMetaDataIT extends BaseClientManagedTimeIT {
 
         rs = dbmd.getTables(null, CUSTOM_ENTITY_DATA_SCHEMA_NAME, CUSTOM_ENTITY_DATA_NAME, null);
         assertTrue(rs.next());
-        assertEquals(rs.getString("TABLE_SCHEM"),CUSTOM_ENTITY_DATA_SCHEMA_NAME);
-        assertEquals(rs.getString("TABLE_NAME"),CUSTOM_ENTITY_DATA_NAME);
-        assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE"));
-        assertFalse(rs.next());
-        
         try {
             rs.getString("RANDOM_COLUMN_NAME");
             fail();
         } catch (ColumnNotFoundException e) {
             // expected
         }
+        assertEquals(rs.getString("TABLE_SCHEM"),CUSTOM_ENTITY_DATA_SCHEMA_NAME);
+        assertEquals(rs.getString("TABLE_NAME"),CUSTOM_ENTITY_DATA_NAME);
+        assertEquals(PTableType.TABLE.toString(), rs.getString("TABLE_TYPE"));
         assertFalse(rs.next());
         
         rs = dbmd.getTables(null, "", "_TABLE", new String[] {PTableType.TABLE.toString()});

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
index ba7b461..ccd6530 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
@@ -28,15 +28,18 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.SQLTimeoutException;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -56,6 +59,17 @@ public class QueryTimeoutIT extends BaseOwnClusterHBaseManagedTimeIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals(0,unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
     @Test
     public void testQueryTimeout() throws Exception {
         int nRows = 30000;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index f188ab2..424482a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -301,6 +301,7 @@ public class ServerCacheClient {
     	ConnectionQueryServices services = connection.getQueryServices();
     	Throwable lastThrowable = null;
     	TableRef cacheUsingTableRef = cacheUsingTableRefMap.get(Bytes.mapKey(cacheId));
+    	final PTable cacheUsingTable = cacheUsingTableRef.getTable();
     	byte[] tableName = cacheUsingTableRef.getTable().getPhysicalName().getBytes();
     	HTableInterface iterateOverTable = services.getTable(tableName);
     	try {
@@ -326,7 +327,17 @@ public class ServerCacheClient {
     									new BlockingRpcCallback<RemoveServerCacheResponse>();
     							RemoveServerCacheRequest.Builder builder = RemoveServerCacheRequest.newBuilder();
     							if(connection.getTenantId() != null){
-    								builder.setTenantId(ByteStringer.wrap(connection.getTenantId().getBytes()));
+                                    try {
+                                        byte[] tenantIdBytes =
+                                                ScanUtil.getTenantIdBytes(
+                                                        cacheUsingTable.getRowKeySchema(),
+                                                        cacheUsingTable.getBucketNum()!=null,
+                                                        connection.getTenantId(),
+                                                        cacheUsingTable.isMultiTenant());
+                                        builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+                                    } catch (SQLException e) {
+                                        new IOException(e);
+                                    }
     							}
     							builder.setCacheId(ByteStringer.wrap(cacheId));
     							instance.removeServerCache(controller, builder.build(), rpcCallback);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 0ecce7d..3528d59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -79,6 +79,7 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.cloudera.htrace.Span;
@@ -703,6 +704,8 @@ public class MutationState implements SQLCloseable {
          */
         @Override
         public void delete(List<Delete> deletes) throws IOException {
+            ServerCache cache = null;
+            SQLException sqlE = null;
             try {
                 PTable table = tableRef.getTable();
                 List<PTable> indexes = table.getIndexes();
@@ -736,12 +739,16 @@ public class MutationState implements SQLCloseable {
                         IndexMaintainer.serializeAdditional(table, indexMetaDataPtr, keyValueIndexes, connection);
                     }
                     if (attachMetaData) {
-                        setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
+                        cache = setMetaDataOnMutations(tableRef, deletes, indexMetaDataPtr);
                     }
                 }
                 delegate.delete(deletes);
             } catch (SQLException e) {
                 throw new IOException(e);
+            } finally {
+                if (cache != null) {
+                    SQLCloseables.closeAllQuietly(Collections.singletonList(cache));
+                }
             }
         }
     }
@@ -793,10 +800,7 @@ public class MutationState implements SQLCloseable {
 	                int retryCount = 0;
 	                boolean shouldRetry = false;
 	                do {
-	                    ServerCache cache = null;
-	                    if (isDataTable) {
-	                        cache = setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr);
-	                    }
+	                    final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;
 	                
 	                    // If we haven't retried yet, retry for this case only, as it's possible
that
 	                    // a split will occur after we send the index metadata cache to all
known
@@ -879,8 +883,6 @@ public class MutationState implements SQLCloseable {
 	                                }
 	                            } 
 	                            if (sqlE != null) {
-	                            	// clear pending mutations
-	                            	mutations.clear();
 	                                throw sqlE;
 	                            }
 	                        }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 21f082f..2806acd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -704,6 +704,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         // to get into a funk. Instead, just cancel queued work.
         boolean cancelledWork = false;
         try {
+            List<Future<PeekingResultIterator>> futuresToClose = Lists.newArrayListWithExpectedSize(getSplits().size());
             for (List<List<Pair<Scan,Future<PeekingResultIterator>>>>
futures : allFutures) {
                 for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans
: futures) {
                     for (Pair<Scan,Future<PeekingResultIterator>> futurePair
: futureScans) {
@@ -712,16 +713,35 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                         if (futurePair != null) {
                             Future<PeekingResultIterator> future = futurePair.getSecond();
                             if (future != null) {
-                                future.cancel(false);
+                                if (future.cancel(false)) {
+                                    cancelledWork = true;
+                                } else {
+                                    futuresToClose.add(future);
+                                }
                             }
                         }
                     }
                 }
             }
+            // Wait for already started tasks to complete as we can't interrupt them without
+            // leaving our HConnection in a funky state.
+            for (Future<PeekingResultIterator> future : futuresToClose) {
+                try {
+                    PeekingResultIterator iterator = future.get();
+                    iterator.close();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    logger.info("Failed to execute task during cancel", e);
+                    continue;
+                }
+            }
         } finally {
             if (cancelledWork) {
                 context.getConnection().getQueryServices().getExecutor().purge();
             }
+            allFutures.clear();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
index 093a098..a76f1e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
@@ -45,16 +45,19 @@ public class MaterializedComparableResultIterator
         this.current = delegate.peek();
     }
 
+    @Override
     public Tuple next() throws SQLException {
         Tuple next = delegate.next();
         this.current = delegate.peek();
         return next;
     }
 
+    @Override
     public Tuple peek() throws SQLException {
         return delegate.peek();
     }
 
+    @Override
     public void close() throws SQLException {
         delegate.close();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/45d75aac/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index a3ce1a1..47c17ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -776,6 +776,9 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable {
                 overAllQueryMetrics.startResultSetWatch();
             }
             currentRow = scanner.next();
+            if (currentRow == null) {
+                close();
+            }
             rowProjector.reset();
         } catch (RuntimeException e) {
             // FIXME: Expression.evaluate does not throw SQLException


Mime
View raw message