phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject [07/10] phoenix git commit: PHOENIX-2395 Refactor mutable secondary indexes code
Date Wed, 11 Nov 2015 23:05:17 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
index 5964647..3335aaa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
@@ -22,16 +22,14 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 
 import com.google.common.collect.Multimap;
 
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-
 /**
  * Handle failures to write to the index tables.
  */
 public interface IndexFailurePolicy extends Stoppable {
-
   public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
deleted file mode 100644
index 1c45cd3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
-
-/**
- *
- */
-public abstract class BaseIndexCodec implements IndexCodec {
-
-  @Override
-  public void initialize(RegionCoprocessorEnvironment env) throws IOException {
-    // noop
-  }
-
-  /**
-   * {@inheritDoc}
-   * <p>
-   * By default, the codec is always enabled. Subclasses should override this method if they want do
-   * decide to index on a per-mutation basis.
- * @throws IOException 
-   */
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   * <p>
-   * Assumes each mutation is not in a batch. Subclasses that have different batching behavior
-   * should override this.
-   */
-  @Override
-  public byte[] getBatchId(Mutation m) {
-    return null;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index b060345..dea82a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -36,6 +36,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
@@ -199,6 +200,40 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         ptr.set(stream.getBuffer(), 0, stream.size());
     }
     
+
+    public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr,
+            List<PTable> keyValueIndexes, PhoenixConnection connection) {
+        int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr);
+        int nIndexes = nMutableIndexes + keyValueIndexes.size();
+        int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer
+        if (indexMetaDataPtr.getLength() == 0) {
+            estimatedSize += table.getRowKeySchema().getEstimatedByteSize();
+        }
+        for (PTable index : keyValueIndexes) {
+            estimatedSize += index.getIndexMaintainer(table, connection).getEstimatedByteSize();
+        }
+        TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1);
+        DataOutput output = new DataOutputStream(stream);
+        try {
+            // Encode data table salting in sign of number of indexes
+            WritableUtils.writeVInt(output, nIndexes * (table.getBucketNum() == null ? 1 : -1));
+            // Serialize current mutable indexes, subtracting the vint size from the length
+            // as it's still included
+            if (indexMetaDataPtr.getLength() > 0) {
+                output.write(indexMetaDataPtr.get(), indexMetaDataPtr.getOffset(), indexMetaDataPtr.getLength()-WritableUtils.getVIntSize(nMutableIndexes));
+            } else {
+                table.getRowKeySchema().write(output);
+            }
+            // Serialize mutable indexes afterwards
+            for (PTable index : keyValueIndexes) {
+                index.getIndexMaintainer(table, connection).write(output);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+        indexMetaDataPtr.set(stream.getBuffer(), 0, stream.size());
+    }
+    
     public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr,
             KeyValueBuilder builder) {
         return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength());
@@ -270,6 +305,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 
     private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
+        assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
@@ -806,13 +842,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
                 this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
-                ByteUtil.EMPTY_BYTE_ARRAY_PTR));
+                // set the value to the empty column name
+                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
         int i = 0;
         for (ColumnReference ref : this.getCoverededColumns()) {
             ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
-            ImmutableBytesPtr value = valueGetter.getLatestValue(ref);
+            ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
             byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
             ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
             if (value != null) {
@@ -827,29 +864,51 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return put;
     }
 
-    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+    private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
+    private DeleteType getDeleteTypeOrNull(Collection<KeyValue> pendingUpdates) {
         int nDeleteCF = 0;
+        int nDeleteVersionCF = 0;
         for (KeyValue kv : pendingUpdates) {
-            if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-                nDeleteCF++;
+        	if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
+                nDeleteVersionCF++;
             }
+        	else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
+        			// Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have been transformed by TransactionProcessor
+        			) {
+        	    nDeleteCF++;
+        	}
         }
