phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [4/9] phoenix git commit: PHOENIX-4849 Phoenix may generate incorrectly replace TableResultIterators after HBase region splits.
Date Tue, 02 Oct 2018 15:31:31 GMT
PHOENIX-4849 Phoenix may generate incorrectly replace TableResultIterators after HBase region
splits.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a9e886cc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a9e886cc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a9e886cc

Branch: refs/heads/omid2
Commit: a9e886ccfd11cbd30a9c9ea3830eb28526bc6f76
Parents: ec54bef
Author: Lars Hofhansl <larsh@apache.org>
Authored: Wed Sep 26 11:18:32 2018 -0700
Committer: Lars Hofhansl <larsh@apache.org>
Committed: Wed Sep 26 11:18:32 2018 -0700

----------------------------------------------------------------------
 .../end2end/UpsertSelectAutoCommitIT.java       | 22 ++++++---
 .../apache/phoenix/compile/UpsertCompiler.java  | 17 +++++++
 .../phoenix/coprocessor/ScanRegionObserver.java |  9 +++-
 .../phoenix/iterate/TableResultIterator.java    | 48 +++++++-------------
 .../java/org/apache/phoenix/util/ScanUtil.java  |  8 ++++
 5 files changed, 63 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9e886cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
index 3966f15..6210852 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
@@ -34,10 +34,13 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
 
@@ -160,19 +163,24 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT
{
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(true);
-        conn.createStatement().execute("CREATE SEQUENCE keys");
+        conn.createStatement().execute("CREATE SEQUENCE keys CACHE 1000");
         String tableName = generateUniqueName();
-        conn.createStatement().execute(
-            "CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, val INTEGER)");
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + " (pk INTEGER PRIMARY KEY, val INTEGER) UPDATE_CACHE_FREQUENCY=3600000");
 
         conn.createStatement().execute(
             "UPSERT INTO " + tableName + " VALUES (NEXT VALUE FOR keys,1)");
-        for (int i=0; i<6; i++) {
-            Statement stmt = conn.createStatement();
-            int upsertCount = stmt.executeUpdate(
-                "UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR keys, val FROM " + tableName);
+        PreparedStatement stmt =
+                conn.prepareStatement("UPSERT INTO " + tableName
+                        + " SELECT NEXT VALUE FOR keys, val FROM " + tableName);
+        HBaseAdmin admin =
+                driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        for (int i=0; i<12; i++) {
+            admin.split(TableName.valueOf(tableName));
+            int upsertCount = stmt.executeUpdate();
             assertEquals((int)Math.pow(2, i), upsertCount);
         }
+        admin.close();
         conn.close();
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9e886cc/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 6d81f8b..4f08846 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -76,12 +76,14 @@ import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SequenceValueParseNode;
 import org.apache.phoenix.parse.UpsertStatement;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.DelegateColumn;
 import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
@@ -557,6 +559,21 @@ public class UpsertCompiler {
             // Use optimizer to choose the best plan
             QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver,
targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), true, false, null);
             queryPlanToBe = compiler.compile();
+
+            if (sameTable) {
+                // in the UPSERT INTO X ... SELECT FROM X case enforce the source tableRef's
TS
+                // as max TS, so that the query can safely restarted and still work of a
snapshot
+                // (so it won't see its own data in case of concurrent splits)
+                // see PHOENIX-4849
+                long serverTime = selectResolver.getTables().get(0).getCurrentTime();
+                if (serverTime == QueryConstants.UNSET_TIMESTAMP) {
+                    // if this is the first time this table is resolved the ref's current
time might not be defined, yet
+                    // in that case force an RPC to get the server time
+                    serverTime = new MetaDataClient(connection).getCurrentTime(schemaName,
tableName);
+                }
+                Scan scan = queryPlanToBe.getContext().getScan();
+                ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), serverTime);
+            }
             // This is post-fix: if the tableRef is a projected table, this means there are
post-processing
             // steps and parallelIteratorFactory did not take effect.
             if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED
|| queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9e886cc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 2d9cd4f..c2dfc4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.ScanUtil;
 
 /**
  *
@@ -75,8 +76,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
     }
 
     @Override
-    protected boolean isRegionObserverFor(Scan scan) {
-        return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+    protected boolean skipRegionBoundaryCheck(Scan scan) {
+        return super.skipRegionBoundaryCheck(scan) || ScanUtil.isSimpleScan(scan);
     }
 
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return ScanUtil.isNonAggregateScan(scan);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9e886cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index f1d1663..e6b94fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -170,7 +170,7 @@ public class TableResultIterator implements ResultIterator {
             } catch (SQLException e) {
                 try {
                     throw ServerUtil.parseServerException(e);
-                } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException
e1) {
+                } catch(HashJoinCacheNotFoundException e1) {
                     if(ScanUtil.isNonAggregateScan(scan) && plan.getContext().getAggregationManager().isEmpty())
{
                         // For non aggregate queries if we get stale region boundary exception
we can
                         // continue scanning from the next value of lasted fetched result.
@@ -188,40 +188,24 @@ public class TableResultIterator implements ResultIterator {
                             }
                         }
                         plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
-                        if (e1 instanceof HashJoinCacheNotFoundException) {
-                            logger.debug(
-                                    "Retrying when Hash Join cache is not found on the server
,by sending the cache again");
-                            if (retry <= 0) {
+                        logger.debug(
+                                "Retrying when Hash Join cache is not found on the server
,by sending the cache again");
+                        if (retry <= 0) {
+                            throw e1;
+                        }
+                        Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
+                        retry--;
+                        try {
+                            ServerCache cache = caches == null ? null :
+                                    caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
+                            if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
+                                    cache, plan.getTableRef().getTable())) {
                                 throw e1;
                             }
-                            Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
-                            retry--;
-                            try {
-                                ServerCache cache = caches == null ? null :
-                                        caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
-                                if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
-                                        cache, plan.getTableRef().getTable())) {
-                                    throw e1;
-                                }
-                                this.scanIterator = ((BaseQueryPlan) plan).iterator(caches,
scanGrouper, newScan);
+                            this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper,
newScan);
 
-                            } catch (Exception ex) {
-                                throw ServerUtil.parseServerException(ex);
-                            }
-                        } else {
-                            try {
-                                if(plan.getContext().isClientSideUpsertSelect()) {
-                                    if(ScanUtil.isLocalIndex(newScan)) {
-                                        throw e;
-                                    }
-                                    this.scanIterator =
-                                            new ScanningResultIterator(htable.getScanner(newScan),
newScan, scanMetricsHolder);
-                                } else {
-                                    this.scanIterator = plan.iterator(scanGrouper, newScan);
-                                }
-                            } catch (IOException ex) {
-                                throw ServerUtil.parseServerException(ex);
-                            }
+                        } catch (Exception ex) {
+                            throw ServerUtil.parseServerException(ex);
                         }
                         lastTuple = scanIterator.next();
                     } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9e886cc/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 62ecebd..2ac08e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -119,6 +119,14 @@ public class ScanUtil {
         return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
     }
 
+    // Designates a "simple scan", i.e. a scan that does not need to be scoped
+    // to a single region.
+    public static boolean isSimpleScan(Scan scan) {
+        return  ScanUtil.isNonAggregateScan(scan) &&
+                scan.getAttribute(BaseScannerRegionObserver.TOPN) == null &&
+                scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET) == null;
+    }
+
     // Use getTenantId and pass in column name to match against
     // in as PSchema attribute. If column name matches in 
     // KeyExpressions, set on scan as attribute


Mime
View raw message