phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sama...@apache.org
Subject phoenix git commit: PHOENIX-3981 Ensure iterators are closed for join plans
Date Wed, 28 Jun 2017 20:32:20 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 e6352e574 -> 618fb2d36


PHOENIX-3981 Ensure iterators are closed for join plans


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/618fb2d3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/618fb2d3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/618fb2d3

Branch: refs/heads/4.x-HBase-1.1
Commit: 618fb2d36b110a856c708d3b1b9fbeeef73f486c
Parents: e6352e5
Author: Samarth Jain <samarth@apache.org>
Authored: Wed Jun 28 13:32:14 2017 -0700
Committer: Samarth Jain <samarth@apache.org>
Committed: Wed Jun 28 13:32:14 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/HashJoinPlan.java    | 102 +++++++++++--------
 .../phoenix/execute/SortMergeJoinPlan.java      |  32 +++++-
 2 files changed, 89 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/618fb2d3/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 64e2ce2..17c3cca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -293,39 +293,43 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public ServerCache execute(HashJoinPlan parent) throws SQLException {
             List<Object> values = Lists.<Object> newArrayList();
             ResultIterator iterator = plan.iterator();
-            RowProjector projector = plan.getProjector();
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            int columnCount = projector.getColumnCount();
-            int rowCount = 0;
-            PDataType baseType = PVarbinary.INSTANCE;
-            for (Tuple tuple = iterator.next(); tuple != null; tuple = iterator.next()) {
-                if (expectSingleRow && rowCount >= 1)
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
-                
-                if (columnCount == 1) {
-                    ColumnProjector columnProjector = projector.getColumnProjector(0);
-                    baseType = columnProjector.getExpression().getDataType();
-                    Object value = columnProjector.getValue(tuple, baseType, ptr);
-                    values.add(value);
-                } else {
-                    List<Expression> expressions = Lists.<Expression>newArrayListWithExpectedSize(columnCount);
-                    for (int i = 0; i < columnCount; i++) {
-                        ColumnProjector columnProjector = projector.getColumnProjector(i);
-                        PDataType type = columnProjector.getExpression().getDataType();
-                        Object value = columnProjector.getValue(tuple, type, ptr);
-                        expressions.add(LiteralExpression.newConstant(value, type));
+            try {
+                RowProjector projector = plan.getProjector();
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                int columnCount = projector.getColumnCount();
+                int rowCount = 0;
+                PDataType baseType = PVarbinary.INSTANCE;
+                for (Tuple tuple = iterator.next(); tuple != null; tuple = iterator.next())
{
+                    if (expectSingleRow && rowCount >= 1)
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+
+                    if (columnCount == 1) {
+                        ColumnProjector columnProjector = projector.getColumnProjector(0);
+                        baseType = columnProjector.getExpression().getDataType();
+                        Object value = columnProjector.getValue(tuple, baseType, ptr);
+                        values.add(value);
+                    } else {
+                        List<Expression> expressions = Lists.<Expression>newArrayListWithExpectedSize(columnCount);
+                        for (int i = 0; i < columnCount; i++) {
+                            ColumnProjector columnProjector = projector.getColumnProjector(i);
+                            PDataType type = columnProjector.getExpression().getDataType();
+                            Object value = columnProjector.getValue(tuple, type, ptr);
+                            expressions.add(LiteralExpression.newConstant(value, type));
+                        }
+                        Expression expression = new RowValueConstructorExpression(expressions,
true);
+                        baseType = expression.getDataType();
+                        expression.evaluate(null, ptr);
+                        values.add(baseType.toObject(ptr));
                     }
-                    Expression expression = new RowValueConstructorExpression(expressions,
true);
-                    baseType = expression.getDataType();
-                    expression.evaluate(null, ptr);
-                    values.add(baseType.toObject(ptr));
+                    rowCount++;
                 }
-                rowCount++;
+
+                Object result = expectSingleRow ? (values.isEmpty() ? null : values.get(0))
: PArrayDataType.instantiatePhoenixArray(baseType, values.toArray());
+                parent.getContext().setSubqueryResult(select, result);
+                return null;
+            } finally {
+                iterator.close();
             }
-            
-            Object result = expectSingleRow ? (values.isEmpty() ? null : values.get(0)) :
PArrayDataType.instantiatePhoenixArray(baseType, values.toArray());
-            parent.getContext().setSubqueryResult(select, result);
-            return null;
         }
 
         @Override
@@ -383,21 +387,37 @@ public class HashJoinPlan extends DelegateQueryPlan {
             }
             ServerCache cache = null;
             if (hashExpressions != null) {
-                cache = parent.hashClient.addHashCache(ranges, plan.iterator(), 
-                        plan.getEstimatedSize(), hashExpressions, singleValueOnly, parent.delegate.getTableRef(),
keyRangeRhsExpression, keyRangeRhsValues);
-                long endTime = System.currentTimeMillis();
-                boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
-                if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive)
{
-                    LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems
too slow. Earlier hash cache(s) might have expired on servers.", parent.delegate.getContext().getConnection()));
+                ResultIterator iterator = plan.iterator();
+                try {
+                    cache =
+                            parent.hashClient.addHashCache(ranges, iterator,
+                                plan.getEstimatedSize(), hashExpressions, singleValueOnly,
+                                parent.delegate.getTableRef(), keyRangeRhsExpression,
+                                keyRangeRhsValues);
+                    long endTime = System.currentTimeMillis();
+                    boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime);
+                    if (!isSet && (endTime
+                            - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive)
{
+                        LOG.warn(addCustomAnnotations(
+                            "Hash plan [" + index
+                                    + "] execution seems too slow. Earlier hash cache(s)
might have expired on servers.",
+                            parent.delegate.getContext().getConnection()));
+                    }
+                } finally {
+                    iterator.close();
                 }
             } else {
-                assert(keyRangeRhsExpression != null);
+                assert (keyRangeRhsExpression != null);
                 ResultIterator iterator = plan.iterator();
-                for (Tuple result = iterator.next(); result != null; result = iterator.next())
{
-                    // Evaluate key expressions for hash join key range optimization.
-                    keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(keyRangeRhsExpression,
result, plan.getContext().getTempPtr()));
+                try {
+                    for (Tuple result = iterator.next(); result != null; result = iterator.next())
{
+                        // Evaluate key expressions for hash join key range optimization.
+                        keyRangeRhsValues.add(HashCacheClient.evaluateKeyExpression(
+                            keyRangeRhsExpression, result, plan.getContext().getTempPtr()));
+                    }
+                } finally {
+                    iterator.close();
                 }
-                iterator.close();
             }
             if (keyRangeRhsValues != null) {
                 parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression,
keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), plan.getContext().getCurrentTable().getTable().rowKeyOrderOptimizable()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/618fb2d3/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 4b63c50..568094a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -239,6 +239,26 @@ public class SortMergeJoinPlan implements QueryPlan {
         return false;
     }
     
+    private static SQLException closeIterators(ResultIterator lhsIterator, ResultIterator
rhsIterator) {
+        SQLException e = null;
+        try {
+            lhsIterator.close();
+        } catch (Throwable e1) {
+            e = e1 instanceof SQLException ? (SQLException)e1 : new SQLException(e1);
+        }
+        try {
+            rhsIterator.close();
+        } catch (Throwable e2) {
+            SQLException e22 = e2 instanceof SQLException ? (SQLException)e2 : new SQLException(e2);
+            if (e != null) {
+                e.setNextException(e22);
+            } else {
+                e = e22;
+            }
+        }
+        return e;
+    }
+
     private class BasicJoinIterator implements ResultIterator {
         private final ResultIterator lhsIterator;
         private final ResultIterator rhsIterator;
@@ -283,9 +303,11 @@ public class SortMergeJoinPlan implements QueryPlan {
         
         @Override
         public void close() throws SQLException {
-            lhsIterator.close();
-            rhsIterator.close();
+            SQLException e = closeIterators(lhsIterator, rhsIterator);
             queue.close();
+            if (e != null) {
+                throw e;
+            }
         }
 
         @Override
@@ -453,8 +475,10 @@ public class SortMergeJoinPlan implements QueryPlan {
 
         @Override
         public void close() throws SQLException {
-            lhsIterator.close();
-            rhsIterator.close();
+            SQLException e = closeIterators(lhsIterator, rhsIterator);
+            if (e != null) {
+                throw e;
+            }
         }
 
         @Override


Mime
View raw message