-        return nDeleteCF == this.nDataCFs && nDeleteCF > 0;
+        // This is what a delete looks like on the server side for mutable indexing...
+        // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not
+        DeleteType deleteType = null; 
+        if (nDeleteVersionCF > 0 && nDeleteVersionCF >= this.nDataCFs) {
+        	deleteType = DeleteType.SINGLE_VERSION;
+        } else {
+			int nDelete = nDeleteCF + nDeleteVersionCF;
+			if (nDelete>0 && nDelete >= this.nDataCFs) {
+				deleteType = DeleteType.ALL_VERSIONS;
+			}
+		}
+        return deleteType;
+    }
+    
+    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+        return getDeleteTypeOrNull(pendingUpdates) != null;
     }
     
     private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates) throws IOException {
         if (pendingUpdates.isEmpty()) {
             return false;
         }
-        Map<ColumnReference,KeyValue> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size()); 
-        for (KeyValue kv : pendingUpdates) {
+        Map<ColumnReference,Cell> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size()); 
+        for (Cell kv : pendingUpdates) {
             newState.put(new ColumnReference(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)), kv);
         }
         for (ColumnReference ref : indexedColumns) {
-        	KeyValue newValue = newState.get(ref);
+        	Cell newValue = newState.get(ref);
         	if (newValue != null) { // Indexed column has potentially changed
-        		ImmutableBytesPtr oldValue = oldState.getLatestValue(ref);
-        		boolean newValueSetAsNull = newValue.getTypeByte() == Type.DeleteColumn.getCode();
+        	    ImmutableBytesWritable oldValue = oldState.getLatestValue(ref);
+        		boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY));
         		//If the new column value has to be set as null and the older value is null too,
         		//then just skip to the next indexed column.
         		if (newValueSetAsNull && oldValue == null) {
@@ -880,14 +939,28 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
         // Delete the entire row if any of the indexed columns changed
-        if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
-            Delete delete = new Delete(indexRowKey, ts);
+        DeleteType deleteType = null;
+        if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
+            byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
+            Delete delete = new Delete(indexRowKey);
+            // If table delete was single version, then index delete should be as well
+            if (deleteType == DeleteType.SINGLE_VERSION) {
+                for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+                    delete.deleteFamilyVersion(ref.getFamily(), ts);
+                }
+                delete.deleteFamilyVersion(emptyCF, ts);
+            } else {
+                for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set<byte[]> for index CFs?
+                    delete.deleteFamily(ref.getFamily(), ts);
+                }
+                delete.deleteFamily(emptyCF, ts);
+            }
             delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
             return delete;
         }
         Delete delete = null;
         // Delete columns for missing key values
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
                 ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier());
                 if (coveredColumns.contains(ref)) {
@@ -895,7 +968,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                         delete = new Delete(indexRowKey);                    
                         delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                    delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    // If point delete for data table, then use point delete for index as well
+                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
+                        delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    } else {
+                        delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    }
                 }
             }
         }
@@ -1354,14 +1432,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 .size());
         for (Cell kv : pendingUpdates) {
             // create new pointers to each part of the kv
-            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
-            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength());
             ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-            valueMap.put(new ColumnReference(kv.getRowArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
+            valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
         }
         return new ValueGetter() {
             @Override
-            public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
                 if(ref.equals(dataEmptyKeyValueRef)) return null;
                 return valueMap.get(ref);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 3ef01fe..0601e0a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -23,16 +23,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 
@@ -41,13 +45,33 @@ import com.google.common.collect.Lists;
 /**
  * Index builder for covered-columns index that ties into phoenix for faster use.
  */
-public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
+public class PhoenixIndexBuilder extends NonTxIndexBuilder {
 
     @Override
-    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+    public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
+    }
+
+    protected PhoenixIndexCodec getCodec() {
+        return (PhoenixIndexCodec)codec;
+    }
+
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        Configuration conf = env.getConfiguration();
+        // Install handler that will attempt to disable the index first before killing the region
+        // server
+        conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
+            PhoenixIndexFailurePolicy.class.getName());
+    }
+
+    @Override
+    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
         // The entire purpose of this method impl is to get the existing rows for the
         // table rows being indexed into the block cache, as the index maintenance code
-        // does a point scan per row
+        // does a point scan per row.
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
         List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
         Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
                 new HashMap<ImmutableBytesWritable, IndexMaintainer>();
