From commits-return-20522-archive-asf-public=cust-asf.ponee.io@phoenix.apache.org Sat Apr 14 09:37:21 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id BC5991807A6 for ; Sat, 14 Apr 2018 09:37:19 +0200 (CEST) Received: (qmail 39741 invoked by uid 500); 14 Apr 2018 07:37:18 -0000 Mailing-List: contact commits-help@phoenix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@phoenix.apache.org Delivered-To: mailing list commits@phoenix.apache.org Received: (qmail 39370 invoked by uid 99); 14 Apr 2018 07:37:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 14 Apr 2018 07:37:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CC1B0F3246; Sat, 14 Apr 2018 07:37:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: pboado@apache.org To: commits@phoenix.apache.org Date: Sat, 14 Apr 2018 07:37:26 -0000 Message-Id: <242c5331a3ed467db91dcdbecb6aaef4@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [10/21] phoenix git commit: PHOENIX-4366 Rebuilding a local index fails sometimes PHOENIX-4366 Rebuilding a local index fails sometimes Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a874df3a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a874df3a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a874df3a Branch: refs/heads/4.x-cdh5.12 Commit: a874df3a70bb52ea56f0159c37cdbb16534d762f Parents: 0987d09 Author: James Taylor Authored: Wed Apr 11 21:37:45 2018 +0100 Committer: Pedro Boado Committed: Fri Apr 13 23:27:33 2018 +0100 ---------------------------------------------------------------------- .../coprocessor/BaseScannerRegionObserver.java | 7 +---- .../GroupedAggregateRegionObserver.java | 4 +++ .../phoenix/coprocessor/ScanRegionObserver.java | 4 ++- .../UngroupedAggregateRegionObserver.java | 7 ++++- .../NonAggregateRegionScannerFactory.java | 29 +++++++++----------- .../phoenix/iterate/RegionScannerFactory.java | 4 +-- .../apache/phoenix/iterate/SnapshotScanner.java | 6 +--- 7 files changed, 30 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a874df3a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 7ef64b0..3310131 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -139,8 +139,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; - protected QualifierEncodingScheme encodingScheme; - protected boolean useNewValueColumnQualifier; @Override public void start(CoprocessorEnvironment e) throws IOException { @@ -211,8 +209,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // start exclusive and the stop inclusive. ScanUtil.setupReverseScan(scan); } - this.encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); - this.useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); return s; } @@ -350,8 +346,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final byte[][] viewConstants, final TupleProjector projector, final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) { - RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment(), - useNewValueColumnQualifier, encodingScheme); + RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment()); return regionScannerFactory.getWrappedScanner(c.getEnvironment(), s, null, null, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a874df3a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 67cc114..201bcec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -62,6 +62,7 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; @@ -110,6 +111,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { keyOrdered = true; } int offset = 0; + boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); if (ScanUtil.isLocalIndex(scan)) { /* * For local indexes, we need to set an offset on row key expressions to skip @@ -395,6 +397,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); + final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache( @@ -466,6 +469,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } final Pair minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); + final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); return new BaseRegionScanner(scanner) { private long rowCount = 0; private ImmutableBytesPtr currentKey = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/a874df3a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index b006ef6..2d9cd4f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.EncodedColumnsUtil; /** * @@ -68,7 +70,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { @Override protected RegionScanner doPostScannerOpen(final ObserverContext c, final Scan scan, final RegionScanner s) throws Throwable { - NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment(), useNewValueColumnQualifier, encodingScheme); + NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment()); return nonAggregateROUtil.getRegionScanner(scan, s); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a874df3a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 27d3880..de57772 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -376,7 +376,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver env, region.getRegionInfo().getTable().getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); return collectStats(s, statsCollector, region, scan, env.getConfiguration()); - } else if (ScanUtil.isIndexRebuild(scan)) { return rebuildIndices(s, region, scan, env.getConfiguration()); } + } else if (ScanUtil.isIndexRebuild(scan)) { + return rebuildIndices(s, region, scan, env.getConfiguration()); + } + + PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); int offsetToBe = 0; if (localIndexScan) { /* http://git-wip-us.apache.org/repos/asf/phoenix/blob/a874df3a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index 91bab6f..90ea025 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -73,20 +73,13 @@ import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFrom public class NonAggregateRegionScannerFactory extends RegionScannerFactory { - private ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - private KeyValueSchema kvSchema = null; - private ValueBitSet kvSchemaBitSet; - - public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env, boolean useNewValueColumnQualifier, - PTable.QualifierEncodingScheme encodingScheme) { + public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) { this.env = env; - this.useNewValueColumnQualifier = useNewValueColumnQualifier; - this.encodingScheme = encodingScheme; } @Override public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable { - + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); int offset = 0; if (ScanUtil.isLocalIndex(scan)) { /* @@ -105,9 +98,17 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes); } RegionScanner innerScanner = s; + PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); Set arrayKVRefs = Sets.newHashSet(); - Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs); + Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs); + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + for (Expression expression : arrayFuncRefs) { + builder.addField(expression); + } + KeyValueSchema kvSchema = builder.build(); + ValueBitSet kvSchemaBitSet = ValueBitSet.newInstance(kvSchema); TupleProjector tupleProjector = null; Region dataRegion = null; IndexMaintainer indexMaintainer = null; @@ -195,13 +196,12 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { } } - private Expression[] deserializeArrayPostionalExpressionInfoFromScan(Scan scan, RegionScanner s, - Set arrayKVRefs) { + private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s, + Set arrayKVRefs) { byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX); if (specificArrayIdx == null) { return null; } - KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx); try { DataInputStream input = new DataInputStream(stream); @@ -219,10 +219,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction(); arrayIdxFunc.readFields(input); arrayFuncRefs[i] = arrayIdxFunc; - builder.addField(arrayIdxFunc); } - kvSchema = builder.build(); - kvSchemaBitSet = ValueBitSet.newInstance(kvSchema); return arrayFuncRefs; } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a874df3a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index 3dcbef9..aed5805 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -41,6 +41,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.*; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -54,8 +55,6 @@ import java.util.Set; public abstract class RegionScannerFactory { protected RegionCoprocessorEnvironment env; - protected boolean useNewValueColumnQualifier; - protected PTable.QualifierEncodingScheme encodingScheme; /** * Returns the region based on the value of the @@ -107,6 +106,7 @@ public abstract class RegionScannerFactory { private boolean hasReferences = checkForReferenceFiles(); private HRegionInfo regionInfo = env.getRegionInfo(); private byte[] actualStartKey = getActualStartKey(); + private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); // If there are any reference files after local index region merge some cases we might // get the records less than scan start row key. This will happen when we replace the http://git-wip-us.apache.org/repos/asf/phoenix/blob/a874df3a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java index 68592ef..9e2a08b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java @@ -54,15 +54,11 @@ public class SnapshotScanner extends AbstractClientScanner { values = new ArrayList<>(); this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); - // process the region scanner for non-aggregate queries - PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); - boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); - RegionCoprocessorEnvironment snapshotEnv = getSnapshotContextEnvironment(conf); RegionScannerFactory regionScannerFactory; if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) { - regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv, useNewValueColumnQualifier, encodingScheme); + regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv); } else { /* future work : Snapshot M/R jobs for aggregate queries*/ throw new UnsupportedOperationException("Snapshot M/R jobs not available for aggregate queries");