phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated
Date Wed, 13 Jan 2016 01:51:46 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master e4c479ab0 -> 30c2e7555


PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/30c2e755
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/30c2e755
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/30c2e755

Branch: refs/heads/master
Commit: 30c2e75556c365b059f23d43de5ac272f92aa285
Parents: e4c479a
Author: James Taylor <jtaylor@salesforce.com>
Authored: Tue Jan 12 17:50:34 2016 -0800
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Tue Jan 12 17:51:40 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/end2end/index/IndexIT.java   |  73 +++---
 .../org/apache/phoenix/tx/TransactionIT.java    |   2 -
 .../phoenix/compile/DelegateMutationPlan.java   |  70 ++++++
 .../phoenix/compile/PostIndexDDLCompiler.java   |  15 +-
 .../phoenix/exception/SQLExceptionCode.java     |   1 +
 .../apache/phoenix/execute/MutationState.java   | 230 +++++++++++++++----
 .../query/ConnectionQueryServicesImpl.java      |  12 +-
 .../apache/phoenix/schema/MetaDataClient.java   |  41 ++--
 .../org/apache/phoenix/schema/TableRef.java     |   6 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  21 +-
 pom.xml                                         |   2 +-
 11 files changed, 357 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
index f48b847..e369dae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.BaseTest;
@@ -222,15 +223,19 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     
     @Test
     public void testCreateIndexAfterUpsertStarted() throws Exception {
-        if (transactional) { // FIXME: PHOENIX-2446
-            return;
+        testCreateIndexAfterUpsertStarted(false, fullTableName + "1", fullIndexName + "1");
+        if (transactional) {
+            testCreateIndexAfterUpsertStarted(true, fullTableName + "2", fullIndexName + "2");
         }
+    }
+
+    private void testCreateIndexAfterUpsertStarted(boolean readOwnWrites, String fullTableName, String fullIndexName) throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn1 = DriverManager.getConnection(getUrl(), props)) {
-            conn1.setAutoCommit(false);
+            conn1.setAutoCommit(true);
             String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
-            Statement stmt = conn1.createStatement();
-            stmt.execute(ddl);
+            Statement stmt1 = conn1.createStatement();
+            stmt1.execute(ddl);
             BaseTest.populateTestTable(fullTableName);
 
             ResultSet rs;
@@ -243,32 +248,40 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
                 
                 String upsert = "UPSERT INTO " + fullTableName
                         + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-                PreparedStatement pstmt = conn2.prepareStatement(upsert);
-                pstmt.setString(1, "varchar4");
-                pstmt.setString(2, "char4");
-                pstmt.setInt(3, 4);
-                pstmt.setLong(4, 4L);
-                pstmt.setBigDecimal(5, new BigDecimal(4.0));
+                PreparedStatement pstmt2 = conn2.prepareStatement(upsert);
+                pstmt2.setString(1, "varchar4");
+                pstmt2.setString(2, "char4");
+                pstmt2.setInt(3, 4);
+                pstmt2.setLong(4, 4L);
+                pstmt2.setBigDecimal(5, new BigDecimal(4.0));
                 Date date = DateUtil.parseDate("2015-01-01 00:00:00");
-                pstmt.setDate(6, date);
-                pstmt.setString(7, "varchar_a");
-                pstmt.setString(8, "chara");
-                pstmt.setInt(9, 2);
-                pstmt.setLong(10, 2L);
-                pstmt.setBigDecimal(11, new BigDecimal(2.0));
-                pstmt.setDate(12, date);
-                pstmt.setString(13, "varchar_b");
-                pstmt.setString(14, "charb");
-                pstmt.setInt(15, 3);
-                pstmt.setLong(16, 3L);
-                pstmt.setBigDecimal(17, new BigDecimal(3.0));
-                pstmt.setDate(18, date);
-                pstmt.executeUpdate();
+                pstmt2.setDate(6, date);
+                pstmt2.setString(7, "varchar_a");
+                pstmt2.setString(8, "chara");
+                pstmt2.setInt(9, 2);
+                pstmt2.setLong(10, 2L);
+                pstmt2.setBigDecimal(11, new BigDecimal(2.0));
+                pstmt2.setDate(12, date);
+                pstmt2.setString(13, "varchar_b");
+                pstmt2.setString(14, "charb");
+                pstmt2.setInt(15, 3);
+                pstmt2.setLong(16, 3L);
+                pstmt2.setBigDecimal(17, new BigDecimal(3.0));
+                pstmt2.setDate(18, date);
+                pstmt2.executeUpdate();
+                
+                if (readOwnWrites) {
+                    String query = "SELECT long_pk FROM " + fullTableName + " WHERE long_pk=4";
+                    rs = conn2.createStatement().executeQuery(query);
+                    assertTrue(rs.next());
+                    assertFalse(rs.next());
+                }
                 
+                String indexName = SchemaUtil.getTableNameFromFullName(fullIndexName);
                ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName
                         + " (long_pk, varchar_pk)"
                         + " INCLUDE (long_col1, long_col2)";
-                stmt.execute(ddl);
+                stmt1.execute(ddl);
                 
                 /*
                  * Commit upsert after index created through different connection.
@@ -276,16 +289,18 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
                  * at commit time, recognize the new index, and generate the correct metadata
(or index
                  * rows for immutable indexes).
                  * 
-                 * FIXME: PHOENIX-2446. For transactional data, this is problematic because
the index
+                 * For transactional data, this is problematic because the index
                  * gets a timestamp *after* the commit timestamp of conn2 and thus won't
be seen during
                  * the commit. Also, when the index is being built, the data hasn't yet been
committed
-                 * and thus won't be part of the initial index build.
+                 * and thus won't be part of the initial index build (fixed by PHOENIX-2446).
                  */
                 conn2.commit();
                 
-                rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+                stmt1 = conn1.createStatement();
+                rs = stmt1.executeQuery("SELECT COUNT(*) FROM " + fullTableName);
                 assertTrue(rs.next());
                 assertEquals(4,rs.getInt(1));
+                assertEquals(fullIndexName, stmt1.unwrap(PhoenixStatement.class).getQueryPlan().getTableRef().getTable().getName().getString());
                 
                 String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + fullTableName;
                 rs = conn1.createStatement().executeQuery(query);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index b1c3510..e08225c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -772,7 +772,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         }
     }
     
