From: tdsilva@apache.org
To: commits@phoenix.apache.org
Date: Wed, 11 Nov 2015 23:05:17 -0000
Subject: [07/10] phoenix git commit: PHOENIX-2395 Refactor mutable secondary indexes code

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
index 5964647..3335aaa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
@@ -22,16 +22,14 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 
 import com.google.common.collect.Multimap;
 
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-
 /**
  * Handle failures to write to the index tables.
  */
 public interface IndexFailurePolicy extends Stoppable {
-
   public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
deleted file mode 100644
index 1c45cd3..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/BaseIndexCodec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.index;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
-
-/**
- *
- */
-public abstract class BaseIndexCodec implements IndexCodec {
-
-    @Override
-    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
-        // noop
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * By default, the codec is always enabled. Subclasses should override this method if they want
-     * to decide to index on a per-mutation basis.
-     * @throws IOException
-     */
-    @Override
-    public boolean isEnabled(Mutation m) throws IOException {
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Assumes each mutation is not in a batch. Subclasses that have different batching behavior
-     * should override this.
-     */
-    @Override
-    public byte[] getBatchId(Mutation m) {
-        return null;
-    }
-}
\ No newline at end of file
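Note that BaseIndexCodec is moved rather than removed: hunks later in this commit import it from org.apache.phoenix.hbase.index.builder. The isEnabled(Mutation) hook documented above is the per-mutation switch for index maintenance. A minimal sketch of a subclass using that hook; the SKIP_INDEX attribute name is invented for illustration:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;

    // Hypothetical codec: skip index maintenance for any mutation tagged with a
    // made-up "SKIP_INDEX" attribute; everything else keeps the inherited
    // "always enabled" default from BaseIndexCodec.
    public abstract class SkippableIndexCodec extends BaseIndexCodec {
        @Override
        public boolean isEnabled(Mutation m) throws IOException {
            return m.getAttribute("SKIP_INDEX") == null;
        }
    }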
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index b060345..dea82a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -36,6 +36,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
@@ -199,6 +200,40 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         ptr.set(stream.getBuffer(), 0, stream.size());
     }
 
+    public static void serializeAdditional(PTable table, ImmutableBytesWritable indexMetaDataPtr,
+            List<PTable> keyValueIndexes, PhoenixConnection connection) {
+        int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : ByteUtil.vintFromBytes(indexMetaDataPtr);
+        int nIndexes = nMutableIndexes + keyValueIndexes.size();
+        int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case new size increases buffer
+        if (indexMetaDataPtr.getLength() == 0) {
+            estimatedSize += table.getRowKeySchema().getEstimatedByteSize();
+        }
+        for (PTable index : keyValueIndexes) {
+            estimatedSize += index.getIndexMaintainer(table, connection).getEstimatedByteSize();
+        }
+        TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(estimatedSize + 1);
+        DataOutput output = new DataOutputStream(stream);
+        try {
+            // Encode data table salting in sign of number of indexes
+            WritableUtils.writeVInt(output, nIndexes * (table.getBucketNum() == null ? 1 : -1));
+            // Serialize current mutable indexes, subtracting the vint size from the length
+            // as it's still included
+            if (indexMetaDataPtr.getLength() > 0) {
+                output.write(indexMetaDataPtr.get(), indexMetaDataPtr.getOffset(), indexMetaDataPtr.getLength()-WritableUtils.getVIntSize(nMutableIndexes));
+            } else {
+                table.getRowKeySchema().write(output);
+            }
+            // Serialize mutable indexes afterwards
+            for (PTable index : keyValueIndexes) {
+                index.getIndexMaintainer(table, connection).write(output);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+        indexMetaDataPtr.set(stream.getBuffer(), 0, stream.size());
+    }
+
     public static List<IndexMaintainer> deserialize(ImmutableBytesWritable metaDataPtr,
             KeyValueBuilder builder) {
         return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), metaDataPtr.getLength());
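The serializeAdditional() method added above packs two facts into the leading vint: the number of indexes in the magnitude and whether the data table is salted in the sign. A self-contained round trip of that encoding, with illustrative values, using only Hadoop's WritableUtils:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableUtils;

    public final class SaltSignVIntDemo {
        public static void main(String[] args) throws IOException {
            int nIndexes = 3;
            boolean salted = true;

            // Encode: salting rides in the sign, index count in the magnitude.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            WritableUtils.writeVInt(out, nIndexes * (salted ? -1 : 1));

            // Decode: recover both facts from the single vint.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            int encoded = WritableUtils.readVInt(in);
            System.out.println("salted=" + (encoded < 0) + ", indexes=" + Math.abs(encoded));
        }
    }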
@@ -270,6 +305,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
 
     private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
+        assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() == PTableType.TABLE || dataTable.getType() == PTableType.VIEW);
         this.rowKeyOrderOptimizable = index.rowKeyOrderOptimizable();
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
@@ -806,13 +842,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
                 this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts,
-                ByteUtil.EMPTY_BYTE_ARRAY_PTR));
+                // set the value to the empty column name
+                QueryConstants.EMPTY_COLUMN_BYTES_PTR));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
         int i = 0;
         for (ColumnReference ref : this.getCoverededColumns()) {
             ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
-            ImmutableBytesPtr value = valueGetter.getLatestValue(ref);
+            ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
             byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
             ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
             if (value != null) {
@@ -827,29 +864,51 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return put;
     }
 
-    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+    private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
+    private DeleteType getDeleteTypeOrNull(Collection<KeyValue> pendingUpdates) {
         int nDeleteCF = 0;
+        int nDeleteVersionCF = 0;
         for (KeyValue kv : pendingUpdates) {
-            if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
-                nDeleteCF++;
+            if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
+                nDeleteVersionCF++;
             }
+            else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()
+                    // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have been transformed by the TransactionProcessor
+                    ) {
+                nDeleteCF++;
+            }
         }
-        return nDeleteCF == this.nDataCFs && nDeleteCF > 0;
+        // This is what a delete looks like on the server side for mutable indexing...
+        // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not
+        DeleteType deleteType = null;
+        if (nDeleteVersionCF > 0 && nDeleteVersionCF >= this.nDataCFs) {
+            deleteType = DeleteType.SINGLE_VERSION;
+        } else {
+            int nDelete = nDeleteCF + nDeleteVersionCF;
+            if (nDelete>0 && nDelete >= this.nDataCFs) {
+                deleteType = DeleteType.ALL_VERSIONS;
+            }
+        }
+        return deleteType;
+    }
+
+    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+        return getDeleteTypeOrNull(pendingUpdates) != null;
+    }
 
     private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates) throws IOException {
         if (pendingUpdates.isEmpty()) {
             return false;
         }
-        Map<ColumnReference, KeyValue> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size());
-        for (KeyValue kv : pendingUpdates) {
+        Map<ColumnReference, Cell> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size());
+        for (Cell kv : pendingUpdates) {
             newState.put(new ColumnReference(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)), kv);
         }
         for (ColumnReference ref : indexedColumns) {
-            KeyValue newValue = newState.get(ref);
+            Cell newValue = newState.get(ref);
             if (newValue != null) { // Indexed column has potentially changed
-                ImmutableBytesPtr oldValue = oldState.getLatestValue(ref);
-                boolean newValueSetAsNull = newValue.getTypeByte() == Type.DeleteColumn.getCode();
+                ImmutableBytesWritable oldValue = oldState.getLatestValue(ref);
+                boolean newValueSetAsNull = (newValue.getTypeByte() == Type.DeleteColumn.getCode() || newValue.getTypeByte() == Type.Delete.getCode() || CellUtil.matchingValue(newValue, HConstants.EMPTY_BYTE_ARRAY));
                 //If the new column value has to be set as null and the older value is null too,
                 //then just skip to the next indexed column.
                 if (newValueSetAsNull && oldValue == null) {
@@ -880,14 +939,28 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
         // Delete the entire row if any of the indexed columns changed
-        if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
-            Delete delete = new Delete(indexRowKey, ts);
+        DeleteType deleteType = null;
+        if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
+            byte[] emptyCF = emptyKeyValueCFPtr.copyBytesIfNecessary();
+            Delete delete = new Delete(indexRowKey);
+            // If table delete was single version, then index delete should be as well
+            if (deleteType == DeleteType.SINGLE_VERSION) {
+                for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set for index CFs?
+                    delete.deleteFamilyVersion(ref.getFamily(), ts);
+                }
+                delete.deleteFamilyVersion(emptyCF, ts);
+            } else {
+                for (ColumnReference ref : getCoverededColumns()) { // FIXME: Keep Set for index CFs?
+                    delete.deleteFamily(ref.getFamily(), ts);
+                }
+                delete.deleteFamily(emptyCF, ts);
+            }
             delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
             return delete;
         }
         Delete delete = null;
         // Delete columns for missing key values
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
                 ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier());
                 if (coveredColumns.contains(ref)) {
@@ -895,7 +968,12 @@
                         delete = new Delete(indexRowKey);
                         delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
                     }
-                    delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    // If point delete for data table, then use point delete for index as well
+                    if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) {
+                        delete.deleteColumn(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    } else {
+                        delete.deleteColumns(ref.getFamily(), IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts);
+                    }
                 }
             }
         }
@@ -1354,14 +1432,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 .size());
         for (Cell kv : pendingUpdates) {
             // create new pointers to each part of the kv
-            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
-            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength());
             ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-            valueMap.put(new ColumnReference(kv.getRowArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
+            valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
         }
         return new ValueGetter() {
             @Override
-            public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
                 if(ref.equals(dataEmptyKeyValueRef)) return null;
                 return valueMap.get(ref);
             }
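The DeleteType distinction introduced above carries the strength of a data-table delete over to the index row: DeleteFamilyVersion maps to Delete.deleteFamilyVersion (only cells at exactly that timestamp), while DeleteFamily maps to Delete.deleteFamily (all cells up through the timestamp). A condensed sketch of the mapping against the HBase 1.x client API; the inputs are stand-ins, not Phoenix code:

    import org.apache.hadoop.hbase.client.Delete;

    public final class IndexDeleteSketch {
        // Mirror the strength of the data-table delete onto an index row,
        // as buildDeleteMutation now does for the full-row-delete case.
        static Delete indexRowDelete(byte[] indexRowKey, byte[][] indexFamilies,
                                     long ts, boolean singleVersion) {
            Delete delete = new Delete(indexRowKey);
            for (byte[] family : indexFamilies) {
                if (singleVersion) {
                    delete.deleteFamilyVersion(family, ts); // cells at exactly ts
                } else {
                    delete.deleteFamily(family, ts);        // all cells <= ts
                }
            }
            return delete;
        }
    }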
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 3ef01fe..0601e0a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -23,16 +23,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 
@@ -41,13 +45,33 @@ import com.google.common.collect.Lists;
 /**
  * Index builder for covered-columns index that ties into phoenix for faster use.
  */
-public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
+public class PhoenixIndexBuilder extends NonTxIndexBuilder {
 
     @Override
-    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+    public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        return new PhoenixIndexMetaData(env, miniBatchOp.getOperation(0).getAttributesMap());
+    }
+
+    protected PhoenixIndexCodec getCodec() {
+        return (PhoenixIndexCodec)codec;
+    }
+
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        Configuration conf = env.getConfiguration();
+        // Install handler that will attempt to disable the index first before killing the region
+        // server
+        conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
+                PhoenixIndexFailurePolicy.class.getName());
+    }
+
+    @Override
+    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, IndexMetaData context) throws IOException {
         // The entire purpose of this method impl is to get the existing rows for the
         // table rows being indexed into the block cache, as the index maintenance code
-        // does a point scan per row
+        // does a point scan per row.
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
         List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
         Map maintainers = new HashMap();
@@ -55,10 +79,9 @@
         for (int i = 0; i < miniBatchOp.size(); i++) {
             Mutation m = miniBatchOp.getOperation(i);
             keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
-            List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap());
-
+
             for(IndexMaintainer indexMaintainer: indexMaintainers) {
-                if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
+                if (indexMaintainer.isImmutableRows()) continue;
                 indexTableName.set(indexMaintainer.getIndexTableName());
                 if (maintainers.get(indexTableName) != null) continue;
                 maintainers.put(indexTableName, indexMaintainer);
@@ -67,10 +90,11 @@
         }
         if (maintainers.isEmpty()) return;
         Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList(maintainers.values()));
+        scan.setRaw(true);
         ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
         scanRanges.initializeScan(scan);
         scan.setFilter(scanRanges.getSkipScanFilter());
-        Region region = this.env.getRegion();
+        Region region = env.getRegion();
         RegionScanner scanner = region.getScanner(scan);
         // Run through the scanner using internal nextRaw method
         region.startRegionOperation();
@@ -93,13 +117,4 @@
             }
         }
     }
-
-    private PhoenixIndexCodec getCodec() {
-        return (PhoenixIndexCodec)this.codec;
-    }
-
-    @Override
-    public byte[] getBatchId(Mutation m){
-        return this.codec.getBatchId(m);
-    }
 }
\ No newline at end of file
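batchStarted() above exists purely as a read-ahead: it runs one raw skip scan over all rows in the batch so that the per-row point reads done during index maintenance are served from the block cache. Stripped of the Phoenix-specific filter setup, the warm-up pattern looks roughly like this (a sketch, not the committed code):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    final class BlockCacheWarmer {
        // Scan the rows about to be mutated once, discarding the results,
        // purely so later point reads hit the block cache.
        static void warm(Region region, Scan scan) throws IOException {
            scan.setRaw(true); // include delete markers; index maintenance needs them
            RegionScanner scanner = region.getScanner(scan);
            region.startRegionOperation();
            try {
                List<Cell> scratch = new ArrayList<Cell>();
                boolean more;
                do {
                    scratch.clear();
                    more = scanner.nextRaw(scratch); // internal read path fills the cache
                } while (more);
            } finally {
                scanner.close();
                region.closeRegionOperation();
            }
        }
    }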
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 222aefb..7acc90c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -1,196 +1,109 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.index;
 
 import java.io.IOException;
-import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
-import org.apache.phoenix.cache.TenantCache;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.collect.Lists;
 
 /**
  * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index (
- * {@link #getIndexDeletes(TableState)}) as well as what the new index state should be (
- * {@link #getIndexUpserts(TableState)}).
+ * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be (
+ * {@link #getIndexUpserts(TableState, IndexMetaData)}).
  */
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
     public static final String INDEX_UUID = "IdxUUID";
+    public static final String INDEX_MAINTAINERS = "IndexMaintainers";
+    private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE;
 
     private RegionCoprocessorEnvironment env;
-    private KeyValueBuilder kvBuilder;
 
     @Override
-    public void initialize(RegionCoprocessorEnvironment env) {
+    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+        super.initialize(env);
         this.env = env;
-        Configuration conf = env.getConfiguration();
-        // Install handler that will attempt to disable the index first before killing the region
-        // server
-        conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
-            PhoenixIndexFailurePolicy.class.getName());
-        // Use the GenericKeyValueBuilder, as it's been shown in perf testing that ClientKeyValue doesn't help
-        // TODO: Jesse to investigate more
-        this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
 
-    List<IndexMaintainer> getIndexMaintainers(Map<String, byte[]> attributes) throws IOException{
-        if (attributes == null) {
-            return Collections.emptyList();
-        }
+    boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
+        if (attributes == null) { return false; }
         byte[] uuid = attributes.get(INDEX_UUID);
-        if (uuid == null) {
-            return Collections.emptyList();
-        }
-        byte[] md = attributes.get(INDEX_MD);
-        List<IndexMaintainer> indexMaintainers;
-        if (md != null) {
-            indexMaintainers = IndexMaintainer.deserialize(md);
-        } else {
-            byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
-            ImmutableBytesWritable tenantId =
-                tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
-            TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
-            IndexMetaDataCache indexCache =
-                (IndexMetaDataCache) cache.getServerCache(new ImmutableBytesPtr(uuid));
-            if (indexCache == null) {
-                String msg = "key="+ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
-                SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
-                    .setMessage(msg).build().buildException();
-                ServerUtil.throwIOException("Index update failed", e); // will not return
-            }
-            indexMaintainers = indexCache.getIndexMaintainers();
-        }
-
-        return indexMaintainers;
-    }
-
-    @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
-        return getIndexUpdates(state, true);
+        if (uuid == null) { return false; }
+        return true;
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
-        return getIndexUpdates(state, false);
-    }
-
-    /**
-     *
-     * @param state
-     * @param upsert prepare index upserts if it's true otherwise prepare index deletes.
-     * @return
-     * @throws IOException
-     */
-    private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
-        List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes());
-        if (indexMaintainers.isEmpty()) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
            return Collections.emptyList();
        }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ptr.set(state.getCurrentRowKey());
        List<IndexUpdate> indexUpdates = Lists.newArrayList();
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            ValueGetter valueGetter = statePair.getFirst();
+            IndexUpdate indexUpdate = statePair.getSecond();
+            indexUpdate.setTable(maintainer.getIndexTableName());
+            Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env
+                    .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
+            indexUpdate.setUpdate(put);
+            indexUpdates.add(indexUpdate);
+        }
+        return indexUpdates;
+    }
+
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
+        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
-        byte[] dataRowKey = state.getCurrentRowKey();
-        ptr.set(dataRowKey);
-        byte[] localIndexTableName = MetaDataUtil.getLocalIndexPhysicalName(env.getRegion().getTableDesc().getName());
-        ValueGetter valueGetter = null;
-        Scanner scanner = null;
+        ptr.set(state.getCurrentRowKey());
+        List<IndexUpdate> indexUpdates = Lists.newArrayList();
        for (IndexMaintainer maintainer : indexMaintainers) {
-            if(upsert) {
-                // Short-circuit building state when we know it's a row deletion
-                if (maintainer.isRowDeleted(state.getPendingUpdate())) {
-                    continue;
-                }
-            }
-            IndexUpdate indexUpdate = null;
-            if (maintainer.isImmutableRows()) {
-                indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
-                if(maintainer.isLocalIndex()) {
-                    indexUpdate.setTable(localIndexTableName);
-                } else {
-                    indexUpdate.setTable(maintainer.getIndexTableName());
-                }
-                valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
-            } else {
-                // TODO: if more efficient, I could do this just once with all columns in all indexes
-                Pair<Scanner, IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());
-                scanner = statePair.getFirst();
-                indexUpdate = statePair.getSecond();
-                indexUpdate.setTable(maintainer.getIndexTableName());
-                valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
-            }
-            Mutation mutation = null;
-            if (upsert) {
-                mutation =
-                    maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(),
-                        env.getRegion().getRegionInfo().getStartKey(),
-                        env.getRegion().getRegionInfo().getEndKey());
-            } else {
-                mutation =
-                    maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(),
-                        state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(),
-                        env.getRegion().getRegionInfo().getEndKey());
-            }
-            indexUpdate.setUpdate(mutation);
-            if (scanner != null) {
-                scanner.close();
-                scanner = null;
-            }
+            // For transactional tables, we use an index maintainer
+            // to aid in rollback if there's a KeyValue column in the index. The alternative would be
+            // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
+            // client side.
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            ValueGetter valueGetter = statePair.getFirst();
+            IndexUpdate indexUpdate = statePair.getSecond();
+            indexUpdate.setTable(maintainer.getIndexTableName());
+            Delete delete = maintainer.buildDeleteMutation(KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
+                    state.getCurrentTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey());
+            indexUpdate.setUpdate(delete);
            indexUpdates.add(indexUpdate);
        }
        return indexUpdates;
    }
-
-    @Override
-    public boolean isEnabled(Mutation m) throws IOException {
-        return !getIndexMaintainers(m.getAttributesMap()).isEmpty();
-    }
-
-    @Override
-    public byte[] getBatchId(Mutation m) {
-        Map<String, byte[]> attributes = m.getAttributesMap();
-        return attributes.get(INDEX_UUID);
-    }
+
+    @Override
+    public boolean isEnabled(Mutation m) throws IOException {
+        return hasIndexMaintainers(m.getAttributesMap());
+    }
 }
\ No newline at end of file
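This is the shape of the refactored codec contract: metadata is resolved once per batch (by PhoenixIndexBuilder.getIndexMetaData) and passed into every codec call, instead of each call re-reading mutation attributes. A do-nothing stub against the new signatures, in the spirit of the CoveredIndexCodecForTesting changes later in this commit:

    import java.util.Collections;

    import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
    import org.apache.phoenix.hbase.index.covered.IndexMetaData;
    import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    import org.apache.phoenix.hbase.index.covered.TableState;

    // Skeleton codec against the refactored interface; the per-batch
    // IndexMetaData context replaces per-mutation attribute lookups.
    public class NoOpIndexCodec extends BaseIndexCodec {
        @Override
        public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
            return Collections.emptyList(); // a real codec builds index Puts here
        }

        @Override
        public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
            return Collections.emptyList(); // a real codec builds index Deletes here
        }
    }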
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
new file mode 100644
index 0000000..3dcc44e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.IndexMetaDataCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ServerUtil;
+
+public class PhoenixIndexMetaData implements IndexMetaData {
+    private final Map<String, byte[]> attributes;
+    private final IndexMetaDataCache indexMetaDataCache;
+
+    private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
+        if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+        byte[] uuid = attributes.get(PhoenixIndexCodec.INDEX_UUID);
+        if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
+        byte[] md = attributes.get(PhoenixIndexCodec.INDEX_MD);
+        if (md != null) {
+            final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md);
+            return new IndexMetaDataCache() {
+
+                @Override
+                public void close() throws IOException {}
+
+                @Override
+                public List<IndexMaintainer> getIndexMaintainers() {
+                    return indexMaintainers;
+                }
+
+            };
+        } else {
+            byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
+            ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
+            TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
+            IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
+            if (indexCache == null) {
+                String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
+                SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
+                        .setMessage(msg).build().buildException();
+                ServerUtil.throwIOException("Index update failed", e); // will not return
+            }
+            return indexCache;
+        }
+
+    }
+
+    public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
+        this.indexMetaDataCache = getIndexMetaData(env, attributes);
+        this.attributes = attributes;
+    }
+
+    public List<IndexMaintainer> getIndexMaintainers() {
+        return indexMetaDataCache.getIndexMaintainers();
+    }
+
+    public Map<String, byte[]> getAttributes() {
+        return attributes;
+    }
+}
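PhoenixIndexMetaData resolves maintainers from mutation attributes: when INDEX_MD is set, the serialized maintainers travel inline with the mutation; otherwise INDEX_UUID keys a lookup in the tenant's server-side cache. A client-side sketch of the tagging only; the payload bytes below are placeholders (real ones come from IndexMaintainer serialization and the server cache client), and Phoenix sets these internally rather than through user code:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.index.PhoenixIndexCodec;

    public final class AttributeTaggingDemo {
        public static void main(String[] args) {
            Put put = new Put(Bytes.toBytes("row1"));
            // Inline path: ship serialized maintainers with the mutation.
            put.setAttribute(PhoenixIndexCodec.INDEX_MD, new byte[0] /* placeholder payload */);
            // Cached path: ship only a UUID referencing the server-side cache.
            put.setAttribute(PhoenixIndexCodec.INDEX_UUID, new byte[0] /* placeholder key */);
        }
    }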
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b718089..18a97bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -109,7 +109,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
@@ -683,7 +683,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 && !SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(Indexer.class.getName())) {
                 Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
-                opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+                opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
                 Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
             }
             if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
index 6fc480e..1f271f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.schema.tuple;
 
-import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
-
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -28,7 +26,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
  *
@@ -56,13 +53,22 @@ public class ValueGetterTuple extends BaseTuple {
     @Override
     public KeyValue getValue(byte[] family, byte[] qualifier) {
-        ImmutableBytesPtr value = null;
+        ImmutableBytesWritable value = null;
         try {
             value = valueGetter.getLatestValue(new ColumnReference(family, qualifier));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return new KeyValue(valueGetter.getRowKey(), family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value!=null? copyBytesIfNecessary(value) : null);
+        byte[] rowKey = valueGetter.getRowKey();
+        int valueOffset = 0;
+        int valueLength = 0;
+        byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY;
+        if (value != null) {
+            valueBytes = value.get();
+            valueOffset = value.getOffset();
+            valueLength = value.getLength();
+        }
+        return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP, Type.Put, valueBytes, valueOffset, valueLength);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index cb63380..b0fae03 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -24,11 +24,10 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
+import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.index.BaseIndexCodec;
 
 /**
  * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless
@@ -53,12 +52,12 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
         return this.deletes;
     }
 
     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
         return this.updates;
     }
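The ValueGetterTuple change above drops copyBytesIfNecessary in favor of the offset/length KeyValue constructor, so the cell is built directly over the value's backing array with no copy. The pattern in isolation, with example inputs:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class ZeroCopyKeyValueDemo {
        public static void main(String[] args) {
            byte[] row = Bytes.toBytes("row1");
            byte[] family = Bytes.toBytes("0");
            byte[] qualifier = Bytes.toBytes("COL");
            // A value that is a 5-byte slice of a larger buffer.
            ImmutableBytesWritable value =
                    new ImmutableBytesWritable(Bytes.toBytes("xxVALUExx"), 2, 5);

            // Point the KeyValue at the slice instead of copying it out.
            KeyValue kv = new KeyValue(row, 0, row.length, family, 0, family.length,
                    qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP,
                    KeyValue.Type.Put, value.get(), value.getOffset(), value.getLength());
            System.out.println(kv);
        }
    }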
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ef2fa434/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index 1b61ef0..fc3a976 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -32,26 +32,23 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec;
 import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec.ColumnEntry;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
 
 public class TestCoveredColumnIndexCodec {
   private static final byte[] PK = new byte[] { 'a' };
@@ -172,7 +169,7 @@ public class TestCoveredColumnIndexCodec {
     // start with a basic put that has some keyvalues
     Put p = new Put(PK);
     // setup the kvs to add
-    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    List<Cell> kvs = new ArrayList<Cell>();
     byte[] v1 = Bytes.toBytes("v1");
     KeyValue kv = new KeyValue(PK, FAMILY, QUAL, 1, v1);
     kvs.add(kv);
@@ -184,14 +181,14 @@ public class TestCoveredColumnIndexCodec {
 
     // check the codec for deletes it should send
     LocalTableState state = new LocalTableState(env, table, p);
-    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state);
+    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
     assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next()
         .isValid());
 
     // get the updates with the pending update
     state.setCurrentTimestamp(1);
-    state.addPendingUpdates(KeyValueUtil.ensureKeyValues(kvs));
-    updates = codec.getIndexUpserts(state);
+    state.addPendingUpdates(kvs);
+    updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
     assertTrue("Didn't find index updates for pending primary table update!", updates.iterator()
         .hasNext());
     for (IndexUpdate update : updates) {
@@ -210,11 +207,11 @@ public class TestCoveredColumnIndexCodec {
     d.deleteFamily(FAMILY, 2);
     // setup the next batch of 'current state', basically just ripping out the current state from
     // the last round
-    table = new SimpleTableState(Result.create(kvs));
+    table = new SimpleTableState(new Result(kvs));
     state = new LocalTableState(env, table, d);
     state.setCurrentTimestamp(2);
     // check the cleanup of the current table, after the puts (mocking a 'next' update)
-    updates = codec.getIndexDeletes(state);
+    updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertTrue("Didn't have any index cleanup, even though there is current state",
         update.isValid());
@@ -237,14 +234,14 @@ public class TestCoveredColumnIndexCodec {
     ensureNoUpdatesWhenCoveredByDelete(env, codec, kvs, d);
   }
 
-  private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<KeyValue> currentState,
+  private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<Cell> currentState,
       Delete d) throws IOException {
-    LocalHBaseState table = new SimpleTableState(Result.create(currentState));
+    LocalHBaseState table = new SimpleTableState(new Result(currentState));
     LocalTableState state = new LocalTableState(env, table, d);
     state.setCurrentTimestamp(d.getTimeStamp());
     // now we shouldn't see anything when getting the index update
-    state.addPendingUpdates(KeyValueUtil.ensureKeyValues(d.getFamilyCellMap().get(FAMILY)));
-    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+    state.addPendingUpdates(d.getFamilyMap().get(FAMILY));
+    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
     for (IndexUpdate update : updates) {
       assertFalse("Had some index updates, though it should have been covered by the delete",
         update.isValid());