phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject phoenix git commit: PHOENIX-3612 Make tracking of max allowed number of mutations bytes based instead of row based
Date Fri, 02 Jun 2017 21:19:04 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 a7f9607b8 -> 774a31e3d


PHOENIX-3612 Make tracking of max allowed number of mutations bytes based instead of row based


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/774a31e3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/774a31e3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/774a31e3

Branch: refs/heads/4.x-HBase-0.98
Commit: 774a31e3da4e517c11f047495dcc2f71259c7848
Parents: a7f9607
Author: Thomas <tdsilva@salesforce.com>
Authored: Fri Jun 2 10:52:39 2017 -0700
Committer: Thomas <tdsilva@salesforce.com>
Committed: Fri Jun 2 11:37:25 2017 -0700

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/QueryMoreIT.java | 46 +++++++++++++-
 .../apache/phoenix/execute/PartialCommitIT.java |  4 +-
 .../apache/phoenix/compile/DeleteCompiler.java  | 16 ++---
 .../apache/phoenix/compile/PostDDLCompiler.java |  4 +-
 .../compile/PostLocalIndexDDLCompiler.java      |  2 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 14 +++--
 .../phoenix/exception/SQLExceptionCode.java     |  5 +-
 .../apache/phoenix/execute/MutationState.java   | 59 +++++++++++-------
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  9 +--
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  6 +-
 .../query/ConnectionlessQueryServicesImpl.java  |  2 +-
 .../org/apache/phoenix/query/QueryServices.java |  1 +
 .../phoenix/query/QueryServicesOptions.java     |  1 +
 .../apache/phoenix/schema/MetaDataClient.java   | 50 +++++++--------
 .../schema/types/PArrayDataTypeEncoder.java     | 65 ++++++++++++++++++++
 .../org/apache/phoenix/util/KeyValueUtil.java   | 58 +++++++++++++++++
 .../apache/phoenix/jdbc/PhoenixDriverTest.java  |  6 +-
 17 files changed, 270 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index bfccb63..8397e4d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -38,6 +39,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -479,7 +481,7 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
     public void testMutationBatch() throws Exception {
         Properties connectionProperties = new Properties();
         connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "10");
-        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024");
+        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128");
         PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
         String fullTableName = generateUniqueName();
         try (Statement stmt = connection.createStatement()) {
@@ -500,13 +502,53 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
         
         // set the batch size (rows) to 1 
         connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1");
-        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "1024");
+        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128");
         connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
         upsertRows(connection, fullTableName);
         connection.commit();
         // each row should be in its own batch
         assertEquals(4L, connection.getMutationState().getBatchCount());
     }
+    
+    @Test
+    public void testMaxMutationSize() throws Exception {
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
+        PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
+                "    ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
+                "    SCORE DOUBLE NOT NULL,\n" +
+                "    ENTITY_ID CHAR(15) NOT NULL\n" +
+                "    CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
+                "        ORGANIZATION_ID,\n" +
+                "        SCORE DESC,\n" +
+                "        ENTITY_ID DESC\n" +
+                "    )\n" +
+                ") MULTI_TENANT=TRUE");
+        }
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        }
+        catch(SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode());
+        }
+        
+        // set the max mutation size (bytes) to a low value
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
+        connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        }
+        catch(SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode());
+        }
+    }
 
     private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
         PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index a5555f3..cd0c371 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -270,8 +270,8 @@ public class PartialCommitIT extends BaseOwnClusterIT {
         // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override
-            protected MutationState newMutationState(int maxSize) {
-                return new MutationState(maxSize, this, mutations, null, null);
+            protected MutationState newMutationState(int maxSize, int maxSizeBytes) {
+                return new MutationState(maxSize, maxSizeBytes, this, mutations, null, null);
             };
         };
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 71dc76a..de8b2ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -116,6 +116,7 @@ public class DeleteCompiler {
         final boolean isAutoCommit = connection.getAutoCommit();
         ConnectionQueryServices services = connection.getQueryServices();
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
         Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
         List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
@@ -174,10 +175,10 @@ public class DeleteCompiler {
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
-                    MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection);
+                    MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, maxSizeBytes, connection);
                     connection.getMutationState().join(state);
                     for (int i = 0; i < indexTableRefs.size(); i++) {
-                        MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection);
+                        MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection);
                         connection.getMutationState().join(indexState);
                     }
                     connection.getMutationState().send();
