phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maryann...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-2556 Subqueries with nested joins may not free hash cache
Date Tue, 05 Jan 2016 14:14:48 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master ab8fa5ef4 -> 5327e1b27


PHOENIX-2556 Subqueries with nested joins may not free hash cache


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/07cdaa44
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/07cdaa44
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/07cdaa44

Branch: refs/heads/master
Commit: 07cdaa4405dd7628669b595a75af14116bb961f1
Parents: 3e41fe3
Author: maryannxue <maryann.xue@gmail.com>
Authored: Tue Jan 5 09:14:09 2016 -0500
Committer: maryannxue <maryann.xue@gmail.com>
Committed: Tue Jan 5 09:14:09 2016 -0500

----------------------------------------------------------------------
 .../ConnectionQueryServicesTestImpl.java        |  5 +-
 .../org/apache/phoenix/end2end/SubqueryIT.java  | 48 ++++++++++++++++----
 .../apache/phoenix/compile/DeleteCompiler.java  | 36 ++++++++-------
 .../apache/phoenix/execute/HashJoinPlan.java    | 40 ++++++++--------
 4 files changed, 85 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/07cdaa44/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index f5d7f18..2d0ed60 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.junit.Assert.assertEquals;
+
 import java.sql.SQLException;
 import java.util.Properties;
 import java.util.Set;
@@ -82,8 +84,7 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
             this.connections = Sets.newHashSet();
             SQLCloseables.closeAll(connections);
             long unfreedBytes = clearCache();
-            // FIXME: once PHOENIX-2556 is fixed, comment this back in
-            // assertEquals(0,unfreedBytes);
+            assertEquals("Found unfreed bytes in server-side cache", 0, unfreedBytes);
         } finally {
             super.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/07cdaa44/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
index 794c4f5..90ce327 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
@@ -45,12 +45,14 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -366,6 +368,17 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
         }
     }
 
+    @After
+    public void assertNoUnfreedMemory() throws SQLException {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            long unfreedBytes = conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
+            assertEquals("Found bytes not freed on server side", 0, unfreedBytes);
+        } finally {
+            conn.close();
+        }
+    }
+    
     @Test
     public void testNonCorrelatedSubquery() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -628,9 +641,12 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
     @Test
     public void testComparisonSubquery() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