@@ -55,10 +79,9 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         for (int i = 0; i < miniBatchOp.size(); i++) {
             Mutation m = miniBatchOp.getOperation(i);
             keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
-            List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
-
+            
             for(IndexMaintainer indexMaintainer: indexMaintainers) {
-                if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
+                if (indexMaintainer.isImmutableRows()) continue;
                 indexTableName.set(indexMaintainer.getIndexTableName());
                 if (maintainers.get(indexTableName) != null) continue;
                 maintainers.put(indexTableName, indexMaintainer);
@@ -67,10 +90,11 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
         }
         if (maintainers.isEmpty()) return;
         Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
+        scan.setRaw(true);
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         scanRanges.initializeScan(scan);
         scan.setFilter(scanRanges.getSkipScanFilter());
-        Region region = this.env.getRegion();
+        Region region = env.getRegion();
         RegionScanner scanner = region.getScanner(scan);
         // Run through the scanner using internal nextRaw method
         region.startRegionOperation();
@@ -93,13 +117,4 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
             }
         }
     }
-
-    private PhoenixIndexCodec getCodec() {
-        return (PhoenixIndexCodec)this.codec;
-    }
-
-    @Override
-    public byte[] getBatchId(Mutation m){
-        return this.codec.getBatchId(m);
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 222aefb..7acc90c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -1,196 +1,109 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.index;
 
 import java.io.IOException;
-import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.cache.TenantCache;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.collect.Lists;
 
 /**
  * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index (
- * {@link #getIndexDeletes(TableState)}) as well as what the new index state should be (
- * {@link #getIndexUpserts(TableState)}).
+ * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be (
+ * {@link #getIndexUpserts(TableState, IndexMetaData)}).
  */
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
     public static final String INDEX_UUID = "IdxUUID";
+    public static final String INDEX_MAINTAINERS = "IndexMaintainers";
+    private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
 
     private RegionCoprocessorEnvironment env;
-    private KeyValueBuilder kvBuilder;
 
     @Override
-    public void initialize(RegionCoprocessorEnvironment env) {
+    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+        super.initialize(env);
         this.env = env;
-        Configuration conf = env.getConfiguration();
-        // Install handler that will attempt to disable the index first before killing the region
-        // server
-        conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
-            PhoenixIndexFailurePolicy.class.getName());
-        // Use the GenericKeyValueBuilder, as it's been shown in perf testing that ClientKeyValue doesn't help
-        // TODO: Jesse to investigate more
-        this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
 
-    List<IndexMaintainer> getIndexMaintainers(Map<String, byte[]> attributes) throws IOException{
-        if (attributes == null) {
-            return Collections.emptyList();
-        }
+    boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
+        if (attributes == null) { return false; }
         byte[] uuid = attributes.get(INDEX_UUID);
-        if (uuid == null) {
-            return Collections.emptyList();
-        }
-        byte[] md = attributes.get(INDEX_MD);
-        List<IndexMaintainer> indexMaintainers;
-        if (md != null) {
-            indexMaintainers = IndexMaintainer.deserialize(md);
-        } else {
-            byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
-            ImmutableBytesWritable tenantId =
-                tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
-            TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
-            IndexMetaDataCache indexCache =
-                (IndexMetaDataCache) cache.getServerCache(new ImmutableBytesPtr(uuid));
-            if (indexCache == null) {
-                String msg = "key="+ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
-                SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
-                    .setMessage(msg).build().buildException();
-                ServerUtil.throwIOException("Index update failed", e); // will not return
-            }
-            indexMaintainers = indexCache.getIndexMaintainers();
-        }
-    
-        return indexMaintainers;
-    }
-    
-    @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
-        return getIndexUpdates(state, true);
+        if (uuid == null) { return false; }
+        return true;
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
-        return getIndexUpdates(state, false);
-    }
-
-    /**
-     * 
-     * @param state
-     * @param upsert prepare index upserts if it's true otherwise prepare index deletes. 
-     * @return
-     * @throws IOException
-     */
-    private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
-        List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes());
-        if (indexMaintainers.isEmpty()) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
             return Collections.emptyList();
         }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            ValueGetter valueGetter = statePair.getFirst();
