Return-Path: X-Original-To: apmail-phoenix-commits-archive@minotaur.apache.org Delivered-To: apmail-phoenix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1DFB4109C9 for ; Sat, 8 Feb 2014 07:43:53 +0000 (UTC) Received: (qmail 93449 invoked by uid 500); 8 Feb 2014 07:43:51 -0000 Delivered-To: apmail-phoenix-commits-archive@phoenix.apache.org Received: (qmail 93390 invoked by uid 500); 8 Feb 2014 07:43:50 -0000 Mailing-List: contact commits-help@phoenix.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.incubator.apache.org Delivered-To: mailing list commits@phoenix.incubator.apache.org Received: (qmail 93310 invoked by uid 99); 8 Feb 2014 07:43:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 08 Feb 2014 07:43:47 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Sat, 08 Feb 2014 07:43:16 +0000 Received: (qmail 92422 invoked by uid 99); 8 Feb 2014 07:42:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 08 Feb 2014 07:42:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 042989202E5; Sat, 8 Feb 2014 07:42:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jeffreyz@apache.org To: commits@phoenix.incubator.apache.org Date: Sat, 08 Feb 2014 07:43:00 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/11] git commit: Port Phoenix to Hbase0.98 X-Virus-Checked: Checked by ClamAV on apache.org Port Phoenix to Hbase0.98 Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/53f7d3ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/53f7d3ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/53f7d3ce Branch: refs/heads/4.0.0 Commit: 53f7d3ce880f79b18a5a728be13f965e84c52e56 Parents: 214ad9e Author: Jeffrey Zhong Authored: Fri Feb 7 23:41:57 2014 -0800 Committer: Jeffrey Zhong Committed: Fri Feb 7 23:43:34 2014 -0800 ---------------------------------------------------------------------- phoenix-core/pom.xml | 121 +- .../hbase/index/IndexLogRollSynchronizer.java | 14 +- .../org/apache/hadoop/hbase/index/Indexer.java | 95 +- .../hbase/index/builder/BaseIndexBuilder.java | 4 +- .../hbase/index/builder/IndexBuildManager.java | 6 +- .../hbase/index/builder/IndexBuilder.java | 4 +- .../covered/CoveredColumnsIndexBuilder.java | 21 +- .../hbase/index/covered/LocalTableState.java | 15 +- .../hbase/index/covered/data/IndexMemStore.java | 18 +- .../hbase/index/covered/data/LocalTable.java | 5 +- .../example/CoveredColumnIndexCodec.java | 1 - .../covered/example/CoveredColumnIndexer.java | 11 +- .../filter/ApplyAndFilterDeletesFilter.java | 36 +- ...olumnTrackingNextLargestTimestampFilter.java | 16 +- .../index/covered/filter/FamilyOnlyFilter.java | 8 +- .../covered/filter/MaxTimestampFilter.java | 22 +- .../covered/filter/NewerTimestampFilter.java | 12 +- .../covered/update/IndexUpdateManager.java | 11 +- .../hbase/index/parallel/ThreadPoolManager.java | 1 - .../index/scanner/FilteredKeyValueScanner.java | 2 - .../index/table/CoprocessorHTableFactory.java | 3 +- .../hbase/index/util/IndexManagementUtil.java | 7 +- .../hadoop/hbase/index/wal/IndexedKeyValue.java | 73 +- .../hadoop/hbase/index/wal/KeyValueCodec.java | 11 +- .../regionserver/wal/IndexedHLogReader.java | 119 +- .../hbase/regionserver/wal/IndexedWALEdit.java | 91 - .../regionserver/wal/IndexedWALEditCodec.java | 40 +- .../apache/phoenix/cache/ServerCacheClient.java | 73 +- .../cache/aggcache/SpillableGroupByCache.java | 8 +- .../apache/phoenix/client/ClientKeyValue.java | 133 +- .../apache/phoenix/compile/DeleteCompiler.java | 5 +- .../apache/phoenix/compile/UpsertCompiler.java | 5 +- .../phoenix/coprocessor/BaseRegionScanner.java | 17 +- .../GroupedAggregateRegionObserver.java | 26 +- .../coprocessor/HashJoinRegionScanner.java | 47 +- .../coprocessor/MetaDataEndpointImpl.java | 1352 ++-- .../phoenix/coprocessor/MetaDataProtocol.java | 167 +- .../phoenix/coprocessor/ScanRegionObserver.java | 47 +- .../coprocessor/SequenceRegionObserver.java | 86 +- .../coprocessor/ServerCachingEndpointImpl.java | 98 +- .../coprocessor/ServerCachingProtocol.java | 3 +- .../UngroupedAggregateRegionObserver.java | 95 +- .../coprocessor/generated/MetaDataProtos.java | 7135 ++++++++++++++++++ .../coprocessor/generated/PTableProtos.java | 5315 +++++++++++++ .../generated/ServerCacheFactoryProtos.java | 568 ++ .../generated/ServerCachingProtos.java | 3447 +++++++++ .../apache/phoenix/execute/MutationState.java | 10 +- .../expression/KeyValueColumnExpression.java | 1 - .../DistinctValueWithCountServerAggregator.java | 4 +- .../phoenix/filter/BooleanExpressionFilter.java | 11 +- .../MultiCFCQKeyValueComparisonFilter.java | 11 + .../filter/MultiCQKeyValueComparisonFilter.java | 12 + .../filter/MultiKeyValueComparisonFilter.java | 8 +- .../phoenix/filter/RowKeyComparisonFilter.java | 15 +- .../SingleCFCQKeyValueComparisonFilter.java | 13 +- .../SingleCQKeyValueComparisonFilter.java | 11 + .../filter/SingleKeyValueComparisonFilter.java | 11 +- .../apache/phoenix/filter/SkipScanFilter.java | 25 +- .../apache/phoenix/index/IndexMaintainer.java | 41 +- .../phoenix/index/PhoenixIndexBuilder.java | 10 +- .../index/PhoenixIndexFailurePolicy.java | 44 +- .../iterate/MappedByteBufferSortedQueue.java | 5 +- .../iterate/RegionScannerResultIterator.java | 6 +- .../phoenix/iterate/SpoolingResultIterator.java | 4 +- .../java/org/apache/phoenix/job/JobManager.java | 10 +- .../apache/phoenix/join/HashCacheFactory.java | 2 +- .../org/apache/phoenix/join/ScanProjector.java | 5 +- .../phoenix/map/reduce/CSVBulkLoader.java | 3 - .../apache/phoenix/optimize/QueryOptimizer.java | 2 +- .../apache/phoenix/protobuf/ProtobufUtil.java | 133 + .../query/ConnectionQueryServicesImpl.java | 260 +- .../query/ConnectionlessQueryServicesImpl.java | 12 +- .../phoenix/query/HConnectionFactory.java | 7 +- .../phoenix/query/QueryServicesOptions.java | 4 +- .../apache/phoenix/schema/DelegateColumn.java | 10 - .../apache/phoenix/schema/MetaDataClient.java | 4 +- .../java/org/apache/phoenix/schema/PColumn.java | 4 +- .../org/apache/phoenix/schema/PColumnImpl.java | 98 +- .../java/org/apache/phoenix/schema/PTable.java | 2 +- .../org/apache/phoenix/schema/PTableImpl.java | 286 +- .../org/apache/phoenix/schema/Sequence.java | 73 +- .../apache/phoenix/schema/stat/PTableStats.java | 3 +- .../phoenix/schema/stat/PTableStatsImpl.java | 20 +- .../schema/tuple/MultiKeyValueTuple.java | 19 +- .../org/apache/phoenix/schema/tuple/Tuple.java | 6 +- .../java/org/apache/phoenix/util/CSVLoader.java | 1 - .../java/org/apache/phoenix/util/IndexUtil.java | 22 +- .../org/apache/phoenix/util/KeyValueUtil.java | 39 +- .../org/apache/phoenix/util/MetaDataUtil.java | 23 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 7 +- .../org/apache/phoenix/util/ResultUtil.java | 29 +- .../org/apache/phoenix/util/SchemaUtil.java | 2 - .../org/apache/phoenix/util/ServerUtil.java | 16 +- .../java/org/apache/phoenix/util/TupleUtil.java | 13 +- .../hadoop/hbase/index/IndexTestingUtils.java | 4 +- .../TestFailForUnsupportedHBaseVersions.java | 2 + .../TestEndToEndCoveredColumnsIndexBuilder.java | 26 +- .../index/covered/TestLocalTableState.java | 13 +- .../index/covered/data/TestIndexMemStore.java | 4 +- .../example/TestCoveredColumnIndexCodec.java | 16 +- .../example/TestEndToEndCoveredIndexing.java | 2 + .../TestEndtoEndIndexingWithCompression.java | 7 +- .../covered/example/TestFailWithoutRetries.java | 2 + .../filter/TestApplyAndFilterDeletesFilter.java | 8 +- .../index/util/TestIndexManagementUtil.java | 6 +- .../index/write/TestWALRecoveryCaching.java | 7 +- .../recovery/TestPerRegionIndexWriteCache.java | 87 +- .../wal/TestReadWriteKeyValuesWithCodec.java | 48 +- ...ALReplayWithIndexWritesAndCompressedWAL.java | 28 +- ...exWritesAndUncompressedWALInHBase_094_9.java | 2 + .../phoenix/client/TestClientKeyValueLocal.java | 5 +- .../apache/phoenix/end2end/AlterTableTest.java | 4 - .../phoenix/end2end/NativeHBaseTypesTest.java | 2 +- .../phoenix/end2end/index/IndexTestUtil.java | 14 +- .../phoenix/filter/SkipScanFilterTest.java | 11 +- .../iterate/AggregateResultScannerTest.java | 7 - .../java/org/apache/phoenix/query/BaseTest.java | 2 + .../java/org/apache/phoenix/util/TestUtil.java | 36 +- phoenix-flume/pom.xml | 47 +- phoenix-protocol/README.txt | 10 + phoenix-protocol/src/main/MetaDataService.proto | 118 + phoenix-protocol/src/main/PTable.proto | 72 + .../src/main/ServerCacheFactory.proto | 29 + .../src/main/ServerCachingService.proto | 61 + phoenix-protocol/src/main/build-proto.sh | 37 + pom.xml | 111 +- 126 files changed, 19574 insertions(+), 2005 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 0e2b8e5..5cc7804 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -213,23 +213,11 @@ com.google.guava guava - - org.apache.hbase - hbase - net.sourceforge.findbugs annotations - - - - org.apache.hbase - hbase - test-jar - - org.codehaus.jackson jackson-core-asl @@ -249,12 +237,37 @@ junit junit - test org.mockito mockito-all + + com.google.protobuf + protobuf-java + ${protobuf-java.version} + + + org.apache.httpcomponents + httpclient + 4.0.1 + + + org.jruby + jruby-complete + ${jruby.version} + + + log4j + log4j + ${log4j.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + @@ -268,14 +281,60 @@ - - org.apache.hadoop - hadoop-core - - - org.apache.hadoop - hadoop-test - + + org.apache.hbase + hbase-testing-util + ${hbase-hadoop1.version} + + + org.apache.hbase + hbase-common + ${hbase-hadoop1.version} + + + org.apache.hbase + hbase-protocol + ${hbase-hadoop1.version} + + + org.apache.hbase + hbase-client + ${hbase-hadoop1.version} + + + org.apache.hadoop + hadoop-core + ${hadoop-one.version} + + + hsqldb + hsqldb + + + net.sf.kosmosfs + kfs + + + org.eclipse.jdt + core + + + net.java.dev.jets3t + jets3t + + + oro + oro + + + + + org.apache.hadoop + hadoop-test + ${hadoop-one.version} + true + test + @@ -290,6 +349,26 @@ + org.apache.hbase + hbase-testing-util + ${hbase-hadoop2.version} + + + org.apache.hbase + hbase-common + ${hbase-hadoop2.version} + + + org.apache.hbase + hbase-protocol + ${hbase-hadoop2.version} + + + org.apache.hbase + hbase-client + ${hbase-hadoop2.version} + + org.apache.hadoop hadoop-common http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java index ca61221..904612f 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/IndexLogRollSynchronizer.java @@ -71,6 +71,7 @@ public class IndexLogRollSynchronizer implements WALActionsListener { private static final Log LOG = LogFactory.getLog(IndexLogRollSynchronizer.class); private WriteLock logArchiveLock; + private boolean lockAcquired = false; public IndexLogRollSynchronizer(WriteLock logWriteLock){ this.logArchiveLock = logWriteLock; @@ -81,12 +82,21 @@ public class IndexLogRollSynchronizer implements WALActionsListener { public void preLogArchive(Path oldPath, Path newPath) throws IOException { //take a write lock on the index - any pending index updates will complete before we finish LOG.debug("Taking INDEX_UPDATE writelock"); - logArchiveLock.lock(); - LOG.debug("Got the INDEX_UPDATE writelock"); + try { + logArchiveLock.lockInterruptibly(); + lockAcquired = true; + } catch (InterruptedException e) { + LOG.info("Acquiring lock got interrupted!"); + Thread.currentThread().interrupt(); + } + if (lockAcquired) { + LOG.debug("Got the INDEX_UPDATE writelock"); + } } @Override public void postLogArchive(Path oldPath, Path newPath) throws IOException { + if (!lockAcquired) return; // done archiving the logs, any WAL updates will be replayed on failure LOG.debug("Releasing INDEX_UPDATE writelock"); logArchiveLock.unlock(); http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java index fe2852b..aa9df58 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/Indexer.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; @@ -167,7 +168,7 @@ public class Indexer extends BaseRegionObserver { this.builder = new IndexBuildManager(env); // get a reference to the WAL - log = env.getRegionServerServices().getWAL(); + log = env.getRegionServerServices().getWAL(null); // add a synchronizer so we don't archive a WAL that we need log.registerWALActionsListener(new IndexLogRollSynchronizer(INDEX_READ_WRITE_LOCK.writeLock())); @@ -218,9 +219,9 @@ public class Indexer extends BaseRegionObserver { @Override public void prePut(final ObserverContext c, final Put put, - final WALEdit edit, final boolean writeToWAL) throws IOException { + final WALEdit edit, final Durability durability) throws IOException { if (this.disabled) { - super.prePut(c, put, edit, writeToWAL); + super.prePut(c, put, edit, durability); return; } // just have to add a batch marker to the WALEdit so we get the edit again in the batch @@ -230,13 +231,13 @@ public class Indexer extends BaseRegionObserver { @Override public void preDelete(ObserverContext e, Delete delete, - WALEdit edit, boolean writeToWAL) throws IOException { + WALEdit edit, final Durability durability) throws IOException { if (this.disabled) { - super.preDelete(e, delete, edit, writeToWAL); + super.preDelete(e, delete, edit, durability); return; } try { - preDeleteWithExceptions(e, delete, edit, writeToWAL); + preDeleteWithExceptions(e, delete, edit, durability); return; } catch (Throwable t) { rethrowIndexingException(t); @@ -246,7 +247,7 @@ public class Indexer extends BaseRegionObserver { } public void preDeleteWithExceptions(ObserverContext e, - Delete delete, WALEdit edit, boolean writeToWAL) throws Exception { + Delete delete, WALEdit edit, final Durability durability) throws Exception { // if we are making the update as part of a batch, we need to add in a batch marker so the WAL // is retained if (this.builder.getBatchId(delete) != null) { @@ -257,14 +258,14 @@ public class Indexer extends BaseRegionObserver { // get the mapping for index column -> target index table Collection> indexUpdates = this.builder.getIndexUpdate(delete); - if (doPre(indexUpdates, edit, writeToWAL)) { + if (doPre(indexUpdates, edit, durability)) { takeUpdateLock("delete"); } } @Override public void preBatchMutate(ObserverContext c, - MiniBatchOperationInProgress> miniBatchOp) throws IOException { + MiniBatchOperationInProgress miniBatchOp) throws IOException { if (this.disabled) { super.preBatchMutate(c, miniBatchOp); return; @@ -281,12 +282,13 @@ public class Indexer extends BaseRegionObserver { @SuppressWarnings("deprecation") public void preBatchMutateWithExceptions(ObserverContext c, - MiniBatchOperationInProgress> miniBatchOp) throws Throwable { + MiniBatchOperationInProgress miniBatchOp) throws Throwable { // first group all the updates for a single row into a single update to be processed Map mutations = new HashMap(); - boolean durable = false; + + Durability durability = Durability.SKIP_WAL; for (int i = 0; i < miniBatchOp.size(); i++) { // remove the batch keyvalue marker - its added for all puts WALEdit edit = miniBatchOp.getWalEdit(i); @@ -294,11 +296,13 @@ public class Indexer extends BaseRegionObserver { // we could check is indexing is enable for the mutation in prePut and then just skip this // after checking here, but this saves us the checking again. if (edit != null) { - KeyValue kv = edit.getKeyValues().remove(0); - assert kv == BATCH_MARKER : "Expected batch marker from the WALEdit, but got: " + kv; + KeyValue kv = edit.getKeyValues().get(0); + if (kv == BATCH_MARKER) { + // remove batch marker from the WALEdit + edit.getKeyValues().remove(0); + } } - Pair op = miniBatchOp.getOperation(i); - Mutation m = op.getFirst(); + Mutation m = miniBatchOp.getOperation(i); // skip this mutation if we aren't enabling indexing // unfortunately, we really should ask if the raw mutation (rather than the combined mutation) // should be indexed, which means we need to expose another method on the builder. Such is the @@ -308,8 +312,8 @@ public class Indexer extends BaseRegionObserver { } // figure out if this is batch is durable or not - if(!durable){ - durable = m.getDurability() != Durability.SKIP_WAL; + if (m.getDurability().ordinal() > durability.ordinal()) { + durability = m.getDurability(); } // add the mutation to the batch set @@ -317,7 +321,7 @@ public class Indexer extends BaseRegionObserver { MultiMutation stored = mutations.get(row); // we haven't seen this row before, so add it if (stored == null) { - stored = new MultiMutation(row, m.getWriteToWAL()); + stored = new MultiMutation(row); mutations.put(row, stored); } stored.addAll(m); @@ -336,7 +340,7 @@ public class Indexer extends BaseRegionObserver { Collection> indexUpdates = this.builder.getIndexUpdate(miniBatchOp, mutations.values()); // write them - if (doPre(indexUpdates, edit, durable)) { + if (doPre(indexUpdates, edit, durability)) { takeUpdateLock("batch mutation"); } } @@ -372,9 +376,8 @@ public class Indexer extends BaseRegionObserver { private ImmutableBytesPtr rowKey; - public MultiMutation(ImmutableBytesPtr rowkey, boolean writeToWal) { + public MultiMutation(ImmutableBytesPtr rowkey) { this.rowKey = rowkey; - this.writeToWAL = writeToWal; } /** @@ -383,9 +386,9 @@ public class Indexer extends BaseRegionObserver { @SuppressWarnings("deprecation") public void addAll(Mutation stored) { // add all the kvs - for (Entry> kvs : stored.getFamilyMap().entrySet()) { + for (Entry> kvs : stored.getFamilyCellMap().entrySet()) { byte[] family = kvs.getKey(); - List list = getKeyValueList(family, kvs.getValue().size()); + List list = getKeyValueList(family, kvs.getValue().size()); list.addAll(kvs.getValue()); familyMap.put(family, list); } @@ -396,15 +399,12 @@ public class Indexer extends BaseRegionObserver { this.setAttribute(attrib.getKey(), attrib.getValue()); } } - if (stored.getWriteToWAL()) { - this.writeToWAL = true; - } } - private List getKeyValueList(byte[] family, int hint) { - List list = familyMap.get(family); + private List getKeyValueList(byte[] family, int hint) { + List list = familyMap.get(family); if (list == null) { - list = new ArrayList(hint); + list = new ArrayList(hint); } return list; } @@ -423,16 +423,6 @@ public class Indexer extends BaseRegionObserver { public boolean equals(Object o) { return o == null ? false : o.hashCode() == this.hashCode(); } - - @Override - public void readFields(DataInput arg0) throws IOException { - throw new UnsupportedOperationException("MultiMutations cannot be read/written"); - } - - @Override - public void write(DataOutput arg0) throws IOException { - throw new UnsupportedOperationException("MultiMutations cannot be read/written"); - } } /** @@ -441,7 +431,7 @@ public class Indexer extends BaseRegionObserver { * @throws IOException */ private boolean doPre(Collection> indexUpdates, final WALEdit edit, - final boolean writeToWAL) throws IOException { + final Durability durability) throws IOException { // no index updates, so we are done if (indexUpdates == null || indexUpdates.size() == 0) { return false; @@ -449,7 +439,7 @@ public class Indexer extends BaseRegionObserver { // if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index // update right away - if (!writeToWAL) { + if (durability == Durability.SKIP_WAL) { try { this.writer.write(indexUpdates); return false; @@ -469,27 +459,27 @@ public class Indexer extends BaseRegionObserver { @Override public void postPut(ObserverContext e, Put put, WALEdit edit, - boolean writeToWAL) throws IOException { + final Durability durability) throws IOException { if (this.disabled) { - super.postPut(e, put, edit, writeToWAL); + super.postPut(e, put, edit, durability); return; } - doPost(edit, put, writeToWAL); + doPost(edit, put, durability); } @Override public void postDelete(ObserverContext e, Delete delete, - WALEdit edit, boolean writeToWAL) throws IOException { + WALEdit edit, final Durability durability) throws IOException { if (this.disabled) { - super.postDelete(e, delete, edit, writeToWAL); + super.postDelete(e, delete, edit, durability); return; } - doPost(edit,delete, writeToWAL); + doPost(edit, delete, durability); } @Override public void postBatchMutate(ObserverContext c, - MiniBatchOperationInProgress> miniBatchOp) throws IOException { + MiniBatchOperationInProgress miniBatchOp) throws IOException { if (this.disabled) { super.postBatchMutate(c, miniBatchOp); return; @@ -498,9 +488,9 @@ public class Indexer extends BaseRegionObserver { // noop for the rest of the indexer - its handled by the first call to put/delete } - private void doPost(WALEdit edit, Mutation m, boolean writeToWAL) throws IOException { + private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException { try { - doPostWithExceptions(edit, m, writeToWAL); + doPostWithExceptions(edit, m, durability); return; } catch (Throwable e) { rethrowIndexingException(e); @@ -509,9 +499,10 @@ public class Indexer extends BaseRegionObserver { "Somehow didn't complete the index update, but didn't return succesfully either!"); } - private void doPostWithExceptions(WALEdit edit, Mutation m, boolean writeToWAL) throws Exception { + private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability) + throws Exception { //short circuit, if we don't need to do any work - if (!writeToWAL || !this.builder.isEnabled(m)) { + if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) { // already did the index update in prePut, so we are done return; } http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java index bbeae31..8c9a777 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/BaseIndexBuilder.java @@ -54,12 +54,12 @@ public abstract class BaseIndexBuilder implements IndexBuilder { } @Override - public void batchStarted(MiniBatchOperationInProgress> miniBatchOp) throws IOException { + public void batchStarted(MiniBatchOperationInProgress miniBatchOp) throws IOException { // noop } @Override - public void batchCompleted(MiniBatchOperationInProgress> miniBatchOp) { + public void batchCompleted(MiniBatchOperationInProgress miniBatchOp) { // noop } http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java index 833f142..61fc90d 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuildManager.java @@ -116,7 +116,7 @@ public class IndexBuildManager implements Stoppable { public Collection> getIndexUpdate( - MiniBatchOperationInProgress> miniBatchOp, + MiniBatchOperationInProgress miniBatchOp, Collection mutations) throws Throwable { // notify the delegate that we have started processing a batch this.delegate.batchStarted(miniBatchOp); @@ -178,11 +178,11 @@ public class IndexBuildManager implements Stoppable { return delegate.getIndexUpdateForFilteredRows(filtered); } - public void batchCompleted(MiniBatchOperationInProgress> miniBatchOp) { + public void batchCompleted(MiniBatchOperationInProgress miniBatchOp) { delegate.batchCompleted(miniBatchOp); } - public void batchStarted(MiniBatchOperationInProgress> miniBatchOp) + public void batchStarted(MiniBatchOperationInProgress miniBatchOp) throws IOException { delegate.batchStarted(miniBatchOp); } http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java index e23ea3f..e92edab 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/builder/IndexBuilder.java @@ -107,7 +107,7 @@ public interface IndexBuilder extends Stoppable { * Notification that a batch of updates has successfully been written. * @param miniBatchOp the full batch operation that was written */ - public void batchCompleted(MiniBatchOperationInProgress> miniBatchOp); + public void batchCompleted(MiniBatchOperationInProgress miniBatchOp); /** * Notification that a batch has been started. @@ -118,7 +118,7 @@ public interface IndexBuilder extends Stoppable { * @param miniBatchOp the full batch operation to be written * @throws IOException */ - public void batchStarted(MiniBatchOperationInProgress> miniBatchOp) throws IOException; + public void batchStarted(MiniBatchOperationInProgress miniBatchOp) throws IOException; /** * This allows the codec to dynamically change whether or not indexing should take place for a http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java index 422a9ec..ce5efc5 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/CoveredColumnsIndexBuilder.java @@ -33,23 +33,25 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; - -import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; import org.apache.hadoop.hbase.index.builder.BaseIndexBuilder; import org.apache.hadoop.hbase.index.covered.data.LocalHBaseState; import org.apache.hadoop.hbase.index.covered.data.LocalTable; import org.apache.hadoop.hbase.index.covered.update.ColumnTracker; import org.apache.hadoop.hbase.index.covered.update.IndexUpdateManager; import org.apache.hadoop.hbase.index.covered.update.IndexedColumnGroup; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; /** * Build covered indexes for phoenix updates. @@ -148,8 +150,9 @@ public class CoveredColumnsIndexBuilder extends BaseIndexBuilder { */ protected Collection createTimestampBatchesFromMutation(Mutation m) { Map batches = new HashMap(); - for (List family : m.getFamilyMap().values()) { - createTimestampBatchesFromKeyValues(family, batches); + for (List family : m.getFamilyCellMap().values()) { + List familyKVs = KeyValueUtil.ensureKeyValues(family); + createTimestampBatchesFromKeyValues(familyKVs, batches); } // sort the batches List sorted = new ArrayList(batches.values()); @@ -420,7 +423,7 @@ public class CoveredColumnsIndexBuilder extends BaseIndexBuilder { // We have to figure out which kind of delete it is, since we need to do different things if its // a general (row) delete, versus a delete of just a single column or family - Map> families = d.getFamilyMap(); + Map> families = d.getFamilyCellMap(); /* * Option 1: its a row delete marker, so we just need to delete the most recent state for each http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java index f6419f2..ce21135 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/LocalTableState.java @@ -28,13 +28,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.util.Pair; - import org.apache.hadoop.hbase.index.covered.data.IndexMemStore; import org.apache.hadoop.hbase.index.covered.data.LocalHBaseState; import org.apache.hadoop.hbase.index.covered.update.ColumnReference; @@ -42,6 +41,8 @@ import org.apache.hadoop.hbase.index.covered.update.ColumnTracker; import org.apache.hadoop.hbase.index.covered.update.IndexedColumnGroup; import org.apache.hadoop.hbase.index.scanner.Scanner; import org.apache.hadoop.hbase.index.scanner.ScannerBuilder; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.util.Pair; /** * Manage the state of the HRegion's view of the table, for the single row. @@ -175,7 +176,7 @@ public class LocalTableState implements TableState { public Result getCurrentRowState() { KeyValueScanner scanner = this.memstore.getScanner(); - List kvs = new ArrayList(); + List kvs = new ArrayList(); while (scanner.peek() != null) { try { kvs.add(scanner.next()); @@ -184,7 +185,7 @@ public class LocalTableState implements TableState { throw new RuntimeException("Local MemStore threw IOException!"); } } - return new Result(kvs); + return Result.create(kvs); } /** @@ -192,8 +193,8 @@ public class LocalTableState implements TableState { * @param pendingUpdate update to apply */ public void addUpdateForTesting(Mutation pendingUpdate) { - for (Map.Entry> e : pendingUpdate.getFamilyMap().entrySet()) { - List edits = e.getValue(); + for (Map.Entry> e : pendingUpdate.getFamilyCellMap().entrySet()) { + List edits = KeyValueUtil.ensureKeyValues(e.getValue()); addUpdate(edits); } } http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java index e2cac10..5cee11a 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/IndexMemStore.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.index.covered.data; +import java.io.IOException; import java.util.Comparator; import java.util.Iterator; import java.util.SortedSet; @@ -26,7 +27,7 @@ import java.util.SortedSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.KeyComparator; +import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.IndexKeyValueSkipListSet; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -87,12 +88,12 @@ public class IndexMemStore implements KeyValueStore { */ public static final Comparator COMPARATOR = new Comparator() { - private final KeyComparator rawcomparator = new KeyComparator(); + private final KVComparator rawcomparator = new KVComparator(); @Override public int compare(final KeyValue left, final KeyValue right) { - return rawcomparator.compare(left.getBuffer(), left.getOffset() + KeyValue.ROW_OFFSET, - left.getKeyLength(), right.getBuffer(), right.getOffset() + KeyValue.ROW_OFFSET, + return rawcomparator.compareFlatKey(left.getRowArray(), left.getOffset() + KeyValue.ROW_OFFSET, + left.getKeyLength(), right.getRowArray(), right.getOffset() + KeyValue.ROW_OFFSET, right.getKeyLength()); } }; @@ -140,7 +141,8 @@ public class IndexMemStore implements KeyValueStore { } private String toString(KeyValue kv) { - return kv.toString() + "/value=" + Bytes.toString(kv.getValue()); + return kv.toString() + "/value=" + + Bytes.toString(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); } @Override @@ -159,7 +161,7 @@ public class IndexMemStore implements KeyValueStore { public KeyValueScanner getScanner() { return new MemStoreScanner(); } - + /* * MemStoreScanner implements the KeyValueScanner. It lets the caller scan the contents of a * memstore -- both current map and snapshot. This behaves as if it were a real scanner but does @@ -306,14 +308,13 @@ public class IndexMemStore implements KeyValueStore { public long getSequenceID() { return Long.MAX_VALUE; } - + @Override public boolean shouldUseScanner(Scan scan, SortedSet columns, long oldestUnexpiredTS) { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't support checking to see if it should use a scanner!"); } - /* @Override public boolean backwardSeek(KeyValue arg0) throws IOException { throw new UnsupportedOperationException(); @@ -328,6 +329,5 @@ public class IndexMemStore implements KeyValueStore { public boolean seekToPreviousRow(KeyValue arg0) throws IOException { throw new UnsupportedOperationException(); } - */ } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java index 52aa851..d2d99e6 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/data/LocalTable.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -63,11 +64,11 @@ public class LocalTable implements LocalHBaseState { s.setStopRow(row); HRegion region = this.env.getRegion(); RegionScanner scanner = region.getScanner(s); - List kvs = new ArrayList(1); + List kvs = new ArrayList(1); boolean more = scanner.next(kvs); assert !more : "Got more than one result when scanning" + " a single row in the primary table!"; - Result r = new Result(kvs); + Result r = Result.create(kvs); scanner.close(); return r; } http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java index 8f0ee99..c588e95 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -244,7 +244,6 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec { * @param pk primary key of the original row * @param length total number of bytes of all the values that should be added * @param values to use when building the key - * @return */ static byte[] composeRowKey(byte[] pk, int length, List values) { // now build up expected row key, each of the values, in order, followed by the PK and then some http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java index c7019c4..0ec3f96 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/example/CoveredColumnIndexer.java @@ -14,13 +14,12 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; - import org.apache.hadoop.hbase.index.covered.Batch; import org.apache.hadoop.hbase.index.covered.CoveredColumnsIndexBuilder; import org.apache.hadoop.hbase.index.covered.LocalTableState; import org.apache.hadoop.hbase.index.covered.update.IndexUpdateManager; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is a @@ -108,10 +107,11 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder { Collection batches = batchByRow(filtered); for (Batch batch : batches) { - Put p = new Put(batch.getKvs().iterator().next().getRow()); + KeyValue curKV = batch.getKvs().iterator().next(); + Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength()); for (KeyValue kv : batch.getKvs()) { // we only need to cleanup Put entries - byte type = kv.getType(); + byte type = kv.getTypeByte(); Type t = KeyValue.Type.codeToType(type); if (!t.equals(Type.Put)) { continue; @@ -136,7 +136,6 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder { /** * @param filtered - * @return */ private Collection batchByRow(Collection filtered) { Map batches = new HashMap(); http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java index ebd2abe..658e981 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ApplyAndFilterDeletesFilter.java @@ -27,9 +27,12 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.filter.FilterBase; @@ -105,13 +108,14 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { } @Override - public ReturnCode filterKeyValue(KeyValue next) { + public ReturnCode filterKeyValue(Cell next) { // we marked ourselves done, but the END_ROW_KEY didn't manage to seek to the very last key if (this.done) { return ReturnCode.SKIP; } - switch (KeyValue.Type.codeToType(next.getType())) { + KeyValue nextKV = KeyValueUtil.ensureKeyValue(next); + switch (KeyValue.Type.codeToType(next.getTypeByte())) { /* * DeleteFamily will always sort first because those KVs (we assume) don't have qualifiers (or * rather are null). Therefore, we have to keep a hold of all the delete families until we get @@ -123,20 +127,20 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { // one. In fact, it means that all the previous deletes can be ignored because the family must // not match anymore. this.coveringDelete.reset(); - this.coveringDelete.deleteFamily = next; + this.coveringDelete.deleteFamily = nextKV; return ReturnCode.SKIP; case DeleteColumn: // similar to deleteFamily, all the newer deletes/puts would have been seen at this point, so // we can safely replace the more recent delete column with the more recent one this.coveringDelete.pointDelete = null; - this.coveringDelete.deleteColumn = next; + this.coveringDelete.deleteColumn = nextKV; return ReturnCode.SKIP; case Delete: // we are just deleting the single column value at this point. // therefore we just skip this entry and go onto the next one. The only caveat is that // we should still cover the next entry if this delete applies to the next entry, so we // have to keep around a reference to the KV to compare against the next valid entry - this.coveringDelete.pointDelete = next; + this.coveringDelete.pointDelete = nextKV; return ReturnCode.SKIP; default: // no covering deletes @@ -144,18 +148,18 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { return ReturnCode.INCLUDE; } - if (coveringDelete.matchesFamily(next)) { + if (coveringDelete.matchesFamily(nextKV)) { this.currentHint = familyHint; return ReturnCode.SEEK_NEXT_USING_HINT; } - if (coveringDelete.matchesColumn(next)) { + if (coveringDelete.matchesColumn(nextKV)) { // hint to the next column this.currentHint = columnHint; return ReturnCode.SEEK_NEXT_USING_HINT; } - if (coveringDelete.matchesPoint(next)) { + if (coveringDelete.matchesPoint(nextKV)) { return ReturnCode.SKIP; } @@ -165,16 +169,6 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { return ReturnCode.INCLUDE; } - @Override - public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!"); - } - - @Override - public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!"); - } - /** * Get the next hint for a given peeked keyvalue */ @@ -247,7 +241,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { if (deleteFamily == null) { return false; } - if (deleteFamily.matchingFamily(next)) { + if (CellUtil.matchingFamily(deleteFamily, next)) { // falls within the timestamp range if (deleteFamily.getTimestamp() >= next.getTimestamp()) { return true; @@ -269,7 +263,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { if (deleteColumn == null) { return false; } - if (deleteColumn.matchingFamily(next) && deleteColumn.matchingQualifier(next)) { + if (CellUtil.matchingFamily(deleteColumn, next) && deleteColumn.matchingQualifier(next)) { // falls within the timestamp range if (deleteColumn.getTimestamp() >= next.getTimestamp()) { return true; @@ -289,7 +283,7 @@ public class ApplyAndFilterDeletesFilter extends FilterBase { // that the timestamp matches exactly. Because we sort by timestamp first, either the next // keyvalue has the exact timestamp or is an older (smaller) timestamp, and we can allow that // one. - if (pointDelete != null && pointDelete.matchingFamily(next) + if (pointDelete != null && CellUtil.matchingFamily(pointDelete, next) && pointDelete.matchingQualifier(next)) { if (pointDelete.getTimestamp() == next.getTimestamp()) { return true; http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java index 72a10e1..494bf66 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/ColumnTrackingNextLargestTimestampFilter.java @@ -19,10 +19,7 @@ */ package org.apache.hadoop.hbase.index.covered.filter; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.FilterBase; @@ -53,7 +50,7 @@ public class ColumnTrackingNextLargestTimestampFilter extends FilterBase { } @Override - public ReturnCode filterKeyValue(KeyValue v) { + public ReturnCode filterKeyValue(Cell v) { long timestamp = v.getTimestamp(); if (timestamp > ts) { this.column.setTs(timestamp); @@ -62,13 +59,4 @@ public class ColumnTrackingNextLargestTimestampFilter extends FilterBase { return ReturnCode.INCLUDE; } - @Override - public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!"); - } - - @Override - public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!"); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java index 8591f88..7c35786 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/FamilyOnlyFilter.java @@ -17,10 +17,10 @@ */ package org.apache.hadoop.hbase.index.covered.filter; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; /** * Similar to the {@link FamilyFilter} but stops when the end of the family is reached and only @@ -39,7 +39,7 @@ public class FamilyOnlyFilter extends FamilyFilter { this(new BinaryComparator(family)); } - public FamilyOnlyFilter(final WritableByteArrayComparable familyComparator) { + public FamilyOnlyFilter(final ByteArrayComparable familyComparator) { super(CompareOp.EQUAL, familyComparator); } @@ -56,7 +56,7 @@ public class FamilyOnlyFilter extends FamilyFilter { } @Override - public ReturnCode filterKeyValue(KeyValue v) { + public ReturnCode filterKeyValue(Cell v) { if (done) { return ReturnCode.SKIP; } http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java index 846ec88..92e9daf 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/MaxTimestampFilter.java @@ -23,6 +23,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.util.Bytes; @@ -44,7 +45,13 @@ public class MaxTimestampFilter extends FilterBase { public KeyValue getNextKeyHint(KeyValue currentKV) { // this might be a little excessive right now - better safe than sorry though, so we don't mess // with other filters too much. - KeyValue kv = currentKV.deepCopy(); + KeyValue kv = null; + try { + kv = currentKV.clone(); + } catch (CloneNotSupportedException e) { + // the exception should not happen at all + throw new IllegalArgumentException(e); + } int offset =kv.getTimestampOffset(); //set the timestamp in the buffer byte[] buffer = kv.getBuffer(); @@ -55,22 +62,11 @@ public class MaxTimestampFilter extends FilterBase { } @Override - public ReturnCode filterKeyValue(KeyValue v) { + public ReturnCode filterKeyValue(Cell v) { long timestamp = v.getTimestamp(); if (timestamp > ts) { return ReturnCode.SEEK_NEXT_USING_HINT; } return ReturnCode.INCLUDE; } - - @Override - public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("Server-side only filter, cannot be serialized!"); - - } - - @Override - public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("Server-side only filter, cannot be deserialized!"); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java index 560cdd8..8e0f617 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/filter/NewerTimestampFilter.java @@ -4,6 +4,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.FilterBase; @@ -11,7 +12,6 @@ import org.apache.hadoop.hbase.filter.FilterBase; * Server-side only class used in the indexer to filter out keyvalues newer than a given timestamp * (so allows anything <= timestamp through). *

- * Note,this doesn't support {@link #write(DataOutput)} or {@link #readFields(DataInput)}. */ public class NewerTimestampFilter extends FilterBase { @@ -22,16 +22,8 @@ public class NewerTimestampFilter extends FilterBase { } @Override - public ReturnCode filterKeyValue(KeyValue ignored) { + public ReturnCode filterKeyValue(Cell ignored) { return ignored.getTimestamp() > timestamp ? ReturnCode.SKIP : ReturnCode.INCLUDE; } - @Override - public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("TimestampFilter is server-side only!"); - } - @Override - public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("TimestampFilter is server-side only!"); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java index 7ace79d..77b6e85 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/covered/update/IndexUpdateManager.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -224,12 +224,13 @@ public class IndexUpdateManager { + ((m instanceof Put) ? m.getTimeStamp() + " " : "")); sb.append(" row=" + Bytes.toString(m.getRow())); sb.append("\n"); - if (m.getFamilyMap().isEmpty()) { + if (m.getFamilyCellMap().isEmpty()) { sb.append("\t\t=== EMPTY ===\n"); } - for (List kvs : m.getFamilyMap().values()) { - for (KeyValue kv : kvs) { - sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValue())); + for (List kvs : m.getFamilyCellMap().values()) { + for (Cell kv : kvs) { + sb.append("\t\t" + kv.toString() + "/value=" + Bytes.toStringBinary(kv.getValueArray(), + kv.getValueOffset(), kv.getValueLength())); sb.append("\n"); } } http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java index c8afb04..2a343f0 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/parallel/ThreadPoolManager.java @@ -74,7 +74,6 @@ public class ThreadPoolManager { /** * @param conf - * @return */ private static ShutdownOnUnusedThreadPoolExecutor getDefaultExecutor(ThreadPoolBuilder builder) { int maxThreads = builder.getMaxThreads(); http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java index 0f7fed3..f3deb6a 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/scanner/FilteredKeyValueScanner.java @@ -125,7 +125,6 @@ public class FilteredKeyValueScanner implements KeyValueScanner { this.delegate.close(); } - /* @Override public boolean backwardSeek(KeyValue arg0) throws IOException { return this.delegate.backwardSeek(arg0); @@ -140,5 +139,4 @@ public class FilteredKeyValueScanner implements KeyValueScanner { public boolean seekToPreviousRow(KeyValue arg0) throws IOException { return this.delegate.seekToPreviousRow(arg0); } - */ } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java index 5ded879..2af2c7d 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/table/CoprocessorHTableFactory.java @@ -7,6 +7,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.util.Bytes; @@ -40,7 +41,7 @@ public class CoprocessorHTableFactory implements HTableFactory { if (LOG.isDebugEnabled()) { LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary())); } - return this.e.getTable(tablename.copyBytesIfNecessary()); + return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java index 9ee81a9..16b3584 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/util/IndexManagementUtil.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import com.google.common.collect.Maps; import org.apache.hadoop.hbase.index.ValueGetter; @@ -65,7 +66,8 @@ public class IndexManagementUtil { } catch (Throwable t) { return false; } - if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf.get(WAL_EDIT_CODEC_CLASS_KEY, null))) { + if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf + .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, null))) { // its installed, and it can handle compression and non-compression cases return true; } @@ -91,7 +93,8 @@ public class IndexManagementUtil { if (indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) { if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) { throw new IllegalStateException( "WAL Compression is only supported with " + codecClass - + ". You can install in hbase-site.xml, under " + WAL_EDIT_CODEC_CLASS_KEY); } + + ". You can install in hbase-site.xml, under " + WALCellCodec.WAL_CELL_CODEC_CLASS_KEY); + } } else { throw new IllegalStateException(codecClass + " is not installed, but " + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY); http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java index a7f4e82..5b2c6b4 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/IndexedKeyValue.java @@ -8,8 +8,13 @@ import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr; @@ -46,12 +51,20 @@ public class IndexedKeyValue extends KeyValue { } /** - * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an {@link HLog#METAFAMILY} so it + * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an {@link WALEdit#METAFAMILY} so it * isn't replayed via the normal replay mechanism */ @Override public boolean matchingFamily(final byte[] family) { - return Bytes.equals(family, HLog.METAFAMILY); + return Bytes.equals(family, WALEdit.METAFAMILY); + } + + /** + * Not a real KeyValue + */ + @Override + public boolean matchingRow(final byte [] row) { + return false; } @Override @@ -77,22 +90,11 @@ public class IndexedKeyValue extends KeyValue { } private byte[] getMutationBytes() { - ByteArrayOutputStream bos = null; try { - bos = new ByteArrayOutputStream(); - this.mutation.write(new DataOutputStream(bos)); - bos.flush(); - return bos.toByteArray(); + MutationProto m = toMutationProto(this.mutation); + return m.toByteArray(); } catch (IOException e) { throw new IllegalArgumentException("Failed to get bytes for mutation!", e); - } finally { - if (bos != null) { - try { - bos.close(); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to get bytes for mutation!", e); - } - } } } @@ -101,11 +103,6 @@ public class IndexedKeyValue extends KeyValue { return hashCode; } - @Override - public void write(DataOutput out) throws IOException { - KeyValueCodec.write(out, this); - } - /** * Internal write the underlying data for the entry - this does not do any special prefixing. Writing should be done * via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of @@ -118,8 +115,8 @@ public class IndexedKeyValue extends KeyValue { */ void writeData(DataOutput out) throws IOException { Bytes.writeByteArray(out, this.indexTableName.get()); - out.writeUTF(this.mutation.getClass().getName()); - this.mutation.write(out); + MutationProto m = toMutationProto(this.mutation); + Bytes.writeByteArray(out, m.toByteArray()); } /** @@ -127,22 +124,12 @@ public class IndexedKeyValue extends KeyValue { * complement to {@link #writeData(DataOutput)}. */ @SuppressWarnings("javadoc") - @Override public void readFields(DataInput in) throws IOException { this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in)); - Class clazz; - try { - clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class); - this.mutation = clazz.newInstance(); - this.mutation.readFields(in); - this.hashCode = calcHashCode(indexTableName, mutation); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } catch (InstantiationException e) { - throw new IOException(e); - } catch (IllegalAccessException e) { - throw new IOException(e); - } + byte[] mutationData = Bytes.readByteArray(in); + MutationProto mProto = MutationProto.parseFrom(mutationData); + this.mutation = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(mProto); + this.hashCode = calcHashCode(indexTableName, mutation); } public boolean getBatchFinished() { @@ -152,4 +139,18 @@ public class IndexedKeyValue extends KeyValue { public void markBatchFinished() { this.batchFinished = true; } + + protected MutationProto toMutationProto(Mutation mutation) throws IOException { + MutationProto m = null; + if(mutation instanceof Put){ + m = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.PUT, + mutation); + } else if(mutation instanceof Delete) { + m = org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(MutationType.DELETE, + mutation); + } else { + throw new IOException("Put/Delete mutations only supported"); + } + return m; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java index 0abdf8d..3340edc 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/wal/KeyValueCodec.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -49,16 +50,14 @@ public class KeyValueCodec { */ public static KeyValue readKeyValue(DataInput in) throws IOException { int length = in.readInt(); - KeyValue kv; // its a special IndexedKeyValue if (length == INDEX_TYPE_LENGTH_MARKER) { - kv = new IndexedKeyValue(); + IndexedKeyValue kv = new IndexedKeyValue(); kv.readFields(in); + return kv; } else { - kv = new KeyValue(); - kv.readFields(length, in); + return KeyValue.create(length, in); } - return kv; } /** @@ -73,7 +72,7 @@ public class KeyValueCodec { out.writeInt(INDEX_TYPE_LENGTH_MARKER); ((IndexedKeyValue) kv).writeData(out); } else { - kv.write(out); + KeyValue.write(kv, out); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java index bad82c4..dcef90a 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java @@ -8,6 +8,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.io.Writable; @@ -31,122 +32,12 @@ import org.apache.hadoop.io.Writable; * we need to track which of the regions were on the server when it crashed only only split those * edits out into their respective regions. */ -public class IndexedHLogReader implements Reader { +public class IndexedHLogReader extends ProtobufLogReader { private static final Log LOG = LogFactory.getLog(IndexedHLogReader.class); - private SequenceFileLogReader delegate; - - - private static class IndexedWALReader extends SequenceFileLogReader.WALReader { - - /** - * @param fs - * @param p - * @param c - * @throws IOException - */ - IndexedWALReader(FileSystem fs, Path p, Configuration c) throws IOException { - super(fs, p, c); - } - - /** - * we basically have to reproduce what the SequenceFile.Reader is doing in next(), but without - * the check on the value class, since we have a special value class that doesn't directly match - * what was specified in the file header - */ - @Override - public synchronized boolean next(Writable key, Writable val) throws IOException { - boolean more = next(key); - - if (more) { - getCurrentValue(val); - } - - return more; - } - - } - - public IndexedHLogReader() { - this.delegate = new SequenceFileLogReader(); - } - - @Override - public void init(final FileSystem fs, final Path path, Configuration conf) throws IOException { - this.delegate.init(fs, path, conf); - // close the old reader and replace with our own, custom one - this.delegate.reader.close(); - this.delegate.reader = new IndexedWALReader(fs, path, conf); - Exception e = new Exception(); - LOG.info("Instantiated indexed log reader." + Arrays.toString(e.getStackTrace())); - LOG.info("Got conf: " + conf); - } - - @Override - public void close() throws IOException { - this.delegate.close(); - } - - @Override - public Entry next() throws IOException { - return next(null); - } - - @Override - public Entry next(Entry reuse) throws IOException { - delegate.entryStart = delegate.reader.getPosition(); - HLog.Entry e = reuse; - if (e == null) { - HLogKey key; - if (delegate.keyClass == null) { - key = HLog.newKey(delegate.conf); - } else { - try { - key = delegate.keyClass.newInstance(); - } catch (InstantiationException ie) { - throw new IOException(ie); - } catch (IllegalAccessException iae) { - throw new IOException(iae); - } - } - WALEdit val = new WALEdit(); - e = new HLog.Entry(key, val); - } - - // now read in the HLog.Entry from the WAL - boolean nextPairValid = false; - try { - if (delegate.compressionContext != null) { - throw new UnsupportedOperationException( - "Reading compression isn't supported with the IndexedHLogReader! Compresed WALEdits " - + "are only support for HBase 0.94.9+ and with the IndexedWALEditCodec!"); - } - // this is the special bit - we use our custom entry to read in the key-values that have index - // information, but otherwise it looks just like a regular WALEdit - IndexedWALEdit edit = new IndexedWALEdit(e.getEdit()); - nextPairValid = delegate.reader.next(e.getKey(), edit); - } catch (IOException ioe) { - throw delegate.addFileInfoToException(ioe); - } - delegate.edit++; - if (delegate.compressionContext != null && delegate.emptyCompressionContext) { - delegate.emptyCompressionContext = false; - } - return nextPairValid ? e : null; - } - - @Override - public void seek(long pos) throws IOException { - this.delegate.seek(pos); - } - - @Override - public long getPosition() throws IOException { - return this.delegate.getPosition(); - } - @Override - public void reset() throws IOException { - this.delegate.reset(); + protected void initAfterCompression() throws IOException { + conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName()); + super.initAfterCompression(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/53f7d3ce/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java deleted file mode 100644 index 6749cc9..0000000 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.apache.hadoop.hbase.regionserver.wal; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.util.Bytes; - -import org.apache.hadoop.hbase.index.wal.KeyValueCodec; - -/** - * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader - *

- * This class should only be used with HBase < 0.94.9. Newer installations of HBase should - * instead use the IndexedWALEditCodec along with the correct configuration options. - */ -public class IndexedWALEdit extends WALEdit { - //reproduced here so we don't need to modify the HBase source. - private static final int VERSION_2 = -1; - private WALEdit delegate; - - /** - * Copy-constructor. Only does a surface copy of the delegates fields - no actual data is copied, only referenced. - * @param delegate to copy - */ - @SuppressWarnings("deprecation") - public IndexedWALEdit(WALEdit delegate) { - this.delegate = delegate; - // reset the delegate's fields - this.delegate.getKeyValues().clear(); - if (this.delegate.getScopes() != null) { - this.delegate.getScopes().clear(); - } - } - - public IndexedWALEdit() { - - } - - @Override -public void setCompressionContext(CompressionContext context) { - throw new UnsupportedOperationException( - "Compression not supported for IndexedWALEdit! If you are using HBase 0.94.9+, use IndexedWALEditCodec instead."); - } - - @SuppressWarnings("deprecation") - @Override - public void readFields(DataInput in) throws IOException { - delegate.getKeyValues().clear(); - if (delegate.getScopes() != null) { - delegate.getScopes().clear(); - } - // ---------------------------------------------------------------------------------------- - // no compression, so we do pretty much what the usual WALEdit does, plus a little magic to - // capture the index updates - // ----------------------------------------------------------------------------------------- - int versionOrLength = in.readInt(); - if (versionOrLength != VERSION_2) { - throw new IOException("You must update your cluster to the lastest version of HBase and" - + " clean out all logs (cleanly start and then shutdown) before enabling indexing!"); - } - // this is new style HLog entry containing multiple KeyValues. - List kvs = KeyValueCodec.readKeyValues(in); - delegate.getKeyValues().addAll(kvs); - - // then read in the rest of the WALEdit - int numFamilies = in.readInt(); - NavigableMap scopes = delegate.getScopes(); - if (numFamilies > 0) { - if (scopes == null) { - scopes = new TreeMap(Bytes.BYTES_COMPARATOR); - } - for (int i = 0; i < numFamilies; i++) { - byte[] fam = Bytes.readByteArray(in); - int scope = in.readInt(); - scopes.put(fam, scope); - } - delegate.setScopes(scopes); - } - } - - @Override - public void write(DataOutput out) throws IOException { - throw new IOException( - "Indexed WALEdits aren't written directly out - use IndexedKeyValues instead"); - } -} \ No newline at end of file