Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3DAA4200C7E for ; Tue, 9 May 2017 02:42:36 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3C375160BA5; Tue, 9 May 2017 00:42:36 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E692F160BCA for ; Tue, 9 May 2017 02:42:34 +0200 (CEST) Received: (qmail 72313 invoked by uid 500); 9 May 2017 00:42:34 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 72201 invoked by uid 99); 9 May 2017 00:42:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 00:42:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6A466DFF36; Tue, 9 May 2017 00:42:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: apurtell@apache.org To: commits@phoenix.apache.org Date: Tue, 09 May 2017 00:42:36 -0000 Message-Id: <03194a9d92254f1eb02d84f1bbba01a9@git.apache.org> In-Reply-To: <1d4db951c1644f80b9e54642590957b7@git.apache.org> References: <1d4db951c1644f80b9e54642590957b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/4] phoenix git commit: PHOENIX-3818 Add client setting to disable server UPSERT SELECT work archived-at: Tue, 09 May 2017 00:42:36 -0000 PHOENIX-3818 Add client setting to disable server UPSERT SELECT work Adds phoenix.client.enable.server.upsert.select property that is true (enabled) by default. This acts as a feature toggle for PHOENIX-3271. Signed-off-by: Andrew Purtell Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ed30d1ff Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ed30d1ff Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ed30d1ff Branch: refs/heads/4.x-HBase-0.98 Commit: ed30d1ff151eecbd2161d197c3cf7159f6707e6e Parents: 6befc6c Author: Alex Araujo Authored: Mon May 1 20:27:18 2017 -0500 Committer: Andrew Purtell Committed: Mon May 8 17:35:03 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/rpc/PhoenixServerRpcIT.java | 93 ++++++++++++++------ .../apache/phoenix/compile/UpsertCompiler.java | 14 ++- .../UngroupedAggregateRegionObserver.java | 14 +-- .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 4 +- .../org/apache/phoenix/util/ExpressionUtil.java | 14 +++ 6 files changed, 97 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index 8f95b32..6782c3e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -44,11 +44,12 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -66,14 +67,14 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { Map serverProps = Collections.singletonMap(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, TestPhoenixIndexRpcSchedulerFactory.class.getName()); // use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue - Map clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, - RpcControllerFactory.class.getName()); + Map clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, + RpcControllerFactory.class.getName()); NUM_SLAVES_BASE = 2; setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } - @AfterClass - public static void cleanUpAfterTestSuite() throws Exception { + @After + public void cleanUpAfterTest() throws Exception { TestPhoenixIndexRpcSchedulerFactory.reset(); } @@ -90,26 +91,19 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = driver.connect(getUrl(), props); try { - // create the table - conn.createStatement().execute( - "CREATE TABLE " + dataTableFullName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + // create the table + createTable(conn, dataTableFullName); - // create the index - conn.createStatement().execute( - "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (v1) INCLUDE (v2)"); + // create the index + createIndex(conn, indexName); ensureTablesOnDifferentRegionServers(dataTableFullName, indexTableFullName); - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); - stmt.setString(1, "k1"); - stmt.setString(2, "v1"); - stmt.setString(3, "v2"); - stmt.execute(); - conn.commit(); + upsertRow(conn, dataTableFullName); // run select query that should use the index String selectSql = "SELECT k, v2 from " + dataTableFullName + " WHERE v1=?"; - stmt = conn.prepareStatement(selectSql); + PreparedStatement stmt = conn.prepareStatement(selectSql); stmt.setString(1, "v1"); // verify that the query does a range scan on the index table @@ -126,17 +120,11 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { // drop index table conn.createStatement().execute( "DROP INDEX " + indexName + " ON " + dataTableFullName ); - // create a data table with the same name as the index table - conn.createStatement().execute( - "CREATE TABLE " + indexTableFullName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + // create a data table with the same name as the index table + createTable(conn, indexTableFullName); // upsert one row to the table (which has the same table name as the previous index table) - stmt = conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)"); - stmt.setString(1, "k1"); - stmt.setString(2, "v1"); - stmt.setString(3, "v2"); - stmt.execute(); - conn.commit(); + upsertRow(conn, indexTableFullName); // run select query on the new table selectSql = "SELECT k, v2 from " + indexTableFullName + " WHERE v1=?"; @@ -154,8 +142,7 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); TestPhoenixIndexRpcSchedulerFactory.reset(); - conn.createStatement().execute( - "CREATE INDEX " + indexName + "_1 ON " + dataTableFullName + " (v1) INCLUDE (v2)"); + createIndex(conn, indexName + "_1"); // verify that that index queue is used and only once (during Upsert Select on server to build the index) Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); } @@ -164,6 +151,54 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { } } + @Test + public void testUpsertSelectServerDisabled() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + // disable server side upsert select + props.setProperty(QueryServices.ENABLE_SERVER_UPSERT_SELECT, "false"); + try (Connection conn = driver.connect(getUrl(), props)) { + // create two tables with identical schemas + createTable(conn, dataTableFullName); + upsertRow(conn, dataTableFullName); + String tableName2 = dataTableFullName + "_2"; + createTable(conn, tableName2); + ensureTablesOnDifferentRegionServers(dataTableFullName, tableName2); + // copy the row from the first table using upsert select + upsertSelectRows(conn, dataTableFullName, tableName2); + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), + Mockito.never()).dispatch(Mockito.any(CallRunner.class)); + + } + } + + private void createTable(Connection conn, String tableName) throws SQLException { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + } + + private void createIndex(Connection conn, String indexName) throws SQLException { + conn.createStatement().execute( + "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (v1) INCLUDE (v2)"); + } + + private void upsertRow(Connection conn, String tableName) throws SQLException { + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)"); + stmt.setString(1, "k1"); + stmt.setString(2, "v1"); + stmt.setString(3, "v2"); + stmt.execute(); + conn.commit(); + } + + private void upsertSelectRows(Connection conn, String tableName1, String tableName2) throws SQLException { + PreparedStatement stmt = + conn.prepareStatement( + "UPSERT INTO " + tableName2 + " (k, v1, v2) SELECT k, v1, v2 FROM " + + tableName1); + stmt.execute(); + conn.commit(); + } + /** * Verifies that the given tables each have a single region and are on * different region servers. If they are on the same server moves tableName2 http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 2304d83..931513a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -105,6 +105,7 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PUnsignedLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -335,6 +336,9 @@ public class UpsertCompiler { int nValuesToSet; boolean sameTable = false; boolean runOnServer = false; + boolean serverUpsertSelectEnabled = + services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT, + QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT); UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null; // Retry once if auto commit is off, as the meta data may // be out of date. We do not retry if auto commit is on, as we @@ -505,7 +509,7 @@ public class UpsertCompiler { && tableRefToBe.equals(selectResolver.getTables().get(0)); tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables()); /* We can run the upsert in a coprocessor if: - * 1) from has only 1 table + * 1) from has only 1 table or server UPSERT SELECT is enabled * 2) the select query isn't doing aggregation (which requires a client-side final merge) * 3) autoCommit is on * 4) the table is not immutable with indexes, as the client is the one that figures out the additional @@ -523,7 +527,7 @@ public class UpsertCompiler { // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query, // so we might be able to run it entirely on the server side. // region space managed by region servers. So we bail out on executing on server side. - runOnServer = isAutoCommit && !table.isTransactional() + runOnServer = (sameTable || serverUpsertSelectEnabled) && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && !select.isJoin() && table.getRowTimestampColPos() == -1; } @@ -666,7 +670,11 @@ public class UpsertCompiler { reverseColumnIndexes[tempPos] = pos; reverseColumnIndexes[i] = i; } - + // If any pk slots are changing and server side UPSERT SELECT is disabled, do not run on server + if (!serverUpsertSelectEnabled && ExpressionUtil + .isPkPositionChanging(new TableRef(table), projectedExpressions)) { + runOnServer = false; + } //////////////////////////////////////////////////////////////////// // UPSERT SELECT run server-side ///////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 92bde94..23b8be0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -112,6 +112,7 @@ import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; @@ -397,7 +398,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver values = new byte[projectedTable.getPKColumns().size()][]; areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(), region.getTableDesc().getTableName().getName()) == 0 - && !isPkPositionChanging(new TableRef(projectedTable), selectExpressions); + && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); } else { byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG); @@ -791,17 +792,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private boolean isPkPositionChanging(TableRef tableRef, List projectedExpressions) throws SQLException { - // If the row ends up living in a different region, we'll get an error otherwise. - for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) { - PColumn column = tableRef.getTable().getPKColumns().get(i); - Expression source = projectedExpressions.get(i); - if (source == null || !source - .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; } - } - return false; - } - private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) { return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > maxBatchSize) || (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 6e75370..2627207 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -246,6 +246,9 @@ public interface QueryServices extends SQLCloseable { public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme"; public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme"; + // whether to enable server side RS -> RS calls for upsert select statements + public static final String ENABLE_SERVER_UPSERT_SELECT ="phoenix.client.enable.server.upsert.select"; + /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 0c4ebc0..eef964f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -287,7 +287,9 @@ public class QueryServicesOptions { // 4.10, psql and CSVBulkLoad // expects binary data to be base 64 // encoded - + // RS -> RS calls for upsert select statements are enabled by default + public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = true; + private final Configuration config; private QueryServicesOptions(Configuration config) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java index 1fbb534..fbd10fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java @@ -10,11 +10,15 @@ package org.apache.phoenix.util; import java.sql.SQLException; +import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; @@ -54,4 +58,14 @@ public class ExpressionUtil { return false; } + public static boolean isPkPositionChanging(TableRef tableRef, List projectedExpressions) throws SQLException { + for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) { + PColumn column = tableRef.getTable().getPKColumns().get(i); + Expression source = projectedExpressions.get(i); + if (source == null || !source + .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; } + } + return false; + } + }