+            IndexUpdate indexUpdate = statePair.getSecond();
+            indexUpdate.setTable(maintainer.getIndexTableName());
+            Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
+                    .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
+            indexUpdate.setUpdate(put);
+            indexUpdates.add(indexUpdate);
+        }
+        return indexUpdates;
+    }
+
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
-        byte[] dataRowKey = state.getCurrentRowKey();
-        ptr.set(dataRowKey);
-        byte[] localIndexTableName = MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName());
-        ValueGetter valueGetter = null;
-        Scanner scanner = null;
+        ptr.set(state.getCurrentRowKey());
+        List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            if(upsert) {
-                // Short-circuit building state when we know it's a row deletion
-                if (maintainer.isRowDeleted(state.getPendingUpdate())) {
-                    continue;
-                }
-            }
-            IndexUpdate indexUpdate = null;
-            if (maintainer.isImmutableRows()) {
-                indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
-                if(maintainer.isLocalIndex()) {
-                    indexUpdate.setTable(localIndexTableName);
-                } else {
-                    indexUpdate.setTable(maintainer.getIndexTableName());
-                }
-                valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
-            } else {
-                // TODO: if more efficient, I could do this just once with all columns in all indexes
-                Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());
-                scanner = statePair.getFirst();
-                indexUpdate = statePair.getSecond();
-                indexUpdate.setTable(maintainer.getIndexTableName());
-                valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
-            }
-            Mutation mutation = null;
-            if (upsert) {
-                mutation =
-                        maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(),
-                            env.getRegion().getRegionInfo().getStartKey(),
-                            env.getRegion().getRegionInfo().getEndKey());
-            } else {
-                mutation =
-                        maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(),
-                            state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(),
-                            env.getRegion().getRegionInfo().getEndKey());
-            }
-            indexUpdate.setUpdate(mutation);
-            if (scanner != null) {
-                scanner.close();
-                scanner = null;
-            }
+            // For transactional tables, we use an index maintainer
+            // to aid in rollback if there's a KeyValue column in the index. The alternative would be
+            // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
+            // client side.
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            ValueGetter valueGetter = statePair.getFirst();
+            IndexUpdate indexUpdate = statePair.getSecond();
+            indexUpdate.setTable(maintainer.getIndexTableName());
+            Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
+                    state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
+            indexUpdate.setUpdate(delete);
             indexUpdates.add(indexUpdate);
         }
         return indexUpdates;
     }
-    
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-      return !getIndexMaintainers(m.getAttributesMap()).isEmpty();
-  }
-  
-  @Override
-  public byte[] getBatchId(Mutation m) {
-    Map<String, byte[]> attributes = m.getAttributesMap();
-    return attributes.get(INDEX_UUID);
-  }
+
+    @Override
+    public boolean isEnabled(Mutation m) throws IOException {
+        return hasIndexMaintainers(m.getAttributesMap());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
new file mode 100644
index 0000000..3dcc44e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.IndexMetaDataCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ServerUtil;
+
+public class PhoenixIndexMetaData implements IndexMetaData {
+    private final Map<String, byte[]> attributes;
+    private final IndexMetaDataCache indexMetaDataCache;
+    
+    private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
+        if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+        byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
+        if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        if (md != null) {
+            final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+            return new IndexMetaDataCache() {
+
+                @Override
+                public void close() throws IOException {}
+
+                @Override
+                public List<IndexMaintainer> getIndexMaintainers() {
+                    return indexMaintainers;
+                }
+
+            };
+        } else {
+            byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
+            ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
+            TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
+            IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
+            if (indexCache == null) {
+                String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
+                SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
+                        .setMessage(msg).build().buildException();
+                ServerUtil.throwIOException("Index update failed", e); // will not return
+            }
+            return indexCache;
+        }
+
+    }
+
+    public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
+        this.indexMetaDataCache = getIndexMetaData(env, attributes);
+        this.attributes = attributes;
+    }
+    
+    public List<IndexMaintainer> getIndexMaintainers() {
+        return indexMetaDataCache.getIndexMaintainers();
+    }
+
+    public Map<String, byte[]> getAttributes() {
+        return attributes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b718089..18a97bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -109,7 +109,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
@@ -683,7 +683,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     && !SchemaUtil.isStatsTable(tableName)
                     && !descriptor.hasCoprocessor(Indexer.class.getName())) {
                 Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
-                opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+                opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
                 Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
             }
             if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