@@ -190,10 +191,10 @@ public class DeleteCompiler {
 
             // If auto commit is true, this last batch will be committed upon return
             int nCommittedRows = isAutoCommit ? (rowCount / batchSize * batchSize) : 0;
-            MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection);
+            MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection);
             for (int i = 0; i < indexTableRefs.size(); i++) {
                 // To prevent the counting of these index rows, we have a negative for remainingRows.
-                MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, connection);
+                MutationState indexState = new MutationState(indexTableRefs.get(i), indexMutations.get(i), 0, maxSize, maxSizeBytes, connection);
                 state.join(indexState);
             }
             return state;
@@ -496,6 +497,7 @@ public class DeleteCompiler {
             }
             
             final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+            final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
      
             final StatementContext context = plan.getContext();
             // If we're doing a query for a set of rows with no where clause, then we don't need to contact the server at all.
@@ -522,7 +524,7 @@ public class DeleteCompiler {
                         while (iterator.hasNext()) {
                             mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                         }
-                        return new MutationState(tableRef, mutation, 0, maxSize, connection);
+                        return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
                     }
     
                     @Override
@@ -627,7 +629,7 @@ public class DeleteCompiler {
                             try {
                                 Tuple row = iterator.next();
                                 final long mutationCount = (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                                return new MutationState(maxSize, connection) {
+                                return new MutationState(maxSize, maxSizeBytes, connection) {
                                     @Override
                                     public long getUpdateCount() {
                                         return mutationCount;
@@ -716,7 +718,7 @@ public class DeleteCompiler {
                                 }
                                 // Return total number of rows that have been delete. In the case of auto commit being off
                                 // the mutations will all be in the mutation state of the current connection.
-                                MutationState state = new MutationState(maxSize, connection, totalRowCount);
+                                MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount);
 
                                 // set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
                                 state.setReadMetricQueue(plan.getContext().getReadMetricsQueue());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 0b3de6e..e5ed6a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -140,7 +140,7 @@ public class PostDDLCompiler {
             @Override
             public MutationState execute() throws SQLException {
                 if (tableRefs.isEmpty()) {
-                    return new MutationState(0, connection);
+                    return new MutationState(0, 1000, connection);
                 }
                 boolean wasAutoCommit = connection.getAutoCommit();
                 try {
@@ -319,7 +319,7 @@ public class PostDDLCompiler {
                         
                     }
                     final long count = totalMutationCount;
-                    return new MutationState(1, connection) {
+                    return new MutationState(1, 1000, connection) {
                         @Override
                         public long getUpdateCount() {
                             return count;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
index 7e3c3b2..f34c5a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -108,7 +108,7 @@ public class PostLocalIndexDDLCompiler {
                     // The contract is to return a MutationState that contains the number of rows modified. In this
                     // case, it's the number of rows in the data table which corresponds to the number of index
                     // rows that were added.
-                    return new MutationState(0, connection, rowCount);
+                    return new MutationState(0, 0, connection, rowCount);
                 }
 
             };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 69dab66..ca15e4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -180,6 +180,9 @@ public class UpsertCompiler {
         ConnectionQueryServices services = connection.getQueryServices();
         int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
                 QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        int maxSizeBytes =
+                services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
         boolean isAutoCommit = connection.getAutoCommit();
         int numSplColumns =
@@ -240,14 +243,14 @@ public class UpsertCompiler {
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
-                    MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection);
+                    MutationState state = new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
                     connection.getMutationState().join(state);
                     connection.getMutationState().send();
                     mutation.clear();
                 }
             }
             // If auto commit is true, this last batch will be committed upon return
-            return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
+            return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, maxSizeBytes, connection);
         }
     }
 
@@ -316,6 +319,7 @@ public class UpsertCompiler {
         final PhoenixConnection connection = statement.getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         List<ColumnName> columnNodes = upsert.getColumns();
         TableRef tableRefToBe = null;
         PTable table = null;
@@ -751,7 +755,7 @@ public class UpsertCompiler {
                                     new MetaDataClient(connection).buildIndex(index, tableRef,
                                             scan.getTimeRange().getMax(), scan.getTimeRange().getMax() + 1);
                                 }
-                                return new MutationState(maxSize, connection) {
+                                return new MutationState(maxSize, maxSizeBytes, connection) {
                                     @Override
                                     public long getUpdateCount() {
                                         return mutationCount;
@@ -844,7 +848,7 @@ public class UpsertCompiler {
                         }
                         // Return total number of rows that have been updated. In the case of auto commit being off
                         // the mutations will all be in the mutation state of the current connection.
-                        MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
+                        MutationState mutationState = new MutationState(maxSize, maxSizeBytes, statement.getConnection(), totalRowCount);
                         /*
                          *  All the metrics collected for measuring the reads done by the parallel mutating iterators
                          *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
@@ -1085,7 +1089,7 @@ public class UpsertCompiler {
                     viewConstants = IndexUtil.getViewConstants(parentTable);
                 }
                 setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0);
-                return new MutationState(tableRef, mutation, 0, maxSize, connection);
+                return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index ecbb285..841fd5d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -432,7 +432,10 @@ public enum SQLExceptionCode {
                                     727, "43M11", " ASYNC option is not allowed.. "),
     NEW_CONNECTION_THROTTLED(728, "410M1", "Could not create connection " +
         "because this client already has the maximum number" +
-        " of connections to the target cluster.");
+        " of connections to the target cluster."),
+    
+    MAX_MUTATION_SIZE_EXCEEDED(729, "LIM01", "MutationState size is bigger than maximum allowed number of rows"),
+    MAX_MUTATION_SIZE_BYTES_EXCEEDED(730, "LIM02", "MutationState size is bigger than maximum allowed number of bytes");
 
     private final int errorCode;
     private final String sqlState;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 9d2770d..87767cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -128,6 +128,7 @@ public class MutationState implements SQLCloseable {
     
     private final PhoenixConnection connection;
     private final long maxSize;
+    private final long maxSizeBytes;
     private final long batchSize;
     private final long batchSizeBytes;
     private long batchCount = 0L;
@@ -146,35 +147,36 @@ public class MutationState implements SQLCloseable {
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
 
-    public MutationState(long maxSize, PhoenixConnection connection) {
-        this(maxSize,connection, null, null);
+    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
+        this(maxSize, maxSizeBytes, connection, null, null);
     }
     
-    public MutationState(long maxSize, PhoenixConnection connection, TransactionContext txContext) {
-        this(maxSize,connection, null, txContext);
+    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, TransactionContext txContext) {
+        this(maxSize, maxSizeBytes, connection, null, txContext);
     }
     
     public MutationState(MutationState mutationState) {
-        this(mutationState.maxSize, mutationState.connection, mutationState.getTransaction(), null);
+        this(mutationState.maxSize, mutationState.maxSizeBytes, mutationState.connection, mutationState.getTransaction(), null);
     }
     
-    public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, connection, null, null, sizeOffset);
+    public MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, long sizeOffset) {
+        this(maxSize, maxSizeBytes, connection, null, null, sizeOffset);
     }
     
-    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
-        this(maxSize,connection, tx, txContext, 0);
+    private MutationState(long maxSize, long maxSizeBytes,PhoenixConnection connection, Transaction tx, TransactionContext txContext) {
+        this(maxSize, maxSizeBytes, connection, tx, txContext, 0);
     }
     
-    private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
-        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext);
+    private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext);
         this.sizeOffset = sizeOffset;
     }
     
-    MutationState(long maxSize, PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
-            Transaction tx, TransactionContext txContext) {
+    MutationState(long maxSize, long maxSizeBytes,
+            PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations, Transaction tx, TransactionContext txContext) {
         this.maxSize = maxSize;
+        this.maxSizeBytes = maxSizeBytes;
         this.connection = connection;
         this.batchSize = connection.getMutateBatchSize();
         this.batchSizeBytes = connection.getMutateBatchSizeBytes();
@@ -201,8 +203,8 @@ public class MutationState implements SQLCloseable {
         }
     }
 
-    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, null, null, sizeOffset);
+    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
+        this(maxSize, maxSizeBytes, connection, null, null, sizeOffset);
         this.mutations.put(table, mutations);
         this.numRows = mutations.size();
         this.tx = connection.getMutationState().getTransaction();
@@ -213,6 +215,10 @@ public class MutationState implements SQLCloseable {
         return maxSize;
     }
     
+    public long getMaxSizeBytes() {
+        return maxSizeBytes;
+    }
+    
     /**
      * Commit a write fence when creating an index so that we can detect
      * when a data table transaction is started before the create index
@@ -436,16 +442,23 @@ public class MutationState implements SQLCloseable {
         return false;
     }
 
-    public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
+    public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
+        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap(), null, null);
         state.sizeOffset = 0;
         return state;
     }
     
-    private void throwIfTooBig() {
+    private void throwIfTooBig() throws SQLException {
         if (numRows > maxSize) {
-            // TODO: throw SQLException ?
-            throw new IllegalArgumentException("MutationState size of " + numRows + " is bigger than max allowed size of " + maxSize);
+            resetState();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build()
+                    .buildException();
+        }
+        long estimatedSize = KeyValueUtil.getEstimatedRowSize(mutations);
+        if (estimatedSize > maxSizeBytes) {
+            resetState();
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED)
+                    .build().buildException();
         }
     }
     
@@ -512,7 +525,7 @@ public class MutationState implements SQLCloseable {
      * 
      * @param newMutationState the newer mutation state
      */
-    public void join(MutationState newMutationState) {
+    public void join(MutationState newMutationState) throws SQLException {
         if (this == newMutationState) { // Doesn't make sense
             return;
         }
@@ -1177,7 +1190,7 @@ public class MutationState implements SQLCloseable {
         List<Mutation> currentList = Lists.newArrayList();
         long currentBatchSizeBytes = 0L;
         for (Mutation mutation : allMutationList) {
-            long mutationSizeBytes = mutation.heapSize();
+            long mutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation);
             if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) {
                 if (currentList.size() > 0) {
                     mutationBatchList.add(currentList);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 5af418d..09796aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -264,6 +264,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         timestampPattern = this.services.getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT);
         String numberPattern = this.services.getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
         int maxSize = this.services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+        int maxSizeBytes = this.services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         Format dateFormat = DateUtil.getDateFormatter(datePattern);
         Format timeFormat = DateUtil.getDateFormatter(timePattern);
         Format timestampFormat = DateUtil.getDateFormatter(timestampPattern);
@@ -294,7 +295,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
             }
         };
         this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps());
-        this.mutationState = mutationState == null ? newMutationState(maxSize) : new MutationState(mutationState);
+        this.mutationState = mutationState == null ? newMutationState(maxSize, maxSizeBytes) : new MutationState(mutationState);
         this.metaData = metaData;
         this.metaData.pruneTables(pruner);
         this.metaData.pruneFunctions(pruner);
@@ -480,8 +481,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
     	return metaData.getTableRef(key);
     }
 
-    protected MutationState newMutationState(int maxSize) {
-        return new MutationState(maxSize, this);
+    protected MutationState newMutationState(int maxSize, int maxSizeBytes) {
+        return new MutationState(maxSize, maxSizeBytes, this);
     }
     
     public MutationState getMutationState() {
@@ -657,7 +658,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
             .build().buildException();
         }
         this.mutationState.rollback();
-        this.mutationState = new MutationState(this.mutationState.getMaxSize(), this, txContext);
+        this.mutationState = new MutationState(this.mutationState.getMaxSize(), this.mutationState.getMaxSizeBytes(), this, txContext);
         
         // Write data to HBase after each statement execution as the commit may not
         // come through Phoenix APIs.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 8b00113..eadb108 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -873,7 +873,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                     } catch(IOException e) {
                         throw new SQLException(e);
                     }
-                    return new MutationState(0, context.getConnection());
+                    return new MutationState(0, 0, context.getConnection());
                 }
             };
             
@@ -977,7 +977,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                     } catch(IOException e) {
                         throw new SQLException(e);
                     }
-                    return new MutationState(0, context.getConnection());
+                    return new MutationState(0, 0, context.getConnection());
                 }
             };
             
@@ -1258,7 +1258,7 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                     PhoenixConnection phxConn = stmt.getConnection();
                     Properties props = new Properties();
                     phxConn.getQueryServices().upgradeSystemTables(phxConn.getURL(), props);
-                    return MutationState.emptyMutationState(-1, phxConn);
+                    return MutationState.emptyMutationState(-1, -1, phxConn);
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 76b69fb..9d6712a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -352,7 +352,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
 
     @Override
     public MutationState updateData(MutationPlan plan) throws SQLException {
-        return new MutationState(0, plan.getContext().getConnection());
+        return new MutationState(0, 0, plan.getContext().getConnection());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 4b871d5..7c37930 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -85,6 +85,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String CALL_QUEUE_ROUND_ROBIN_ATTRIB = "ipc.server.callqueue.roundrobin";
     public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching";
     public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
+    public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = "phoenix.mutate.maxSizeBytes";
 
     public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
     public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index c01e454..b8e92a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -126,6 +126,7 @@ public class QueryServicesOptions {
     public static final String DEFAULT_DATE_FORMAT_TIMEZONE = DateUtil.DEFAULT_TIME_ZONE_ID;
     public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true;
     public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
+    public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES =  104857600; // 100 MB
     public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes
     public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated
     public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 32597a1..d76f2c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1041,7 +1041,7 @@ public class MetaDataClient {
         table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, false, null, null, tableProps, commonFamilyProps);
 
         if (table == null || table.getType() == PTableType.VIEW /*|| table.isTransactional()*/) {
-            return new MutationState(0,connection);
+            return new MutationState(0, 0, connection);
         }
         // Hack to get around the case when an SCN is specified on the connection.
         // In this case, we won't see the table we just created yet, so we hack
@@ -1115,7 +1115,7 @@ public class MetaDataClient {
             }
         }
         final long count = rowCount;
-        return new MutationState(1, connection) {
+        return new MutationState(1, 1000, connection) {
             @Override
             public long getUpdateCount() {
                 return count;
@@ -1374,17 +1374,17 @@ public class MetaDataClient {
 
     public MutationState declareCursor(DeclareCursorStatement statement, QueryPlan queryPlan) throws SQLException {
         CursorUtil.declareCursor(statement, queryPlan);
-        return new MutationState(0,connection);
+        return new MutationState(0, 0, connection);
     }
 
     public MutationState open(OpenStatement statement) throws SQLException {
         CursorUtil.openCursor(statement, connection);
-        return new MutationState(0,connection);
+        return new MutationState(0, 0, connection);
     }
 
     public MutationState close(CloseStatement statement) throws SQLException {
         CursorUtil.closeCursor(statement);
-        return new MutationState(0,connection);
+        return new MutationState(0, 0, connection);
     }
 
     /**
@@ -1596,7 +1596,7 @@ public class MetaDataClient {
             }
         }
         if (table == null) {
-            return new MutationState(0,connection);
+            return new MutationState(0, 0, connection);
         }
 
         if (logger.isInfoEnabled()) logger.info("Created index " + table.getName().getString() + " at " + table.getTimeStamp());
@@ -1605,7 +1605,7 @@ public class MetaDataClient {
                 QueryServicesOptions.DEFAULT_INDEX_ASYNC_BUILD_ENABLED);
        // In async process, we return immediately as the MR job needs to be triggered.
         if(statement.isAsync() && asyncIndexBuildEnabled) {
-            return new MutationState(0, connection);
+            return new MutationState(0, 0, connection);
         }
 
         // If our connection is at a fixed point-in-time, we need to open a new
@@ -1627,11 +1627,11 @@ public class MetaDataClient {
             connection.getQueryServices().dropSequence(tenantId, schemaName, sequenceName, timestamp);
         } catch (SequenceNotFoundException e) {
             if (statement.ifExists()) {
-                return new MutationState(0, connection);
+                return new MutationState(0, 0, connection);
             }
             throw e;
         }
-        return new MutationState(1, connection);
+        return new MutationState(1, 1000, connection);
     }
 
     public MutationState createSequence(CreateSequenceStatement statement, long startWith,
@@ -1662,11 +1662,11 @@ public class MetaDataClient {
                     startWith, incrementBy, cacheSize, minValue, maxValue, cycle, timestamp);
         } catch (SequenceAlreadyExistsException e) {
             if (ifNotExists) {
-                return new MutationState(0, connection);
+                return new MutationState(0, 0, connection);
             }
             throw e;
         }
-        return new MutationState(1, connection);
+        return new MutationState(1, 1000, connection);
     }
 
     public MutationState createFunction(CreateFunctionStatement stmt) throws SQLException {
@@ -1728,7 +1728,7 @@ public class MetaDataClient {
         } finally {
             connection.setAutoCommit(wasAutoCommit);
         }
-        return new MutationState(1, connection);
+        return new MutationState(1, 1000, connection);
     }
 
     private static ColumnDef findColumnDefOrNull(List<ColumnDef> colDefs, ColumnName colName) {
@@ -2787,7 +2787,7 @@ public class MetaDataClient {
                 PFunction function = connection.getMetaDataCache().getFunction(new PTableKey(tenantId, functionName));
                 if (function.isTemporaryFunction()) {
                     connection.removeFunction(tenantId, functionName, clientTimeStamp);
-                    return new MutationState(0, connection);
+                    return new MutationState(0, 0, connection);
                 }
             } catch(FunctionNotFoundException e) {
 
@@ -2807,7 +2807,7 @@ public class MetaDataClient {
                 connection.removeFunction(tenantId, functionName, result.getMutationTime());
                 break;
             }
-            return new MutationState(0, connection);
+            return new MutationState(0, 0, connection);
         } finally {
             connection.setAutoCommit(wasAutoCommit);
         }
@@ -2905,7 +2905,7 @@ public class MetaDataClient {
                 }
                 break;
             }
-            return new MutationState(0, connection);
+            return new MutationState(0, 0, connection);
         } finally {
             connection.setAutoCommit(wasAutoCommit);
         }
@@ -3557,7 +3557,7 @@ public class MetaDataClient {
                         if (!ifNotExists) {
                             throw new ColumnAlreadyExistsException(schemaName, tableName, SchemaUtil.findExistingColumn(result.getTable(), columns));
                         }
-                        return new MutationState(0,connection);
+                        return new MutationState(0, 0, connection);
                     }
                     // Only update client side cache if we aren't adding a PK column to a table with indexes or
                     // transitioning a table from non transactional to transactional.
@@ -3606,7 +3606,7 @@ public class MetaDataClient {
                         MutationPlan plan = new PostDDLCompiler(connection).compile(Collections.singletonList(new TableRef(null, table, ts, false)), emptyCF, projectCF == null ? null : Collections.singletonList(projectCF), null, ts);
                         return connection.getQueryServices().updateData(plan);
                     }
-                    return new MutationState(0,connection);
+                    return new MutationState(0, 0, connection);
                 } catch (ConcurrentTableMutationException e) {
                     if (retried) {
                         throw e;
@@ -3721,7 +3721,7 @@ public class MetaDataClient {
                         columnRef = resolver.resolveColumn(null, column.getFamilyName(), column.getColumnName());
                     } catch (ColumnNotFoundException e) {
                         if (statement.ifExists()) {
-                            return new MutationState(0,connection);
+                            return new MutationState(0, 0, connection);
                         }
                         throw e;
                     }
@@ -3825,7 +3825,7 @@ public class MetaDataClient {
                         if (!statement.ifExists()) {
                             throw new ColumnNotFoundException(schemaName, tableName, Bytes.toString(result.getFamilyName()), Bytes.toString(result.getColumnName()));
                         }
-                        return new MutationState(0, connection);
+                        return new MutationState(0, 0, connection);
                     }
                     // If we've done any index metadata updates, don't bother trying to update
                     // client-side cache as it would be too painful. Just let it pull it over from
@@ -3918,7 +3918,7 @@ public class MetaDataClient {
                         // Return the last MutationState
                         return state;
                     }
-                    return new MutationState(0, connection);
+                    return new MutationState(0, 0, connection);
                 } catch (ConcurrentTableMutationException e) {
                     if (retried) {
                         throw e;
@@ -4026,12 +4026,12 @@ public class MetaDataClient {
                 TableRef dataTableRef = FromCompiler.getResolver(dataTableNode, connection).getTables().get(0);
                 return buildIndex(index, dataTableRef);
             }
-            return new MutationState(1, connection);
+            return new MutationState(1, 1000, connection);
         } catch (TableNotFoundException e) {
             if (!statement.ifExists()) {
                 throw e;
             }
-            return new MutationState(0, connection);
+            return new MutationState(0, 0, connection);
         } finally {
             connection.setAutoCommit(wasAutoCommit);
         }
@@ -4117,7 +4117,7 @@ public class MetaDataClient {
         } finally {
             connection.setAutoCommit(wasAutoCommit);
         }
-        return new MutationState(0, connection);
+        return new MutationState(0, 0, connection);
     }
 
     private void validateSchema(String schemaName) throws SQLException {
@@ -4156,7 +4156,7 @@ public class MetaDataClient {
                 connection.removeSchema(schema, result.getMutationTime());
                 break;
             }
-            return new MutationState(0, connection);
+            return new MutationState(0, 0, connection);
         } finally {
             connection.setAutoCommit(wasAutoCommit);
         }
@@ -4171,6 +4171,6 @@ public class MetaDataClient {
             .resolveSchema(useSchemaStatement.getSchemaName());
             connection.setSchema(useSchemaStatement.getSchemaName());
         }
-        return new MutationState(0, connection);
+        return new MutationState(0, 0, connection);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
index bb293bb..b5a7e04 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PArrayDataTypeEncoder.java
@@ -20,11 +20,19 @@ package org.apache.phoenix.schema.types;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnValueEncoder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
@@ -167,4 +175,61 @@ public class PArrayDataTypeEncoder implements ColumnValueEncoder {
         return null;
     }
     
+    /**
+     * @param colValueMap map from column to value
+     * @return estimated encoded size
+     */
+    public static int getEstimatedByteSize(PTable table, int rowLength,
+            Map<PColumn, byte[]> colValueMap) {
+        // iterate over column families
+        int rowSize = 0;
+        for (PColumnFamily family : table.getColumnFamilies()) {
+            Collection<PColumn> columns = family.getColumns();
+            // we add a non-null value to the start so that we can represent absent values in the array with negative offsets
+            int numColumns = columns.size() + 1;
+            int cellSize = 1;
+            int nulls = 0;
+            int maxOffset = 0;
+            // iterate over columns
+            for (PColumn column : columns) {
+                if (colValueMap.containsKey(column)) {
+                    byte[] colValue = colValueMap.get(column);
+                    // the column value is null
+                    if (colValue == null || colValue.length == 0) {
+                        ++nulls;
+                        maxOffset = cellSize;
+                    } else {
+                        // count the bytes written to serialize nulls
+                        if (nulls > 0) {
+                            cellSize += (1 + Math.ceil(nulls / 255));
+                            nulls = 0;
+                        }
+                        maxOffset = cellSize;
+                        cellSize += colValue.length;
+                    }
+                }
+                // the column value is absent
+                else {
+                    ++nulls;
+                    maxOffset = cellSize;
+                }
+            }
+            // count the bytes used for the offset array
+            cellSize +=
+                    PArrayDataType.useShortForOffsetArray(maxOffset,
+                        PArrayDataType.IMMUTABLE_SERIALIZATION_VERSION)
+                                ? numColumns * Bytes.SIZEOF_SHORT
+                                : numColumns * Bytes.SIZEOF_INT;
+            cellSize += 4;
+            // count the bytes used for header information
+            cellSize += 5;
+            // add the size of the single cell containing all column values
+            rowSize +=
+                    KeyValue.getKeyValueDataStructureSize(rowLength,
+                        family.getName().getBytes().length,
+                        QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES.length, cellSize);
+        }
+        return rowSize;
+    }
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
index d16521b..4234df5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/KeyValueUtil.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.Cell;
@@ -29,7 +30,14 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PArrayDataTypeEncoder;
 
 /**
  * 
@@ -177,4 +185,54 @@ public class KeyValueUtil {
         }
         return size;
     }
+
+    /**
+     * Estimates the storage size of a row
+     * @param mutations map from table to row to RowMutationState
+     * @return estimated row size
+     */
+    public static long
+            getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) {
+        long size = 0;
+        // iterate over tables
+        for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations
+                .entrySet()) {
+            PTable table = tableEntry.getKey().getTable();
+            // iterate over rows
+            for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue()
+                    .entrySet()) {
+                int rowLength = rowEntry.getKey().getLength();
+                Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
+                switch (table.getImmutableStorageScheme()) {
+                case ONE_CELL_PER_COLUMN:
+                    // iterate over columns
+                    for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
+                        PColumn pColumn = colValueEntry.getKey();
+                        size +=
+                                KeyValue.getKeyValueDataStructureSize(rowLength,
+                                    pColumn.getFamilyName().getBytes().length,
+                                    pColumn.getColumnQualifierBytes().length,
+                                    colValueEntry.getValue().length);
+                    }
+                    break;
+                case SINGLE_CELL_ARRAY_WITH_OFFSETS:
+                    // we store all the column values in a single key value,
+                    // followed by an offset array
+                    size +=
+                            PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
+                                colValueMap);
+                    break;
+                }
+                // count the empty key value
+                Pair<byte[], byte[]> emptyKeyValueInfo =
+                        EncodedColumnsUtil.getEmptyKeyValueInfo(table);
+                size +=
+                        KeyValue.getKeyValueDataStructureSize(rowLength,
+                            SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
+                            emptyKeyValueInfo.getFirst().length,
+                            emptyKeyValueInfo.getSecond().length);
+            }
+        }
+        return size;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/774a31e3/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
index d8f9df6..e7afb30 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixDriverTest.java
@@ -60,7 +60,7 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
     }
 
     @Test
-    public void testMaxMutationSizeSetCorrectly() throws Exception {
+    public void testMaxMutationSizeSetCorrectly() throws SQLException {
         Properties connectionProperties = new Properties();
         connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB,"100");
         connectionProperties.setProperty(QueryServices.IMMUTABLE_ROWS_ATTRIB,"100");
@@ -75,7 +75,9 @@ public class PhoenixDriverTest extends BaseConnectionlessQueryTest {
                 stmt.execute();
             }
             fail("Upsert should have failed since the number of upserts (200) is greater than the MAX_MUTATION_SIZE_ATTRIB (100)");
-        } catch (IllegalArgumentException expected) {}
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode());
+        }
     }
 
     @Test


Mime
View raw message