+        final Connection conn = DriverManager.getConnection(getUrl(), props);
         try {
-            String query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")";
+            String query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + 
+                    " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + 
+                    JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\")";
             PreparedStatement statement = conn.prepareStatement(query);
             ResultSet rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -648,7 +664,11 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i2 JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " q ON i2.\"item_id\" = q.\"item_id\" WHERE o.\"item_id\" = i2.\"item_id\")";
+            query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + 
+                    " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity = (SELECT max(quantity) FROM " + 
+                    JOIN_ITEM_TABLE_FULL_NAME + " i2 JOIN " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " q ON i2.\"item_id\" = q.\"item_id\" WHERE o.\"item_id\" = i2.\"item_id\")";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -666,7 +686,11 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT name from " + JOIN_CUSTOMER_TABLE_FULL_NAME + " WHERE \"customer_id\" IN (SELECT \"customer_id\" FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" WHERE i.name = 'T2' OR quantity > (SELECT avg(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\"))";
+            query = "SELECT name from " + JOIN_CUSTOMER_TABLE_FULL_NAME + 
+                    " WHERE \"customer_id\" IN (SELECT \"customer_id\" FROM " + 
+                    JOIN_ITEM_TABLE_FULL_NAME + " i JOIN " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o ON o.\"item_id\" = i.\"item_id\" WHERE i.name = 'T2' OR quantity > (SELECT avg(quantity) FROM " + 
+                    JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\"))";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -680,7 +704,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
             String plan = QueryUtil.getExplainPlan(rs);
             assertTrue("\"" + plan + "\" does not match \"" + plans[4] + "\"", Pattern.matches(plans[4], plan));
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004')";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -694,7 +720,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003')";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             try {
@@ -703,7 +731,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
             } catch (SQLException e) {                
             }
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' GROUP BY \"order_id\")";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000004' GROUP BY \"order_id\")";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -717,7 +747,9 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
 
             assertFalse(rs.next());
 
-            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' GROUP BY \"order_id\")";
+            query = "SELECT \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " o WHERE quantity = (SELECT max(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + 
+                    " WHERE o.\"item_id\" = \"item_id\" AND \"order_id\" != '000000000000003' GROUP BY \"order_id\")";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/07cdaa44/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index ac9f7d1..924ed43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -618,23 +618,27 @@ public class DeleteCompiler {
                     @Override
                     public MutationState execute() throws SQLException {
                         ResultIterator iterator = plan.iterator();
-                        if (!hasLimit) {
-                            Tuple tuple;
-                            long totalRowCount = 0;
-                            while ((tuple=iterator.next()) != null) {// Runs query
-                                Cell kv = tuple.getValue(0);
-                                totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                        try {
+                            if (!hasLimit) {
+                                Tuple tuple;
+                                long totalRowCount = 0;
+                                while ((tuple=iterator.next()) != null) {// Runs query
+                                    Cell kv = tuple.getValue(0);
+                                    totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+                                }
+                                // Return total number of rows that have been delete. In the case of auto commit being off
+                                // the mutations will all be in the mutation state of the current connection.
+                                MutationState state = new MutationState(maxSize, connection, totalRowCount);
+
+                                // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
+                                state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
+
+                                return state;
+                            } else {
+                                return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
                             }
-                            // Return total number of rows that have been delete. In the case of auto commit being off
-                            // the mutations will all be in the mutation state of the current connection.
-                            MutationState state = new MutationState(maxSize, connection, totalRowCount);
-                            
-                            // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
-                            state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());
-                            
-                            return state;
-                        } else {
-                            return deleteRows(plan.getContext(), tableRef, deleteFromImmutableIndexToo ? plan.getTableRef() : null, iterator, plan.getProjector(), plan.getTableRef());
+                        } finally {
+                            iterator.close();
                         }
                     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/07cdaa44/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index cf89380..053dc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -88,7 +88,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final boolean recompileWhereClause;
     private final Set<TableRef> tableRefs;
     private final int maxServerCacheTimeToLive;
-    private List<SQLCloseable> dependencies;
+    private final List<SQLCloseable> dependencies = Lists.newArrayList();
     private HashCacheClient hashClient;
     private AtomicLong firstJobEndTime;
     private List<Expression> keyRangeExpressions;
@@ -96,7 +96,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     public static HashJoinPlan create(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
         if (!(plan instanceof HashJoinPlan))
-            return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null);
+            return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<SQLCloseable>emptyList());
         
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
         assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan);
@@ -108,12 +108,13 @@ public class HashJoinPlan extends DelegateQueryPlan {
         for (SubPlan subPlan : subPlans) {
             mergedSubPlans[i++] = subPlan;
         }
-        return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true);
+        return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true, hashJoinPlan.dependencies);
     }
     
     private HashJoinPlan(SelectStatement statement, 
-            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
+            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, List<SQLCloseable> dependencies) {
         super(plan);
+        this.dependencies.addAll(dependencies);
         this.statement = statement;
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
@@ -143,8 +144,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         PhoenixConnection connection = getContext().getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
         ExecutorService executor = services.getExecutor();
-        List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count);
-        dependencies = Lists.newArrayList();
+        List<Future<ServerCache>> futures = Lists.newArrayListWithExpectedSize(count);
         if (joinInfo != null) {
             hashClient = hashClient != null ? 
                     hashClient 
@@ -155,11 +155,12 @@ public class HashJoinPlan extends DelegateQueryPlan {
         
         for (int i = 0; i < count; i++) {
             final int index = i;
-            futures.add(executor.submit(new JobCallable<Object>() {
+            futures.add(executor.submit(new JobCallable<ServerCache>() {
 
                 @Override
-                public Object call() throws Exception {
-                    return subPlans[index].execute(HashJoinPlan.this);
+                public ServerCache call() throws Exception {
+                    ServerCache cache = subPlans[index].execute(HashJoinPlan.this);
+                    return cache;
                 }
 
                 @Override
@@ -177,7 +178,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
         SQLException firstException = null;
         for (int i = 0; i < count; i++) {
             try {
-                Object result = futures.get(i).get();
+                ServerCache result = futures.get(i).get();
+                if (result != null) {
+                    dependencies.add(result);
+                }
                 subPlans[i].postProcess(result, this);
             } catch (InterruptedException e) {
                 if (firstException == null) {
@@ -261,8 +265,8 @@ public class HashJoinPlan extends DelegateQueryPlan {
     }
 
     protected interface SubPlan {
-        public Object execute(HashJoinPlan parent) throws SQLException;
-        public void postProcess(Object result, HashJoinPlan parent) throws SQLException;
+        public ServerCache execute(HashJoinPlan parent) throws SQLException;
+        public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException;
         public List<String> getPreSteps(HashJoinPlan parent) throws SQLException;
         public List<String> getPostSteps(HashJoinPlan parent) throws SQLException;
         public QueryPlan getInnerPlan();
@@ -280,7 +284,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public Object execute(HashJoinPlan parent) throws SQLException {
+        public ServerCache execute(HashJoinPlan parent) throws SQLException {
             List<Object> values = Lists.<Object> newArrayList();
             ResultIterator iterator = plan.iterator();
             RowProjector projector = plan.getProjector();
@@ -319,7 +323,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public void postProcess(Object result, HashJoinPlan parent) throws SQLException {
+        public void postProcess(ServerCache result, HashJoinPlan parent) throws SQLException {
         }
 
         @Override
@@ -365,7 +369,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public Object execute(HashJoinPlan parent) throws SQLException {
+        public ServerCache execute(HashJoinPlan parent) throws SQLException {
             ScanRanges ranges = parent.delegate.getContext().getScanRanges();
             List<Expression> keyRangeRhsValues = null;
             if (keyRangeRhsExpression != null) {
@@ -387,6 +391,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
                     // Evaluate key expressions for hash join key range optimization.
                     keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(keyRangeRhsExpression, result, plan.getContext().getTempPtr()));
                 }
+                iterator.close();
             }
             if (keyRangeRhsValues != null) {
                 parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression, keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), plan.getContext().getCurrentTable().getTable().rowKeyOrderOptimizable()));
@@ -395,12 +400,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         @Override
-        public void postProcess(Object result, HashJoinPlan parent)
+        public void postProcess(ServerCache result, HashJoinPlan parent)
                 throws SQLException {
-            ServerCache cache = (ServerCache) result;
+            ServerCache cache = result;
             if (cache != null) {
                 parent.joinInfo.getJoinIds()[index].set(cache.getId());
-                parent.dependencies.add(cache);
             }
         }
 


Mime
View raw message