phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajeshb...@apache.org
Subject phoenix git commit: PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to tenant specific tables-calcite changes(Rajeshbabu)
Date Fri, 27 Jan 2017 16:24:47 GMT
Repository: phoenix
Updated Branches:
  refs/heads/calcite 405499047 -> 29da79fa6


PHOENIX-3351 Implement TODOs in PhoenixTableModify#upsert to allow writes to tenant specific tables-calcite changes(Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/29da79fa
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/29da79fa
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/29da79fa

Branch: refs/heads/calcite
Commit: 29da79fa62a27547aec1d2083144121a6d6fad05
Parents: 4054990
Author: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
Authored: Fri Jan 27 22:06:37 2017 +0530
Committer: Rajeshbabu Chintaguntla <rajeshbabu@apache.org>
Committed: Fri Jan 27 22:06:37 2017 +0530

----------------------------------------------------------------------
 .../phoenix/calcite/rel/PhoenixTableModify.java | 95 +++-----------------
 1 file changed, 11 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/29da79fa/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index f13223f..052015e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -6,6 +6,8 @@ import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +31,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
@@ -112,8 +115,14 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
             final int[] pkSlotIndexes = new int[mappedColumns.size()];
             for (int i = 0; i < columnIndexes.length; i++) {
                 PColumn column = mappedColumns.get(i);
+                int pkColPosition = 0;
                 if (SchemaUtil.isPKColumn(column)) {
-                    pkSlotIndexes[i] = column.getPosition();
+                    for(PColumn col: mappedColumns) {
+                        if(col.equals(column)) break;
+                        // Since first columns in the mappedColumns are pk columns only.
+                        pkColPosition++;
+                    }
+                    pkSlotIndexes[i] = pkColPosition;
                 }
                 columnIndexes[i] = column.getPosition();
             }
@@ -151,55 +160,7 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
                 public MutationState execute() throws SQLException {
                     ResultIterator iterator = queryPlan.iterator();
                     // simplest version, no run-on-server, no pipelined update
-                    StatementContext childContext = queryPlan.getContext();
-                    ConnectionQueryServices services = connection.getQueryServices();
-                    int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
-                            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
-                    int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-                    boolean isAutoCommit = connection.getAutoCommit();
-                    byte[][] values = new byte[columnIndexes.length][];
-                    int rowCount = 0;
-                    Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
-                    PTable table = targetTableRef.getTable();
-                    try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
-                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                        while (rs.next()) {
-                            for (int i = 0; i < values.length; i++) {
-                                PColumn column = table.getColumns().get(columnIndexes[i]);
-                                byte[] bytes = rs.getBytes(i + 1);
-                                ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
-                                Object value = rs.getObject(i + 1);
-                                int rsPrecision = rs.getMetaData().getPrecision(i + 1);
-                                Integer precision = rsPrecision == 0 ? null : rsPrecision;
-                                int rsScale = rs.getMetaData().getScale(i + 1);
-                                Integer scale = rsScale == 0 ? null : rsScale;
-                                // We are guaranteed that the two column will have compatible types,
-                                // as we checked that before.
-                                if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), SortOrder.getDefault(), precision,
-                                        scale, column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
-                                        SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
-                                        .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
-                                        .buildException(); }
-                                column.getDataType().coerceBytes(ptr, value, column.getDataType(),
-                                        precision, scale, SortOrder.getDefault(), 
-                                        column.getMaxLength(), column.getScale(), column.getSortOrder(),
-                                        table.rowKeyOrderOptimizable());
-                                values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                            }
-                            // TODO onDupKeyBytes
-                            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp, null);
-                            rowCount++;
-                            // Commit a batch if auto commit is true and we're at our batch size
-                            if (isAutoCommit && rowCount % batchSize == 0) {
-                                MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
-                                connection.getMutationState().join(state);
-                                connection.getMutationState().send();
-                                mutation.clear();
-                            }
-                        }
-                        // If auto commit is true, this last batch will be committed upon return
-                        return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
-                    }
+                    return UpsertCompiler.upsertSelect(context, targetTableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp, true);
                 }
 
                 @Override
@@ -216,40 +177,6 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel {
         }
     }
     
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp, byte[] onDupKeyBytes) {
-        Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
-        byte[][] pkValues = new byte[table.getPKColumns().size()][];
-        // If the table uses salting, the first byte is the salting byte, set to an empty array
-        // here and we will fill in the byte later in PRowImpl.
-        if (table.getBucketNum() != null) {
-            pkValues[0] = new byte[] {0};
-        }
-        Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
-        RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-        for (int i = 0; i < values.length; i++) {
-            byte[] value = values[i];
-            PColumn column = table.getColumns().get(columnIndexes[i]);
-            if (SchemaUtil.isPKColumn(column)) {
-                pkValues[pkSlotIndex[i]] = value;
-                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
-                    if (!useServerTimestamp) {
-                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
-                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
-                        if (rowTimestamp < 0) {
-                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
-                        }
-                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
-                    } 
-                }
-            } else {
-                columnValues.put(column, value);
-            }
-        }
-        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-        table.newKey(ptr, pkValues);
-        mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
-    }
-
     private static MutationPlan delete(final PhoenixConnection connection,
             final PhoenixTable targetTable, final TableRef targetTableRef,
             final QueryPlan queryPlan, final RowProjector projector) {


Mime
View raw message