phoenix-commits mailing list archives

From an...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-4010 Hash Join cache may not be sent to all regionservers when we have stale HBase meta cache
Date Sat, 09 Sep 2017 08:04:14 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 68e6cf045 -> 4bfb23aee


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java
new file mode 100644
index 0000000..6d67348
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinCacheNotFoundException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+public class HashJoinCacheNotFoundException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static final SQLExceptionCode ERROR_CODE = SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND;
+    private final Long cacheId;
+
+    public HashJoinCacheNotFoundException() {
+        this(null);
+    }
+
+    public HashJoinCacheNotFoundException(Long cacheId) {
+        super(new SQLExceptionInfo.Builder(ERROR_CODE).setMessage("joinId: " + cacheId
+                + ". The cache might have expired and been removed.").build().toString(),
+                ERROR_CODE.getSQLState(), ERROR_CODE.getErrorCode(), null);
+        this.cacheId = cacheId;
+    }
+
+    public Long getCacheId() {
+        return this.cacheId;
+    }
+
+}

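For context, a minimal sketch (not part of the patch) of what the new exception type carries; the joinId literal and class name here are made up for illustration:

    import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;

    class HashJoinCacheNotFoundExample {
        public static void main(String[] args) {
            Long joinId = 1234L; // hypothetical cache id
            HashJoinCacheNotFoundException e = new HashJoinCacheNotFoundException(joinId);
            // The cache id rides along so the retry path can look up the right ServerCache,
            // while SQLState and error code come from SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND.
            System.out.println(e.getCacheId() + " -> " + e.getSQLState() + "/" + e.getErrorCode());
        }
    }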
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index ced29bd..5061d94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -46,6 +46,7 @@ import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.PositionBasedResultTuple;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TupleUtil;
 
@@ -97,10 +98,12 @@ public class HashJoinRegionScanner implements RegionScanner {
                 continue;
             }
             HashCache hashCache = (HashCache)cache.getServerCache(joinId);
-            if (hashCache == null)
-                throw new DoNotRetryIOException("Could not find hash cache for joinId: "
-                        + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength())
-                        + ". The cache might have expired and have been removed.");
+            if (hashCache == null) {
+                Exception cause = new HashJoinCacheNotFoundException(
+                        Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId)));
+                throw new DoNotRetryIOException(cause.getMessage(), cause);
+            }
+
             hashCaches[i] = hashCache;
             tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]);
         }

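The server wraps the typed exception in a DoNotRetryIOException so HBase itself will not retry the scan; the client unwraps it and decides whether to resend the cache. A sketch of the client-side unwrap, mirroring the catch pattern this patch adds in BaseResultIterators and TableResultIterator (the remoteException variable is assumed):

    try {
        throw ServerUtil.parseServerException(remoteException);
    } catch (HashJoinCacheNotFoundException notFound) {
        Long missingCacheId = notFound.getCacheId(); // joinId of the cache to resend
    } catch (SQLException other) {
        throw other; // unrelated server failure: propagate as before
    }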
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index ff4a35c..c9dce27 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -439,7 +439,8 @@ public enum SQLExceptionCode {
         " of connections to the target cluster."),
     
     MAX_MUTATION_SIZE_EXCEEDED(729, "LIM01", "MutationState size is bigger than maximum allowed number of rows"),
-    MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes");
+    MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes"), 
+    HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found");
 
     private final int errorCode;
     private final String sqlState;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 2cdaac7..74c8d39 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -21,9 +21,10 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
@@ -36,6 +37,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.RowKeyExpression;
 import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.AggregatingResultIterator;
 import org.apache.phoenix.iterate.BaseResultIterators;
 import org.apache.phoenix.iterate.ConcatResultIterator;
@@ -185,7 +187,7 @@ public class AggregatePlan extends BaseQueryPlan {
     }
     
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         if (groupBy.isEmpty()) {
             UngroupedAggregateRegionObserver.serializeIntoScan(scan);
         } else {
@@ -221,8 +223,8 @@ public class AggregatePlan extends BaseQueryPlan {
             }
         }
         BaseResultIterators iterators = isSerial
-                ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan)
-                : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false);
+                ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches)
+                : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches);
         estimatedRows = iterators.getEstimatedRowCount();
         estimatedSize = iterators.getEstimatedByteCount();
         splits = iterators.getSplits();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index e6e7b97..238a537 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -24,6 +24,7 @@ import java.sql.ParameterMetaData;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.htrace.TraceScope;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -47,6 +49,7 @@ import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
@@ -73,7 +76,6 @@ import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -196,20 +198,20 @@ public abstract class BaseQueryPlan implements QueryPlan {
 
     @Override
     public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance(), null);
+        return iterator(DefaultParallelScanGrouper.getInstance());
     }
     
     @Override
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, null);
+        return iterator(scanGrouper, null);
     }
 
     @Override
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, scan);
+        return iterator(Collections.<ImmutableBytesPtr,ServerCache>emptyMap(), scanGrouper, scan);
     }
-    
-	private ResultIterator getWrappedIterator(final List<? extends SQLCloseable> dependencies,
+        
+	private ResultIterator getWrappedIterator(final Map<ImmutableBytesPtr,ServerCache> dependencies,
 			ResultIterator iterator) {
 		ResultIterator wrappedIterator = dependencies.isEmpty() ? iterator : new DelegateResultIterator(iterator) {
 			@Override
@@ -217,14 +219,15 @@ public abstract class BaseQueryPlan implements QueryPlan {
 				try {
 					super.close();
 				} finally {
-					SQLCloseables.closeAll(dependencies);
+					SQLCloseables.closeAll(dependencies.values());
 				}
 			}
 		};
 		return wrappedIterator;
 	}
 
-    public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+    public final ResultIterator iterator(final Map<ImmutableBytesPtr,ServerCache> caches,
+            ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
          if (scan == null) {
              scan = context.getScan();
          }
@@ -235,11 +238,11 @@ public abstract class BaseQueryPlan implements QueryPlan {
 		 * row to be scanned.
 		 */
         if (context.getScanRanges() == ScanRanges.NOTHING && !getStatement().isAggregate()) {
-            return getWrappedIterator(dependencies, ResultIterator.EMPTY_ITERATOR);
+            return getWrappedIterator(caches, ResultIterator.EMPTY_ITERATOR);
         }
         
         if (tableRef == TableRef.EMPTY_TABLE_REF) {
-            return getWrappedIterator(dependencies, newIterator(scanGrouper, scan));
+            return newIterator(scanGrouper, scan, caches);
         }
         
         // Set miscellaneous scan attributes. This is the last chance to set them before we
@@ -344,7 +347,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         	LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection));
         }
         
-        ResultIterator iterator = getWrappedIterator(dependencies, newIterator(scanGrouper, scan));
+        ResultIterator iterator = newIterator(scanGrouper, scan, caches);
         if (LOG.isDebugEnabled()) {
         	LOG.debug(LogUtil.addCustomAnnotations("Iterator ready: " + iterator, connection));
         }
@@ -468,7 +471,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException;
+    abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException;
     
     @Override
     public long getEstimatedSize() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 17c3cca..879aa61 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.util.NumberUtil.add;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -51,6 +52,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -73,10 +75,10 @@ import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class HashJoinPlan extends DelegateQueryPlan {
@@ -88,7 +90,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final boolean recompileWhereClause;
     private final Set<TableRef> tableRefs;
     private final int maxServerCacheTimeToLive;
-    private final List<SQLCloseable> dependencies = Lists.newArrayList();
+    private final Map<ImmutableBytesPtr,ServerCache> dependencies = Maps.newHashMap();
     private HashCacheClient hashClient;
     private AtomicLong firstJobEndTime;
     private List<Expression> keyRangeExpressions;
@@ -99,7 +101,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     public static HashJoinPlan create(SelectStatement statement, 
             QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) throws SQLException {
         if (!(plan instanceof HashJoinPlan))
-            return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<SQLCloseable>emptyList());
+            return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null, Collections.<ImmutableBytesPtr,ServerCache>emptyMap());
         
         HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
         assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan);
@@ -115,9 +117,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
     }
     
     private HashJoinPlan(SelectStatement statement, 
-            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, List<SQLCloseable> dependencies) throws SQLException {
+            QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause, Map<ImmutableBytesPtr,ServerCache> dependencies) throws SQLException {
         super(plan);
-        this.dependencies.addAll(dependencies);
+        this.dependencies.putAll(dependencies);
         this.statement = statement;
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
@@ -182,7 +184,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             try {
                 ServerCache result = futures.get(i).get();
                 if (result != null) {
-                    dependencies.add(result);
+                    dependencies.put(new ImmutableBytesPtr(result.getId()),result);
                 }
                 subPlans[i].postProcess(result, this);
             } catch (InterruptedException e) {
@@ -198,7 +200,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
             }
         }
         if (firstException != null) {
-            SQLCloseables.closeAllQuietly(dependencies);
+            SQLCloseables.closeAllQuietly(dependencies.values());
             throw firstException;
         }
         

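The switch from List&lt;SQLCloseable&gt; to Map&lt;ImmutableBytesPtr,ServerCache&gt; for dependencies is what makes the resend possible: a cache miss arrives carrying only a cache id, and the map (handed down to the iterators as caches) turns that id back into the ServerCache to ship again. Sketch of the lookup the new shape enables (dependencies is the patch's field; cacheId a hypothetical local):

    // On a HashJoinCacheNotFoundException carrying cacheId:
    ServerCache toResend = dependencies.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
    // The old List<SQLCloseable> could only be closed in bulk; it could not answer this query.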
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 1853c45..781c07e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -21,13 +21,16 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -37,6 +40,7 @@ import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SQLCloseables;
 
 public class LiteralResultIterationPlan extends BaseQueryPlan {
     protected final Iterable<Tuple> tuples;
@@ -71,7 +75,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan)
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, final Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {
             private final Iterator<Tuple> tupleIterator = tuples.iterator();
@@ -81,7 +85,8 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
 
             @Override
             public void close() throws SQLException {
-                this.closed = true;;
+                SQLCloseables.closeAll(caches.values());
+                this.closed = true;
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 2990b77..f5b1af0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -24,10 +24,12 @@ import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.RowProjector;
@@ -35,6 +37,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.BaseResultIterators;
 import org.apache.phoenix.iterate.ChunkedResultIterator;
 import org.apache.phoenix.iterate.ConcatResultIterator;
@@ -206,7 +209,7 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be too late afterwards
         scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE);
         ResultIterator scanner;
@@ -229,11 +232,11 @@ public class ScanPlan extends BaseQueryPlan {
                         && isDataToScanWithinThreshold; 
         BaseResultIterators iterators;
         if (isOffsetOnServer) {
-            iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan);
+            iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches);
         } else if (isSerial) {
-            iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan);
+            iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches);
         } else {
-            iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly);
+            iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches);
         }
         estimatedRows = iterators.getEstimatedRowCount();
         estimatedSize = iterators.getEstimatedByteCount();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 98f5d46..a3a9762 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -66,11 +66,14 @@ import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -80,11 +83,13 @@ import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
@@ -133,7 +138,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 	private static final Logger logger = LoggerFactory.getLogger(BaseResultIterators.class);
     private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20;
     private static final int MIN_SEEK_TO_COLUMN_VERSION = VersionUtil.encodeVersion("0", "98", "12");
-
     private final List<List<Scan>> scans;
     private final List<KeyRange> splits;
     private final byte[] physicalTableName;
@@ -148,6 +152,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
     private boolean hasGuidePosts;
     private Scan scan;
     private boolean useStatsForParallelization;
+    protected Map<ImmutableBytesPtr,ServerCache> caches;
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() {
         @Override
@@ -464,11 +469,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         }
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(),
                 plan.getStatement().getHint(), QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset);
         this.plan = plan;
         this.scan = scan;
+        this.caches = caches;
         this.scanGrouper = scanGrouper;
         StatementContext context = plan.getContext();
         // Clone MutationState as the one on the connection will change if auto commit is on
@@ -844,7 +850,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         List<PeekingResultIterator> iterators = new ArrayList<PeekingResultIterator>(numScans);
         ScanWrapper previousScan = new ScanWrapper(null);
         return getIterators(scans, services, isLocalIndex, allIterators, iterators, isReverse, maxQueryEndTime,
-                splits.size(), previousScan);
+                splits.size(), previousScan, context.getConnection().getQueryServices().getConfiguration()
+                        .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES));
     }
 
     class ScanWrapper {
@@ -866,11 +873,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, ConnectionQueryServices services,
             boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, List<PeekingResultIterator> iterators,
-            boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan) throws SQLException {
+            boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan, int retryCount) throws SQLException {
         boolean success = false;
         final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(splitSize);
         allFutures.add(futures);
         SQLException toThrow = null;
+        final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection());
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
             submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper);
@@ -900,24 +908,38 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                     } catch (ExecutionException e) {
                         try { // Rethrow as SQLException
                             throw ServerUtil.parseServerException(e);
-                        } catch (StaleRegionBoundaryCacheException e2) {
+                        } catch (StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e2) {
                             // Catch only to try to recover from region boundary cache being out of date
                             if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
                                 services.clearTableRegionCache(physicalTableName);
                                 context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
                             }
+                            // Resubmit just this portion of work again
+                            Scan oldScan = scanPair.getFirst();
+                            byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
+                            if (e2 instanceof HashJoinCacheNotFoundException) {
+                                logger.debug(
+                                        "Hash Join cache not found on the server; resending the cache and retrying");
+                                if (retryCount <= 0) {
+                                    throw e2;
+                                }
+                                Long cacheId = ((HashJoinCacheNotFoundException) e2).getCacheId();
+                                if (!hashCacheClient.addHashCacheToServer(startKey,
+                                        caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))), plan.getTableRef().getTable())) { throw e2; }
+                            }
                             concatIterators =
                                     recreateIterators(services, isLocalIndex, allIterators,
                                         iterators, isReverse, maxQueryEndTime, previousScan,
-                                        clearedCache, concatIterators, scanPairItr, scanPair);
+                                        clearedCache, concatIterators, scanPairItr, scanPair, retryCount-1);
                         } catch(ColumnFamilyNotFoundException cfnfe) {
                             if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) {
                                 Thread.sleep(1000);
                                 concatIterators =
                                         recreateIterators(services, isLocalIndex, allIterators,
                                             iterators, isReverse, maxQueryEndTime, previousScan,
-                                            clearedCache, concatIterators, scanPairItr, scanPair);
+                                            clearedCache, concatIterators, scanPairItr, scanPair, retryCount);
                             }
+
                         }
                     }
                 }
@@ -976,7 +998,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
             ScanWrapper previousScan, boolean clearedCache,
             List<PeekingResultIterator> concatIterators,
             Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr,
-            Pair<Scan, Future<PeekingResultIterator>> scanPair) throws SQLException {
+            Pair<Scan, Future<PeekingResultIterator>> scanPair, int retryCount) throws SQLException {
         scanPairItr.remove();
         // Resubmit just this portion of work again
         Scan oldScan = scanPair.getFirst();
@@ -989,20 +1011,21 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         addIterator(iterators, concatIterators);
         concatIterators = Lists.newArrayList();
         getIterators(newNestedScans, services, isLocalIndex, allIterators, iterators, isReverse,
-                maxQueryEndTime, newNestedScans.size(), previousScan);
+                maxQueryEndTime, newNestedScans.size(), previousScan, retryCount);
         return concatIterators;
     }
     
 
     @Override
     public void close() throws SQLException {
-        if (allFutures.isEmpty()) {
-            return;
-        }
+
         // Don't call cancel on already started work, as it causes the HConnection
         // to get into a funk. Instead, just cancel queued work.
         boolean cancelledWork = false;
         try {
+            if (allFutures.isEmpty()) {
+                return;
+            }
             List<Future<PeekingResultIterator>> futuresToClose = Lists.newArrayListWithExpectedSize(getSplits().size());
             for (List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures : allFutures) {
                 for (List<Pair<Scan,Future<PeekingResultIterator>>> futureScans : futures) {
@@ -1037,6 +1060,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
                 }
             }
         } finally {
+            SQLCloseables.closeAllQuietly(caches.values());
+            caches.clear();
             if (cancelledWork) {
                 context.getConnection().getQueryServices().getExecutor().purge();
             }
@@ -1146,4 +1171,4 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         return "ResultIterators [name=" + getName() + ",id=" + scanId + ",scans=" + scans + "]";
     }
 
-}
\ No newline at end of file
+}

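Condensed view (not the actual method) of the retry decision added above; the field references (caches, hashCacheClient, plan) are as in the patch. A hash-join cache miss consumes one unit of the retry budget before the scan portion is resubmitted, while the stale-region-boundary and column-family recovery paths pass the budget through unchanged:

    int onScanFailure(SQLException e2, int retryCount, byte[] startKey) throws Exception {
        if (e2 instanceof HashJoinCacheNotFoundException) {
            if (retryCount <= 0) {
                throw e2; // budget exhausted: surface the cache miss
            }
            Long cacheId = ((HashJoinCacheNotFoundException) e2).getCacheId();
            ServerCache cache = caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
            if (!hashCacheClient.addHashCacheToServer(startKey, cache, plan.getTableRef().getTable())) {
                throw e2; // nothing left to resend
            }
            return retryCount - 1; // resubmit this portion with a smaller budget
        }
        return retryCount; // other recoverable failures keep the budget intact
    }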
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index 976b839..44c714f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -18,10 +18,13 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.TableRef;
 
@@ -30,9 +33,9 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac
        @Override
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
             Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
-            QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         return new TableResultIterator(mutationState, scan, scanMetricsHolder, renewLeaseThreshold,
-                plan, scanGrouper);
+                plan, scanGrouper, caches);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index f0360e2..3c11f4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -22,15 +22,17 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARAL
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.job.JobManager.JobCallable;
-import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
@@ -55,16 +57,16 @@ public class ParallelIterators extends BaseResultIterators {
     private final ParallelIteratorFactory iteratorFactory;
     private final boolean initFirstScanOnly;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
-        super(plan, perScanLimit, null, scanGrouper, scan);
+        super(plan, perScanLimit, null, scanGrouper, scan, caches);
         this.iteratorFactory = iteratorFactory;
         this.initFirstScanOnly = initFirstScanOnly;
     }   
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
-        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion);
+        this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches);
     }  
 
     @Override
@@ -106,7 +108,7 @@ public class ParallelIterators extends BaseResultIterators {
             final TableResultIterator tableResultItr =
                     context.getConnection().getTableResultIteratorFactory().newIterator(
                         mutationState, tableRef, scan, scanMetricsHolder, renewLeaseThreshold, plan,
-                        scanGrouper);
+                        scanGrouper, caches);
             context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index eb0c949..26d1ed1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -27,8 +28,10 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
@@ -59,9 +62,9 @@ public class SerialIterators extends BaseResultIterators {
     private final Integer offset;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
-            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan)
+            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches)
             throws SQLException {
-        super(plan, perScanLimit, offset, scanGrouper, scan);
+        super(plan, perScanLimit, offset, scanGrouper, scan, caches);
         this.offset = offset;
         // must be a offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(
@@ -90,7 +93,7 @@ public class SerialIterators extends BaseResultIterators {
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 @Override
                 public PeekingResultIterator call() throws Exception {
-                    PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset);
+                    PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset, caches);
                     return itr;
                 }
 
@@ -140,13 +143,15 @@ public class SerialIterators extends BaseResultIterators {
         private int index;
         private PeekingResultIterator currentIterator;
         private Integer remainingOffset;
+        private Map<ImmutableBytesPtr,ServerCache> caches;
         
-        private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset) throws SQLException {
+        private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
             this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size());
             this.tableName = tableName;
             this.renewLeaseThreshold = renewLeaseThreshold;
             this.scans.addAll(flattenedScans);
             this.remainingOffset = offset;
+            this.caches = caches;
             if (this.remainingOffset != null) {
                 // mark the last scan for offset purposes
                 this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
@@ -180,7 +185,7 @@ public class SerialIterators extends BaseResultIterators {
                             isRequestMetricsEnabled);
                 TableResultIterator itr =
                         new TableResultIterator(mutationState, currentScan, scanMetricsHolder,
-                                renewLeaseThreshold, plan, scanGrouper);
+                                renewLeaseThreshold, plan, scanGrouper, caches);
                 PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
                 Tuple tuple;
                 if ((tuple = peekingItr.peek()) == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 5114acc..e812854 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -30,6 +30,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UN
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -39,9 +40,17 @@ import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -50,6 +59,8 @@ import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -70,9 +81,9 @@ public class TableResultIterator implements ResultIterator {
     private final long renewLeaseThreshold;
     private final QueryPlan plan;
     private final ParallelScanGrouper scanGrouper;
+    private static final Logger logger = LoggerFactory.getLogger(TableResultIterator.class);
     private Tuple lastTuple = null;
     private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-
     @GuardedBy("renewLeaseLock")
     private ResultIterator scanIterator;
 
@@ -84,6 +95,10 @@ public class TableResultIterator implements ResultIterator {
     
     private final Lock renewLeaseLock = new ReentrantLock();
 
+    private int retry;
+    private Map<ImmutableBytesPtr,ServerCache> caches;
+    private HashCacheClient hashCacheClient;
+
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
         this.scanMetricsHolder = null;
@@ -92,14 +107,21 @@ public class TableResultIterator implements ResultIterator {
         this.scan = null;
         this.plan = null;
         this.scanGrouper = null;
+        this.caches = null;
+        this.retry = 0;
     }
 
     public static enum RenewLeaseStatus {
         RENEWED, NOT_RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, LOCK_NOT_ACQUIRED, NOT_SUPPORTED
     };
-
+    
     public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
             long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
+        this(mutationState, scan, scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, null);
+    }
+    
+    public TableResultIterator(MutationState mutationState, Scan scan, ScanMetricsHolder scanMetricsHolder,
+            long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException {
         this.scan = scan;
         this.scanMetricsHolder = scanMetricsHolder;
         this.plan = plan;
@@ -108,6 +130,10 @@ public class TableResultIterator implements ResultIterator {
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
         this.scanGrouper = scanGrouper;
+        this.hashCacheClient = new HashCacheClient(plan.getContext().getConnection());
+        this.caches = caches;
+        this.retry = plan.getContext().getConnection().getQueryServices().getProps()
+                .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
     }
 
     @Override
@@ -145,7 +171,7 @@ public class TableResultIterator implements ResultIterator {
             } catch (SQLException e) {
                 try {
                     throw ServerUtil.parseServerException(e);
-                } catch(StaleRegionBoundaryCacheException e1) {
+                } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) {
                     if(ScanUtil.isNonAggregateScan(scan)) {
                         // For non aggregate queries if we get stale region boundary exception we can
                         // continue scanning from the next value of lasted fetched result.
@@ -163,8 +189,28 @@ public class TableResultIterator implements ResultIterator {
                             }
                         }
                         plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
-                        this.scanIterator =
-                                plan.iterator(scanGrouper, newScan);
+                        if (e1 instanceof HashJoinCacheNotFoundException) {
+                            logger.debug(
+                                    "Hash Join cache not found on the server; resending the cache and retrying");
+                            if (retry <= 0) {
+                                throw e1;
+                            }
+                            retry--;
+                            try {
+                                Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
+                                if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
+                                        caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId))),
+                                        plan.getTableRef().getTable())) {
+                                    throw e1;
+                                }
+                                this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan);
+
+                            } catch (Exception e2) {
+                                throw new SQLException(e2);
+                            }
+                        } else {
+                            this.scanIterator = plan.iterator(scanGrouper, newScan);
+                        }
                         lastTuple = scanIterator.next();
                     } else {
                         throw e;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index c23e342..0b28d5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -18,17 +18,19 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.monitoring.CombinableMetric;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.schema.TableRef;
 
 public interface TableResultIteratorFactory {
     public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
             Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
-            QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException;
+            QueryPlan plan, ParallelScanGrouper scanGrouper, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException;
 
 }

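Any out-of-tree TableResultIteratorFactory implementation must pick up the new caches parameter; a minimal conforming implementation (class name hypothetical), mirroring DefaultTableResultIteratorFactory above:

    public class PassThroughTableResultIteratorFactory implements TableResultIteratorFactory {
        @Override
        public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef,
                Scan scan, ScanMetricsHolder scanMetricsHolder, long renewLeaseThreshold,
                QueryPlan plan, ParallelScanGrouper scanGrouper,
                Map<ImmutableBytesPtr, ServerCache> caches) throws SQLException {
            // Thread the caches through so hash-join cache resends keep working.
            return new TableResultIterator(mutationState, scan, scanMetricsHolder,
                    renewLeaseThreshold, plan, scanGrouper, caches);
        }
    }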
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 32d0469..2ec509c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
@@ -56,6 +57,7 @@ import com.google.common.collect.Lists;
  */
 public class HashCacheClient  {
     private final ServerCacheClient serverCache;
+
     /**
      * Construct client used to create a serialized cached snapshot of a table and send it to each region server
      * for caching during hash join processing.
@@ -81,7 +83,22 @@ public class HashCacheClient  {
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues);
-        return serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef);
+        ServerCache cache = serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef, true);
+        return cache;
+    }
+    
+    /**
+     * Should only be used to resend the hash table cache to a region server.
+     *
+     * @param startkeyOfRegion start key of any region hosted on the region server that needs the hash cache
+     * @param cache the cache entry to resend
+     * @param pTable table against which the cache is registered on the server
+     * @return true if the cache was sent to the server, false if there was no cache entry to send
+     * @throws Exception if the cache could not be sent
+     */
+    public boolean addHashCacheToServer(byte[] startkeyOfRegion, ServerCache cache, PTable pTable) throws Exception {
+        if (cache == null) { return false; }
+        return serverCache.addServerCache(startkeyOfRegion, cache, new HashCacheFactory(), ByteUtil.EMPTY_BYTE_ARRAY, pTable);
     }
     
     private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException {
@@ -179,4 +196,5 @@ public class HashCacheClient  {
         // might be coerced later.
         return new RowValueConstructorExpression(values, false);
     }
+
 }
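
A hedged sketch of the client-side retry this method enables. runJoinScan(), caches, startKey, and pTable are illustrative names rather than committed code, and the cacheId-to-key conversion is an assumption about how the caches map is keyed:

    Object runWithCacheResend(HashCacheClient hashCacheClient,
            Map<ImmutableBytesPtr, ServerCache> caches, byte[] startKey, PTable pTable)
            throws Exception {
        try {
            return runJoinScan(); // hypothetical: the scan that depends on the hash cache
        } catch (HashJoinCacheNotFoundException e) {
            ServerCache cache = caches.get(new ImmutableBytesPtr(Bytes.toBytes(e.getCacheId())));
            // Resend the serialized snapshot to the region server hosting the
            // failed region, then retry the scan once.
            if (!hashCacheClient.addHashCacheToServer(startKey, cache, pTable)) {
                throw e; // the cache is genuinely gone; surface the error
            }
            return runJoinScan();
        }
    }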

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index e7487a1..7607388 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -399,4 +399,6 @@ public interface QueryConstants {
     public static final byte[] OFFSET_COLUMN = "c_offset".getBytes();
     public static final String LAST_SCAN = "LAST_SCAN";
     public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes();
+    public static final String HASH_JOIN_CACHE_RETRIES = "hashjoin.client.retries.number";
+    public static final int DEFAULT_HASH_JOIN_CACHE_RETRIES = 5;
 }
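
For illustration only, a sketch of tuning the new knob from client code. This assumes the setting can be passed as a connection property like other Phoenix client settings; the JDBC URL is a placeholder:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    public class HashJoinRetriesDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Lower the hash join cache resend retries from the default of 5.
            props.setProperty("hashjoin.client.retries.number", "3");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181", props)) {
                // run join queries ...
            }
        }
    }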

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index 44502a5..d11f3a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.List;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -577,4 +578,18 @@ public class ByteUtil {
             throw new IllegalArgumentException("Unknown operator " + op);
         }
     }
+    
+    public static boolean contains(List<byte[]> keys, byte[] key) {
+        for (byte[] k : keys) {
+            if (Arrays.equals(k, key)) { return true; }
+        }
+        return false;
+    }
+
+    public static boolean contains(List<ImmutableBytesPtr> keys, ImmutableBytesPtr key) {
+        for (ImmutableBytesPtr k : keys) {
+            if (key.compareTo(k) == 0) { return true; }
+        }
+        return false;
+    }
 }
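
These helpers exist because java.util.List.contains falls back to byte[].equals, which compares references, not contents. A quick self-contained demonstration:

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.ByteUtil;

    public class ContainsDemo {
        public static void main(String[] args) {
            List<byte[]> keys = Collections.singletonList(Bytes.toBytes("region-start-key"));
            // false: a fresh byte[] with equal contents is a different reference
            System.out.println(keys.contains(Bytes.toBytes("region-start-key")));
            // true: ByteUtil.contains compares array contents via Arrays.equals
            System.out.println(ByteUtil.contains(keys, Bytes.toBytes("region-start-key")));
        }
    }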

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index a08d139..45ef9bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
@@ -45,6 +46,7 @@ public class ServerUtil {
     
     private static final String FORMAT = "ERROR %d (%s): %s";
     private static final Pattern PATTERN = Pattern.compile("ERROR (\\d+) \\((\\w+)\\): (.*)");
+    private static final Pattern HASH_JOIN_EXCEPTION_PATTERN = Pattern.compile("joinId: (-?\\d+)");
     private static final Pattern PATTERN_FOR_TS = Pattern.compile(",serverTimestamp=(\\d+),");
     private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
     private static final Map<Class<? extends Exception>, SQLExceptionCode> errorcodeMap
@@ -127,6 +129,7 @@ public class ServerUtil {
     }
 
     private static SQLException parseRemoteException(Throwable t) {
+        
         String message = t.getLocalizedMessage();
         if (message != null) {
             // If the message matches the standard pattern, recover the SQLException and throw it.
@@ -134,6 +137,10 @@ public class ServerUtil {
             if (matcher.find()) {
                 int statusCode = Integer.parseInt(matcher.group(1));
                 SQLExceptionCode code = SQLExceptionCode.fromErrorCode(statusCode);
+                if (code.equals(SQLExceptionCode.HASH_JOIN_CACHE_NOT_FOUND)) {
+                    Matcher m = HASH_JOIN_EXCEPTION_PATTERN.matcher(message);
+                    if (m.find()) { return new HashJoinCacheNotFoundException(Long.parseLong(m.group(1))); }
+                }
                 return new SQLExceptionInfo.Builder(code).setMessage(matcher.group()).setRootCause(t).build().buildException();
             }
         }
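
A minimal, standalone illustration of the joinId extraction added above; the message text is a stand-in for a remote exception string:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class JoinIdParseDemo {
        public static void main(String[] args) {
            Pattern p = Pattern.compile("joinId: (-?\\d+)");
            Matcher m = p.matcher("... joinId: -4567. cache no longer present ...");
            if (m.find()) {
                // Prints -4567; the optional leading '-' matters because hash
                // cache ids are longs and may be negative.
                System.out.println(Long.parseLong(m.group(1)));
            }
        }
    }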

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index fe6d847..d32c443 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -632,6 +632,7 @@ public abstract class BaseTest {
         conf.setInt("hbase.assignment.zkevent.workers", 5);
         conf.setInt("hbase.assignment.threads.max", 5);
         conf.setInt("hbase.catalogjanitor.interval", 5000);
+        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
         return conf;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index a0696c0..3980bc6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -482,7 +482,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
                 return null;
             }
             
-        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false);
+        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4bfb23ae/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 0c05fc0..8261ecd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -74,6 +75,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.end2end.index.PartialIndexRebuilderIT.WriteFailingRegionObserver;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.ByteBasedLikeExpression;
@@ -937,5 +939,31 @@ public class TestUtil {
         assertTrue(rs.next());
         return rs.getLong(1);
     }
+    
+    public static void addCoprocessor(Connection conn, String tableName, Class<?> coprocessorClass) throws Exception {
+        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+        if (descriptor.getCoprocessors().contains(coprocessorClass.getName())) {
+            // Coprocessor already installed; nothing to do.
+            return;
+        }
+        descriptor.addCoprocessor(coprocessorClass.getName(), null, priority, null);
+        final int maxTries = 10;
+        int numTries = maxTries;
+        try (HBaseAdmin admin = services.getAdmin()) {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+            // Wait until the modified descriptor is visible before returning.
+            while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)) {
+                if (--numTries == 0) {
+                    throw new Exception(
+                            "Check to detect if the coprocessor was added failed after "
+                                    + maxTries + " retries.");
+                }
+                Thread.sleep(1000);
+            }
+        }
+    }
+
 
 }
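
Hypothetical usage of the helper from a test; the table name is illustrative and getUrl() is assumed to come from the test's base class:

    try (Connection conn = DriverManager.getConnection(getUrl())) {
        // Install a failure-injecting observer on the table under test.
        TestUtil.addCoprocessor(conn, "MY_TABLE", WriteFailingRegionObserver.class);
        // ... exercise the failure path ...
    }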

