phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-3128 Remove extraneous operations during upsert with local immutable index
Date Wed, 03 Aug 2016 00:07:25 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 446c58b1d -> 021b525b7


PHOENIX-3128 Remove extraneous operations during upsert with local immutable index


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d1d4a801
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d1d4a801
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d1d4a801

Branch: refs/heads/4.x-HBase-0.98
Commit: d1d4a801265788919a6958e3585a137ec6f71eb2
Parents: 446c58b
Author: James Taylor <jamestaylor@apache.org>
Authored: Tue Aug 2 12:03:33 2016 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Tue Aug 2 17:03:53 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/DistinctPrefixFilterIT.java | 11 ++-
 .../apache/phoenix/end2end/index/IndexIT.java   | 39 +++++++++
 .../compile/PostLocalIndexDDLCompiler.java      | 15 ++--
 .../apache/phoenix/execute/MutationState.java   | 11 ++-
 .../hbase/index/covered/IndexMetaData.java      |  9 +-
 .../hbase/index/covered/NonTxIndexBuilder.java  |  6 +-
 .../phoenix/index/PhoenixIndexMetaData.java     | 12 ++-
 .../index/PhoenixTransactionalIndexer.java      | 89 +++++++++++++-------
 .../org/apache/phoenix/schema/PTableImpl.java   | 13 ++-
 .../apache/phoenix/util/TransactionUtil.java    |  7 ++
 10 files changed, 164 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
index 203d51e..9d31070 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DistinctPrefixFilterIT.java
@@ -88,6 +88,15 @@ public class DistinctPrefixFilterIT extends BaseHBaseManagedTimeTableReuseIT {
         insertPrefixV("3", "1");
         insertPrefixV("3", "2");
         insertPrefixV("3", "3");
+        conn.commit();
+        ResultSet rs;
+        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ count(*) from " + testTableV);
+        assertTrue(rs.next());
+        long count1 = rs.getLong(1);
+        rs = conn.createStatement().executeQuery("select count(*) from " + testTableV + "_idx");
+        assertTrue(rs.next());
+        long count2 = rs.getLong(1);
+        assertEquals(count1,count2);
 
         multiply();
         multiply();
@@ -258,7 +267,7 @@ public class DistinctPrefixFilterIT extends BaseHBaseManagedTimeTableReuseIT {
 
         testCommonDistinct(testTableF);
         testCommonDistinct(testTableV);
-}
+    }
 
     private void testCommonDistinct(String testTable) throws Exception {
         testSkipRange("SELECT %s DISTINCT prefix1 FROM " + testTable, 4);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
index 35a0aad..df45ecb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.Date;
@@ -39,7 +40,15 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.coprocessor.generated.PTableProtos.PTableType;
@@ -56,6 +65,7 @@ import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -63,6 +73,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -871,6 +882,7 @@ public class IndexIT extends BaseHBaseManagedTimeTableReuseIT {
             conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, \"V1\" VARCHAR, \"v2\" VARCHAR)"+tableDDLOptions);
             query = "SELECT * FROM "+fullTableName;
             rs = conn.createStatement().executeQuery(query);
+            long ts = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null,fullTableName)).getTimeStamp();
             assertFalse(rs.next());
             conn.createStatement().execute(
   	  	          "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "(\"v2\") INCLUDE (\"V1\")");
@@ -941,9 +953,36 @@ public class IndexIT extends BaseHBaseManagedTimeTableReuseIT {
             assertEquals("2",rs.getString(5));
             assertEquals("2",rs.getString("v2"));
             assertFalse(rs.next());
+            
+            assertNoIndexDeletes(conn, ts, fullIndexName);
         } 
     }
 