-    @Ignore("Add back once TEPHRA-162 gets fixed")
     @Test
     public void testInflightUpdateNotSeen() throws Exception {
         String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
@@ -824,7 +823,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         }
     }
     
-    @Ignore("Add back once TEPHRA-162 gets fixed")
     @Test
     public void testInflightDeleteNotSeen() throws Exception {
         String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
new file mode 100644
index 0000000..7ef5c48
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Set;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.schema.TableRef;
+
+public class DelegateMutationPlan implements MutationPlan {
+    @Override
+    public MutationState execute() throws SQLException {
+        return plan.execute();
+    }
+
+    @Override
+    public StatementContext getContext() {
+        return plan.getContext();
+    }
+
+    @Override
+    public TableRef getTargetRef() {
+        return plan.getTargetRef();
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() {
+        return plan.getParameterMetaData();
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        return plan.getExplainPlan();
+    }
+
+    @Override
+    public Set<TableRef> getSourceRefs() {
+        return plan.getSourceRefs();
+    }
+
+    @Override
+    public Operation getOperation() {
+        return plan.getOperation();
+    }
+
+    private final MutationPlan plan;
+    
+    public DelegateMutationPlan(MutationPlan plan) {
+        this.plan = plan;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index f5bb4c4..bb0b595 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.compile;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.schema.PColumn;
@@ -100,9 +101,10 @@ public class PostIndexDDLCompiler {
             }
         }
 
+        final PTable dataTable = dataTableRef.getTable();
         dataColumns.setLength(dataColumns.length()-1);
         indexColumns.setLength(indexColumns.length()-1);
-        String schemaName = dataTableRef.getTable().getSchemaName().getString();
+        String schemaName = dataTable.getSchemaName().getString();
         String tableName = indexTable.getTableName().getString();
         
         StringBuilder updateStmtStr = new StringBuilder();
@@ -110,12 +112,19 @@ public class PostIndexDDLCompiler {
            .append(indexColumns).append(") ");
         final StringBuilder selectQueryBuilder = new StringBuilder();
         selectQueryBuilder.append(" SELECT ").append(dataColumns).append(" FROM ")
-        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTable.getTableName().getString()).append('"');
         this.selectQuery = selectQueryBuilder.toString();
         updateStmtStr.append(this.selectQuery);
         
         try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
-            return statement.compileMutation(updateStmtStr.toString());
+            DelegateMutationPlan delegate = new DelegateMutationPlan(statement.compileMutation(updateStmtStr.toString()))
{
+                @Override
+                public MutationState execute() throws SQLException {
+                    connection.getMutationState().commitWriteFence(dataTable);
+                    return super.execute();
+                }
+            };
+            return delegate;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 38e8ea0..9767cbe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -280,6 +280,7 @@ public enum SQLExceptionCode {
     TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if
transactions are disabled"),
     TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions
are disabled"),
     TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL(1084, "44A15", "Cannot set isolation level
to TRANSACTION_READ_COMMITTED or TRANSACTION_SERIALIZABLE if transactions are disabled"),
+    TX_UNABLE_TO_GET_WRITE_FENCE(1085, "44A16", "Unable to obtain write fence for DDL operation"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 6cac825..41d677a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
@@ -66,9 +68,11 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableRef;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableNotFoundException;
@@ -82,6 +86,7 @@ import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
@@ -97,10 +102,13 @@ import co.cask.tephra.Transaction;
 import co.cask.tephra.Transaction.VisibilityLevel;
 import co.cask.tephra.TransactionAware;
 import co.cask.tephra.TransactionCodec;
+import co.cask.tephra.TransactionConflictException;
 import co.cask.tephra.TransactionContext;
 import co.cask.tephra.TransactionFailureException;
 import co.cask.tephra.TransactionSystemClient;
 import co.cask.tephra.hbase11.TransactionAwareHTable;
+import co.cask.tephra.visibility.FenceWait;
+import co.cask.tephra.visibility.VisibilityFence;
 
 /**
  * 
@@ -126,6 +134,7 @@ public class MutationState implements SQLCloseable {
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
+    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations;
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -151,7 +160,7 @@ public class MutationState implements SQLCloseable {
     }
     
     private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext
txContext, long sizeOffset) {
-    	this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()),
tx, txContext);
+    	this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5),
tx, txContext);
         this.sizeOffset = sizeOffset;
     }
     
@@ -195,6 +204,53 @@ public class MutationState implements SQLCloseable {
         return maxSize;
     }
     
+    /**
+     * Commit a write fence when creating an index so that we can detect
+     * when a data table transaction is started before the create index
+     * but completes after it. In this case, we need to rerun the data
+     * table transaction after the index creation so that the index rows
+     * are generated. See {@link #addReadFence(PTable)} and TEPHRA-157
+     * for more information.
+     * @param dataTable the data table upon which an index is being added
+     * @throws SQLException
+     */
+    public void commitWriteFence(PTable dataTable) throws SQLException {
+        if (dataTable.isTransactional()) {
+            byte[] key = SchemaUtil.getTableKey(dataTable);
+            try {
+                FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient());
+                fenceWait.await(10000, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
+            } catch (TimeoutException | TransactionFailureException e) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE)
+                .setSchemaName(dataTable.getSchemaName().getString())
+                .setTableName(dataTable.getTableName().getString())
+                .build().buildException();
+            } finally {
+                // The client expects a transaction to be in progress on the txContext while
the
+                // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it.
After it's
+                // finished, we start a new one here.
+                // TODO: seems like an autonomous tx capability in Tephra would be useful
here.
+                try {
+                    txContext.start();
+                } catch (TransactionFailureException e) {
+                    throw TransactionUtil.getTransactionFailureException(e);
+                }
+            }
+        }
+    }
+    
+    private void addReadFence(PTable dataTable) throws SQLException {
+        byte[] logicalKey = SchemaUtil.getTableKey(dataTable);
+        this.txContext.addTransactionAware(VisibilityFence.create(logicalKey));
+        byte[] physicalKey = dataTable.getPhysicalName().getBytes();
+        if (Bytes.compareTo(physicalKey, logicalKey) != 0) {
+            this.txContext.addTransactionAware(VisibilityFence.create(physicalKey));
+        }
+    }
+    
     public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException {
         Transaction currentTx = getTransaction();
         if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable()
== null || !plan.getTargetRef().getTable().isTransactional()) {
@@ -332,6 +388,12 @@ public class MutationState implements SQLCloseable {
         
         try {
             if (!isTransactionStarted()) {
+                // Clear any transactional state in case transaction was ended outside
+                // of Phoenix so we don't carry the old transaction state forward. We
+                // cannot call reset() here due to the case of having mutations and
+                // then transitioning from non transactional to transactional (which
+                // would end up clearing our uncommitted state).
+                resetTransactionalState();
                 txContext.start();
                 return true;
             }
@@ -410,6 +472,10 @@ public class MutationState implements SQLCloseable {
                 // Put the existing one back now that it's merged
                 this.mutations.put(entry.getKey(), existingRows);
             } else {
+                // Size new map at batch size as that's what it'll likely grow to.
+                Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+                newRows.putAll(entry.getValue());
+                this.mutations.put(tableRef, newRows);
                 if (!isIndex) {
                     numRows += entry.getValue().size();
                 }
@@ -705,7 +771,6 @@ public class MutationState implements SQLCloseable {
         @Override
         public void delete(List<Delete> deletes) throws IOException {
             ServerCache cache = null;
-            SQLException sqlE = null;
             try {
                 PTable table = tableRef.getTable();
                 List<PTable> indexes = table.getIndexes();
@@ -764,6 +829,7 @@ public class MutationState implements SQLCloseable {
             sendAll = true;
         }
 
+        Map<ImmutableBytesPtr, RowMutationState> valuesMap;
         List<TableRef> txTableRefs = Lists.newArrayListWithExpectedSize(mutations.size());
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to
tables")) {
@@ -773,7 +839,7 @@ public class MutationState implements SQLCloseable {
 	        while (tableRefIterator.hasNext()) {
 	        	// at this point we are going through mutations for each table
 	            final TableRef tableRef = tableRefIterator.next();
-	            Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef);
+	            valuesMap = mutations.get(tableRef);
 	            if (valuesMap == null || valuesMap.isEmpty()) {
 	                continue;
 	            }
@@ -782,6 +848,7 @@ public class MutationState implements SQLCloseable {
 	            final PTable table = tableRef.getTable();
 	            // Track tables to which we've sent uncommitted data
 	            if (isTransactional = table.isTransactional()) {
+	                addReadFence(table);
                     txTableRefs.add(tableRef);
 	                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
 	            }
@@ -832,7 +899,7 @@ public class MutationState implements SQLCloseable {
                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                             
                             long startTime = System.currentTimeMillis();
-                            child.addTimelineAnnotation("Attempt " + retryCount);;
+                            child.addTimelineAnnotation("Attempt " + retryCount);
 	                        hTable.batch(mutationList);
 	                        child.stop();
 	                        child.stop();
@@ -899,6 +966,13 @@ public class MutationState implements SQLCloseable {
 	            // committed in the event of a failure.
 	            if (isTransactional) {
 	                addUncommittedStatementIndexes(valuesMap.values());
+	                if (txMutations == null) {
+	                    txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
+	                }
+	                // Keep all mutations we've encountered until a commit or rollback.
+	                // This is not ideal, but there's not good way to get the values back
+	                // in the event that we need to replay the commit.
+	                txMutations.put(tableRef, valuesMap);
 	            }
                 // Remove batches as we process them
 	            if (sendAll) {
@@ -985,12 +1059,17 @@ public class MutationState implements SQLCloseable {
     public void close() throws SQLException {
     }
 
-    private void reset() {
+    private void resetState() {
+        numRows = 0;
+        this.mutations.clear();
+        resetTransactionalState();
+    }
+    
+    private void resetTransactionalState() {
         tx = null;
         txAwares.clear();
+        txMutations = null;
         uncommittedPhysicalNames.clear();
-        this.mutations.clear();
-        numRows = 0;
         uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
     
@@ -1004,62 +1083,121 @@ public class MutationState implements SQLCloseable {
                 }
             }
         } finally {
-            reset();
+            resetState();
         }
     }
     
     public void commit() throws SQLException {
-    	boolean sendSuccessful=false;
-    	SQLException sqlE = null;
-        try {
-            send();
-            sendSuccessful=true;
-        } catch (SQLException e) {
-            sqlE = e;
-        } finally {
+        Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
+        int retryCount = 0;
+        do {
+        	boolean sendSuccessful=false;
+        	boolean retryCommit = false;
+        	SQLException sqlE = null;
             try {
-                if (txContext != null && isTransactionStarted()) {
-                    TransactionFailureException txFailure = null;
-                    boolean finishSuccessful=false;
-                    try {
-                        if (sendSuccessful) {
-                            txContext.finish();
-                            finishSuccessful = true;
-                        }
-                    } catch (TransactionFailureException e) {
-                        txFailure = e;
-                        SQLException nextE = TransactionUtil.getTransactionFailureException(e);
-                        if (sqlE == null) {
-                            sqlE = nextE;
-                        } else {
-                            sqlE.setNextException(nextE);
+                send();
+                txMutations = this.txMutations;
+                sendSuccessful=true;
+            } catch (SQLException e) {
+                sqlE = e;
+            } finally {
+                try {
+                    if (txContext != null && isTransactionStarted()) {
+                        TransactionFailureException txFailure = null;
+                        boolean finishSuccessful=false;
+                        try {
+                            if (sendSuccessful) {
+                                txContext.finish();
+                                finishSuccessful = true;
+                            }
+                        } catch (TransactionFailureException e) {
+                            retryCommit = (e instanceof TransactionConflictException && retryCount == 0);
+                            txFailure = e;
+                            SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+                            if (sqlE == null) {
+                                sqlE = nextE;
+                            } else {
+                                sqlE.setNextException(nextE);
+                            }
+                        } finally {
+                            // If send fails or finish fails, abort the tx
+                            if (!finishSuccessful) {
+                                try {
+                                    txContext.abort(txFailure);
+                                } catch (TransactionFailureException e) {
+                                    SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+                                    if (sqlE == null) {
+                                        sqlE = nextE;
+                                    } else {
+                                        sqlE.setNextException(nextE);
+                                    }
+                                }
+                            }
                         }
+                    }
+                } finally {
+                    try {
+                        resetState();
                     } finally {
-                        // If send fails or finish fails, abort the tx
-                        if (!finishSuccessful) {
+                        if (retryCommit) {
+                            startTransaction();
                             try {
-                                txContext.abort(txFailure);
-                            } catch (TransactionFailureException e) {
-                                SQLException nextE = TransactionUtil.getTransactionFailureException(e);
+                                retryCommit = wasIndexAdded(txMutations.keySet());
+                            } catch (SQLException e) {
+                                retryCommit = false;
                                 if (sqlE == null) {
-                                    sqlE = nextE;
+                                    sqlE = e;
                                 } else {
-                                    sqlE.setNextException(nextE);
+                                    sqlE.setNextException(e);
                                 }
                             }
                         }
+                        if (sqlE != null && !retryCommit) {
+                            throw sqlE;
+                        }
                     }
                 }
-            } finally {
-                try {
-                    reset();
-                } finally {
-                    if (sqlE != null) {
-                        throw sqlE;
-                    }
-                }
+            }
+            // Retry commit once if conflict occurred and index was added
+            if (!retryCommit) {
+                break;
+            }
+            retryCount++;
+            mutations.putAll(txMutations);
+        } while (true);
+    }
+
+    /**
+     * Determines whether indexes were added to mutated tables while the transaction was
in progress.
+     * @return true if indexes were added and false otherwise.
+     * @throws SQLException 
+     */
+    private boolean wasIndexAdded(Set<TableRef> txTableRefs) throws SQLException {
+        MetaDataClient client = new MetaDataClient(connection);
+        PMetaData cache = connection.getMetaDataCache();
+        boolean addedIndexes = false;
+        for (TableRef tableRef : txTableRefs) {
+            PTable dataTable = tableRef.getTable();
+            List<PTable> oldIndexes;
+            PTableRef ptableRef = cache.getTableRef(dataTable.getKey());
+            oldIndexes = ptableRef.getTable().getIndexes();
+            MetaDataMutationResult result = client.updateCache(dataTable.getTenantId(), dataTable.getSchemaName().getString(),
dataTable.getTableName().getString());
+            long timestamp = TransactionUtil.getResolvedTime(connection, result);
+            tableRef.setTimeStamp(timestamp);
+            if (result.getTable() == null) {
+                throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString());
+            }
+            if (!result.wasUpdated()) {
+                continue;
+            }
+            if (!addedIndexes) {
+                // TODO: in theory we should do a deep equals check here, as it's possible
+                // that an index was dropped and recreated with the same name but different
+                // indexed/covered columns.
+                addedIndexes = (!oldIndexes.equals(result.getTable().getIndexes()));
             }
         }
+        return addedIndexes;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index fb29e2c..530299b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -58,12 +58,6 @@ import java.util.concurrent.TimeoutException;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.PooledClientProvider;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -205,6 +199,12 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
+
 
 public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices
{
     private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 2a514d5..7ee30fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -109,8 +109,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import co.cask.tephra.TxConstants;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -213,6 +211,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 
+import co.cask.tephra.TxConstants;
+
 public class MetaDataClient {
     private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class);
 
@@ -1053,7 +1053,7 @@ public class MetaDataClient {
                         throw new SQLException(e);
                     }
                     ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    PTable dataTable = tableRef.getTable();
+                    final PTable dataTable = tableRef.getTable();
                     for(PTable idx: dataTable.getIndexes()) {
                         if(idx.getName().equals(index.getName())) {
                             index = idx;
@@ -1078,6 +1078,7 @@ public class MetaDataClient {
 
                         @Override
                         public MutationState execute() throws SQLException {
+                            connection.getMutationState().commitWriteFence(dataTable);
                             Cell kv = plan.iterator().next().getValue(0);
                             ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(),
kv.getValueOffset(), kv.getValueLength());
                             // A single Cell will be returned with the count(*) - we decode
that here
@@ -1581,24 +1582,22 @@ public class MetaDataClient {
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, transactional);
                 storeNulls = parent.getStoreNulls();
-                if (tableType == PTableType.INDEX) {
-	                // Index on view
-	                // TODO: Can we support a multi-tenant index directly on a multi-tenant
-	                // table instead of only a view? We don't have anywhere to put the link
-	                // from the table to the index, though.
-	                if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW
&& parent.getViewType() != ViewType.MAPPED)) {
-	                	PName physicalName = parent.getPhysicalName();
-	                    saltBucketNum = parent.getBucketNum();
-	                    addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL);
-	                    defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
-	                    if (indexType == IndexType.LOCAL) {
-	                        saltBucketNum = null;
-	                        // Set physical name of local index table
-	                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes())));
-	                    } else {
-	                        // Set physical name of view index table
-	                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
-	                    }
+                // Index on view
+                // TODO: Can we support a multi-tenant index directly on a multi-tenant
+                // table instead of only a view? We don't have anywhere to put the link
+                // from the table to the index, though.
+                if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW
&& parent.getViewType() != ViewType.MAPPED)) {
+                	PName physicalName = parent.getPhysicalName();
+                    saltBucketNum = parent.getBucketNum();
+                    addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL);
+                    defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString();
+                    if (indexType == IndexType.LOCAL) {
+                        saltBucketNum = null;
+                        // Set physical name of local index table
+                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes())));
+                    } else {
+                        // Set physical name of view index table
+                        physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes())));
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
index 26cbbc3..35e2f77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java
@@ -31,8 +31,8 @@ public class TableRef {
     public static final TableRef EMPTY_TABLE_REF = new TableRef(new PTableImpl());
     
     private PTable table;
+    private long upperBoundTimeStamp;
     private final String alias;
-    private final long upperBoundTimeStamp;
     private final long lowerBoundTimeStamp;
     private final boolean hasDynamicCols;
 
@@ -73,6 +73,10 @@ public class TableRef {
         this.table = value;
     }
 
+    public void setTimeStamp(long timeStamp) {
+        this.upperBoundTimeStamp = timeStamp;
+    }
+
     public String getTableAlias() {
         return alias;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index c6a20b3..de92046 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -17,6 +17,13 @@
  */
 package org.apache.phoenix.util;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES;
+
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
@@ -31,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.hbase.KeyValue;
@@ -66,13 +74,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES;
-
 /**
  * 
  * Static class for various schema-related utilities
@@ -890,4 +891,10 @@ public class SchemaUtil {
     public static boolean hasRowTimestampColumn(PTable table) {
     	return table.getRowTimestampColPos()>0;
     }
+
+    public static byte[] getTableKey(PTable dataTable) {
+        PName tenantId = dataTable.getTenantId();
+        PName schemaName = dataTable.getSchemaName();
+        return getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(),
schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : schemaName.getBytes(), dataTable.getTableName().getBytes());
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8860451..72551a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,7 @@
     <joni.version>2.1.2</joni.version>
     <calcite.version>1.5.0</calcite.version>
     <jettyVersion>8.1.7.v20120910</jettyVersion>
-    <tephra.version>0.6.3</tephra.version>
+    <tephra.version>0.6.4</tephra.version>
 
     <!-- Test Dependencies -->
     <mockito-all.version>1.8.5</mockito-all.version>


Mime
View raw message