index 6fc480e..1f271f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.schema.tuple;
 
-import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
-
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -28,7 +26,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
  * 
@@ -56,13 +53,22 @@ public class ValueGetterTuple extends BaseTuple {
 
     @Override
     public KeyValue getValue(byte[] family, byte[] qualifier) {
-    	ImmutableBytesPtr value = null;
+        ImmutableBytesWritable value = null;
         try {
             value = valueGetter.getLatestValue(new ColumnReference(family, qualifier));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-    	return new KeyValue(valueGetter.getRowKey(), family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value!=null? copyBytesIfNecessary(value) : null);
+        byte[] rowKey = valueGetter.getRowKey();
+        int valueOffset = 0;
+        int valueLength = 0;
+        byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY;
+        if (value != null) {
+            valueBytes = value.get();
+            valueOffset = value.getOffset();
+            valueLength = value.getLength();
+        }
+    	return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP, Type.Put, valueBytes, valueOffset, valueLength);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index cb63380..b0fae03 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -24,11 +24,10 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.index.BaseIndexCodec;
 
 /**
  * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless
@@ -53,12 +52,12 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
   }
   
   @Override
-  public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+  public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
     return this.deletes;
   }
 
   @Override
-  public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+  public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
     return this.updates;
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index 1b61ef0..fc3a976 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -32,26 +32,23 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec;
 import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec.ColumnEntry;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
 
 public class TestCoveredColumnIndexCodec {
   private static final byte[] PK = new byte[] { 'a' };
@@ -172,7 +169,7 @@ public class TestCoveredColumnIndexCodec {
     // start with a basic put that has some keyvalues
     Put p = new Put(PK);
     // setup the kvs to add
-    List<Cell> kvs = new ArrayList<Cell>();
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
     byte[] v1 = Bytes.toBytes("v1");
     KeyValue kv = new KeyValue(PK, FAMILY, QUAL, 1, v1);
     kvs.add(kv);
@@ -184,14 +181,14 @@ public class TestCoveredColumnIndexCodec {
 
     // check the codec for deletes it should send
     LocalTableState state = new LocalTableState(env, table, p);
-    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state);
+    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
     assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next()
         .isValid());
 
     // get the updates with the pending update
     state.setCurrentTimestamp(1);
-    state.addPendingUpdates(KeyValueUtil.ensureKeyValues(kvs));
-    updates = codec.getIndexUpserts(state);
+    state.addPendingUpdates(kvs);
+    updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
     assertTrue("Didn't find index updates for pending primary table update!", updates.iterator()
         .hasNext());
     for (IndexUpdate update : updates) {
@@ -210,11 +207,11 @@ public class TestCoveredColumnIndexCodec {
     d.deleteFamily(FAMILY, 2);
     // setup the next batch of 'current state', basically just ripping out the current state from
     // the last round
-    table = new SimpleTableState(Result.create(kvs));
+    table = new SimpleTableState(new Result(kvs));
     state = new LocalTableState(env, table, d);
     state.setCurrentTimestamp(2);
     // check the cleanup of the current table, after the puts (mocking a 'next' update)
-    updates = codec.getIndexDeletes(state);
+    updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertTrue("Didn't have any index cleanup, even though there is current state",
         update.isValid());
@@ -237,14 +234,14 @@ public class TestCoveredColumnIndexCodec {
     ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d);
   }
 
-  private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<Cell> currentState,
+  private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<KeyValue> currentState,
       Delete d) throws IOException {
-    LocalHBaseState table = new SimpleTableState(Result.create(currentState));
+    LocalHBaseState table = new SimpleTableState(new Result(currentState));
     LocalTableState state = new LocalTableState(env, table, d);
     state.setCurrentTimestamp(d.getTimeStamp());
     // now we shouldn't see anything when getting the index update
-    state.addPendingUpdates(KeyValueUtil.ensureKeyValues(d.getFamilyCellMap().get(FAMILY)));
-    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+    state.addPendingUpdates(d.getFamilyMap().get(FAMILY));
+    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertFalse("Had some index updates, though it should have been covered by the delete",
         update.isValid());


Mime
View raw message