+    private void assertNoIndexDeletes(Connection conn, long minTimestamp, String fullIndexName) throws IOException, SQLException {
+        if (!this.mutable) {
+            PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            PTable index = pconn.getTable(new PTableKey(null, fullIndexName));
+            byte[] physicalIndexTable = index.getPhysicalName().getBytes();
+            try (HTableInterface hIndex = pconn.getQueryServices().getTable(physicalIndexTable)) {
+                Scan scan = new Scan();
+                scan.setRaw(true);
+                if (this.transactional) {
+                    minTimestamp = TransactionUtil.convertToNanoseconds(minTimestamp);
+                }
+                scan.setTimeRange(minTimestamp, HConstants.LATEST_TIMESTAMP);
+                ResultScanner scanner = hIndex.getScanner(scan);
+                Result result;
+                while ((result = scanner.next()) != null) {
+                    CellScanner cellScanner = result.cellScanner();
+                    while (cellScanner.advance()) {
+                        Cell current = cellScanner.current();
+                        assertEquals (KeyValue.Type.Put.getCode(), current.getTypeByte());
+                    }
+                }
+            };
+        }
+    }
+
     @Test
     public void testInFilterOnIndexedTable() throws Exception {
         String tableName = "TBL_" + generateRandomString();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
index 079ff5c..81dbe0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.compile;
 
-import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
@@ -34,9 +33,9 @@ import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.Lists;
 
@@ -93,10 +92,14 @@ public class PostLocalIndexDDLCompiler {
                 @Override
                 public MutationState execute() throws SQLException {
                     connection.getMutationState().commitDDLFence(dataTable);
-                    Cell kv = plan.iterator().next().getValue(0);
-                    ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-                    // A single Cell will be returned with the count(*) - we decode that here
-                    long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+                    Tuple tuple = plan.iterator().next();
+                    long rowCount = 0;
+                    if (tuple != null) {
+                        Cell kv = tuple.getValue(0);
+                        ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+                        // A single Cell will be returned with the count(*) - we decode that here
+                        rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault());
+                    }
                     // The contract is to return a MutationState that contains the number of rows modified. In this
                     // case, it's the number of rows in the data table which corresponds to the number of index
                     // rows that were added.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index ca026f5..ae78e97 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -552,12 +552,15 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
     
-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { 
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+            final long timestamp, boolean includeAllIndexes, final boolean sendAll) { 
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
-                (table.isImmutableRows() || includeMutableIndexes) ? 
-                        IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : 
-                        Iterators.<PTable>emptyIterator();
+                 includeAllIndexes || table.isWALDisabled() ? // TODO: remove check for isWALDisabled once PHOENIX-3137 is fixed.
+                     IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) :
+                         table.isImmutableRows() ?
+                            IndexMaintainer.enabledGlobalIndexIterator(table.getIndexes().iterator()) :
+                                Iterators.<PTable>emptyIterator();
         final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
         generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index ee25a40..5420013 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -18,5 +18,12 @@
 package org.apache.phoenix.hbase.index.covered;
 
 public interface IndexMetaData {
-    public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {};
+    public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {
+
+        @Override
+        public boolean isImmutableRows() {
+            return false;
+        }};
+    
+    public boolean isImmutableRows();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 11e7d1a..f42ea5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -98,7 +98,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
         Collection<Batch> batches = createTimestampBatchesFromMutation(m);
 
         // go through each batch of keyvalues and build separate index entries for each
-        boolean cleanupCurrentState = true;
+        boolean cleanupCurrentState = !indexMetaData.isImmutableRows();
         for (Batch batch : batches) {
             /*
              * We have to split the work between the cleanup and the update for each group because when we update the
@@ -215,7 +215,9 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
         // determine if we need to make any cleanup given the pending update.
         long batchTs = batch.getTimestamp();
         state.setPendingUpdates(batch.getKvs());
-        addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
+        if (!indexMetaData.isImmutableRows()) {
+            addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
+        }
 
         // A.2 do a single pass first for the updates to the current state
         state.applyPendingUpdates();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 7a67b9c..2679f1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -36,13 +36,13 @@ import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
-
 import org.apache.tephra.Transaction;
 
 public class PhoenixIndexMetaData implements IndexMetaData {
     private final Map<String, byte[]> attributes;
     private final IndexMetaDataCache indexMetaDataCache;
     private final boolean ignoreNewerMutations;
+    private final boolean isImmutable;
     
     private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
         if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -87,6 +87,11 @@ public class PhoenixIndexMetaData implements IndexMetaData {
 
     public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
         this.indexMetaDataCache = getIndexMetaData(env, attributes);
+        boolean isImmutable = true;
+        for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) {
+            isImmutable &= maintainer.isImmutableRows();
+        }
+        this.isImmutable = isImmutable;
         this.attributes = attributes;
         this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
     }
@@ -106,4 +111,9 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     public boolean ignoreNewerMutations() {
         return ignoreNewerMutations;
     }
+
+    @Override
+    public boolean isImmutableRows() {
+        return isImmutable;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index f8be3ee..c67da6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -64,7 +64,6 @@ import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
@@ -72,10 +71,10 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.Transaction.VisibilityLevel;
 import org.apache.tephra.TxConstants;
@@ -236,49 +235,68 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         }
     }
 
+    private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
+        MultiMutation stored = mutations.get(row);
+        // we haven't seen this row before, so add it
+        if (stored == null) {
+            stored = new MultiMutation(row);
+            mutations.put(row, stored);
+        }
+        stored.addAll(m);
+    }
+    
     private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException {
+        Transaction tx = indexMetaData.getTransaction();
+        if (tx == null) {
+            throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegion().getRegionInfo().getTable().getNameAsString());
+        }
+        boolean isRollback = txRollbackAttribute!=null;
+        boolean isImmutable = indexMetaData.isImmutableRows();
         ResultScanner currentScanner = null;
         TransactionAwareHTable txTable = null;
         // Collect up all mutations in batch
         Map<ImmutableBytesPtr, MultiMutation> mutations =
                 new HashMap<ImmutableBytesPtr, MultiMutation>();
+        Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
+        if (isImmutable && !isRollback) {
+            findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
+        } else {
+            findPriorValueMutations = mutations;
+        }
         while(mutationIterator.hasNext()) {
             Mutation m = mutationIterator.next();
             // add the mutation to the batch set
             ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-            MultiMutation stored = mutations.get(row);
-            // we haven't seen this row before, so add it
-            if (stored == null) {
-                stored = new MultiMutation(row);
-                mutations.put(row, stored);
+            if (mutations != findPriorValueMutations && isDeleteMutation(m)) {
+                addMutation(findPriorValueMutations, row, m);
             }
-            stored.addAll(m);
+            addMutation(mutations, row, m);
         }
         
         // Collect the set of mutable ColumnReferences so that we can first
         // run a scan to get the current state. We'll need this to delete
         // the existing index rows.
-        Transaction tx = indexMetaData.getTransaction();
-        if (tx == null) {
-            throw new NullPointerException("Expected to find transaction in metadata for " +
-                    env.getRegion().getRegionInfo().getTable().getNameAsString());
-        }
         List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
-        Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
+        int estimatedSize = indexMaintainers.size() * 10;
+        Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
         for (IndexMaintainer indexMaintainer : indexMaintainers) {
             // For transactional tables, we use an index maintainer
             // to aid in rollback if there's a KeyValue column in the index. The alternative would be
             // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
             // client side.
-            mutableColumns.addAll(indexMaintainer.getAllColumns());
+            Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+            mutableColumns.addAll(allColumns);
         }
 
-        boolean isRollback = txRollbackAttribute!=null;
         Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
         try {
-            if (!mutableColumns.isEmpty()) {
+            // Track if we have row keys with Delete mutations (or Puts that are
+            // Tephra's Delete marker). If there are none, we don't need to do the scan for
+            // prior versions, if there are, we do. Since rollbacks always have delete mutations,
+            // this logic will work there too.
+            if (!findPriorValueMutations.isEmpty()) {
                 List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
-                for (ImmutableBytesPtr ptr : mutations.keySet()) {
+                for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
                     keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
                 }
                 Scan scan = new Scan();
@@ -306,9 +324,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                 currentScanner = txTable.getScanner(scan);
             }
             if (isRollback) {
-                processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates);
+                processRollback(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations);
             } else {
-                processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates);
+                processMutation(env, indexMetaData, txRollbackAttribute, currentScanner, tx, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
             }
         } finally {
             if (txTable != null) txTable.close();
@@ -317,26 +335,39 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         return indexUpdates;
     }
 
+    private static boolean isDeleteMutation(Mutation m) {
+        for (Map.Entry<byte[],List<Cell>> cellMap : m.getFamilyCellMap().entrySet()) {
+            for (Cell cell : cellMap.getValue()) {
+                if (cell.getTypeByte() != KeyValue.Type.Put.getCode() || TransactionUtil.isDelete(cell)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     private void processMutation(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx,
-            Set<ColumnReference> mutableColumns,
-            Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
+            Transaction tx, 
+            Set<ColumnReference> upsertColumns, 
+            Collection<Pair<Mutation, byte[]>> indexUpdates,
+            Map<ImmutableBytesPtr, MultiMutation> mutations,
+            Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
         if (scanner != null) {
             Result result;
             ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES);
             // Process existing data table rows by removing the old index row and adding the new index row
             while ((result = scanner.next()) != null) {
-                Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
-                TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
+                Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
+                TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
                 generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
                 generatePuts(indexMetaData, indexUpdates, state);
             }
         }
         // Process new data table by adding new index rows
         for (Mutation m : mutations.values()) {
-            TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m);
+            TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), tx.getWritePointer(), m);
             generatePuts(indexMetaData, indexUpdates, state);
         }
     }
@@ -344,9 +375,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
     private void processRollback(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
-            Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx,
-            Set<ColumnReference> mutableColumns,
-            Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
+            Transaction tx, Set<ColumnReference> mutableColumns,
+            Collection<Pair<Mutation, byte[]>> indexUpdates,
+            Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
         if (scanner != null) {
             Result result;
             // Loop through last committed row state plus all new rows associated with current transaction

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index ec09992..847979a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -834,12 +834,17 @@ public class PTableImpl implements PTable {
             // we're using the Tephra column family delete marker here to prevent the translation
             // of deletes to puts by the Tephra's TransactionProcessor
             if (PTableImpl.this.isTransactional()) {
-                Put delete = new Put(key);
-                for (PColumnFamily colFamily : families) {
-                    delete.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                Put put = new Put(key);
+                if (families.isEmpty()) {
+                    put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
                             HConstants.EMPTY_BYTE_ARRAY);
+                } else {
+                    for (PColumnFamily colFamily : families) {
+                        put.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts,
+                                HConstants.EMPTY_BYTE_ARRAY);
+                    }
                 }
-                deleteRow = delete;                
+                deleteRow = put;                
             } else {
                 Delete delete = new Delete(key);
                 for (PColumnFamily colFamily : families) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d1d4a801/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
index 1dcf9d3..0e044b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java
@@ -19,6 +19,9 @@ package org.apache.phoenix.util;
 
 import java.sql.SQLException;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -35,6 +38,10 @@ public class TransactionUtil {
     private TransactionUtil() {
     }
     
+    public static boolean isDelete(Cell cell) {
+        return (CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY));
+    }
+    
     public static long convertToNanoseconds(long serverTimeStamp) {
         return serverTimeStamp * TxConstants.MAX_TX_PER_MS;
     }


Mime
View raw message