Return-Path: X-Original-To: apmail-kylin-commits-archive@minotaur.apache.org Delivered-To: apmail-kylin-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6525818883 for ; Mon, 24 Aug 2015 09:00:55 +0000 (UTC) Received: (qmail 53426 invoked by uid 500); 24 Aug 2015 09:00:55 -0000 Delivered-To: apmail-kylin-commits-archive@kylin.apache.org Received: (qmail 53360 invoked by uid 500); 24 Aug 2015 09:00:55 -0000 Mailing-List: contact commits-help@kylin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kylin.incubator.apache.org Delivered-To: mailing list commits@kylin.incubator.apache.org Received: (qmail 53346 invoked by uid 99); 24 Aug 2015 09:00:55 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Aug 2015 09:00:55 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id AF144ED4A0 for ; Mon, 24 Aug 2015 09:00:54 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.373 X-Spam-Level: X-Spam-Status: No, score=0.373 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-1.427] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 1QEwtR0PPNZr for ; Mon, 24 Aug 2015 09:00:41 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id DED1A5069C for ; Mon, 24 Aug 2015 09:00:40 +0000 (UTC) Received: (qmail 52684 invoked by uid 99); 24 Aug 2015 09:00:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Aug 2015 09:00:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F57CE0574; Mon, 24 Aug 2015 09:00:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mahongbin@apache.org To: commits@kylin.incubator.apache.org Date: Mon, 24 Aug 2015 09:00:47 -0000 Message-Id: In-Reply-To: <19989ece16cd407baa780ba789b6263f@git.apache.org> References: <19989ece16cd407baa780ba789b6263f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/22] incubator-kylin git commit: KYLIN-960 organize existing storage module http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java deleted file mode 100644 index 35e4839..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java +++ /dev/null @@ -1,679 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.util.ArrayList; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.cube.model.HBaseMappingDesc; -import org.apache.kylin.dict.Dictionary; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.storage.ICachableStorageQuery; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; -import org.apache.kylin.storage.tuple.TupleInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; - -/** - * @author xjiang, yangli9 - */ -public class CubeStorageQuery implements ICachableStorageQuery { - - private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class); - - private static final int MERGE_KEYRANGE_THRESHOLD = 100; - private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G - - private final CubeInstance cubeInstance; - private final CubeDesc cubeDesc; - private final String uuid; - - public CubeStorageQuery(CubeInstance cube) { - this.cubeInstance = cube; - this.cubeDesc = cube.getDescriptor(); - this.uuid = cube.getUuid(); - } - - @Override - public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - - Collection groups = sqlDigest.groupbyColumns; - TupleFilter filter = sqlDigest.filter; - - // build dimension & metrics - Collection dimensions = new HashSet(); - Collection metrics = new HashSet(); - buildDimensionsAndMetrics(dimensions, metrics, sqlDigest); - - // all dimensions = groups + others - Set others = Sets.newHashSet(dimensions); - others.removeAll(groups); - - // expand derived - Set derivedPostAggregation = Sets.newHashSet(); - Set groupsD = expandDerived(groups, derivedPostAggregation); - Set othersD = expandDerived(others, derivedPostAggregation); - othersD.removeAll(groupsD); - derivedPostAggregation.removeAll(groups); - - // identify cuboid - Set dimensionsD = Sets.newHashSet(); - dimensionsD.addAll(groupsD); - dimensionsD.addAll(othersD); - Cuboid cuboid = identifyCuboid(dimensionsD); - context.setCuboid(cuboid); - - // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine - Set singleValuesD = findSingleValueColumns(filter); - boolean isExactAggregation = isExactAggregation(cuboid, groups, othersD, singleValuesD, derivedPostAggregation); - context.setExactAggregation(isExactAggregation); - - // translate filter for scan range and compose returning groups for coprocessor, note: - // - columns on non-evaluatable filter have to return - // - columns on loosened filter (due to derived translation) have to return - Set groupsCopD = Sets.newHashSet(groupsD); - collectNonEvaluable(filter, groupsCopD); - TupleFilter filterD = translateDerived(filter, groupsCopD); - - // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR .. - TupleFilter flatFilter = flattenToOrAndFilter(filterD); - - // translate filter into segment scan ranges - List scans = buildScanRanges(flatFilter, dimensionsD); - - // check involved measures, build value decoder for each each family:column - List valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context); - - //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more - //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory - setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial - setLimit(filter, context); - - HConnection conn = HBaseConnection.get(context.getConnUrl()); - return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo); - } - - @Override - public Range getVolatilePeriod() { - return null; - } - - @Override - public String getStorageUUID() { - return this.uuid; - } - - @Override - public boolean isDynamic() { - return false; - } - - private void buildDimensionsAndMetrics(Collection dimensions, Collection metrics, SQLDigest sqlDigest) { - - for (FunctionDesc func : sqlDigest.aggregations) { - if (!func.isDimensionAsMetric()) { - metrics.add(func); - } - } - - for (TblColRef column : sqlDigest.allColumns) { - // skip measure columns - if (sqlDigest.metricColumns.contains(column)) { - continue; - } - dimensions.add(column); - } - } - - private Cuboid identifyCuboid(Set dimensions) { - long cuboidID = 0; - for (TblColRef column : dimensions) { - int index = cubeDesc.getRowkey().getColumnBitIndex(column); - cuboidID |= 1L << index; - } - return Cuboid.findById(cubeDesc, cuboidID); - } - - private boolean isExactAggregation(Cuboid cuboid, Collection groups, Set othersD, Set singleValuesD, Set derivedPostAggregation) { - boolean exact = true; - - if (cuboid.requirePostAggregation()) { - exact = false; - logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId()); - } - - // derived aggregation is bad, unless expanded columns are already in group by - if (groups.containsAll(derivedPostAggregation) == false) { - exact = false; - logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation); - } - - // other columns (from filter) is bad, unless they are ensured to have single value - if (singleValuesD.containsAll(othersD) == false) { - exact = false; - logger.info("exactAggregation is false because some column not on group by: " + othersD // - + " (single value column: " + singleValuesD + ")"); - } - - if (exact) { - logger.info("exactAggregation is true"); - } - return exact; - } - - private Set expandDerived(Collection cols, Set derivedPostAggregation) { - Set expanded = Sets.newHashSet(); - for (TblColRef col : cols) { - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - for (TblColRef hostCol : hostInfo.columns) { - expanded.add(hostCol); - if (hostInfo.isOneToOne == false) - derivedPostAggregation.add(hostCol); - } - } else { - expanded.add(col); - } - } - return expanded; - } - - @SuppressWarnings("unchecked") - private Set findSingleValueColumns(TupleFilter filter) { - Collection toCheck; - if (filter instanceof CompareTupleFilter) { - toCheck = Collections.singleton(filter); - } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) { - toCheck = filter.getChildren(); - } else { - return (Set) Collections.EMPTY_SET; - } - - Set result = Sets.newHashSet(); - for (TupleFilter f : toCheck) { - if (f instanceof CompareTupleFilter) { - CompareTupleFilter compFilter = (CompareTupleFilter) f; - // is COL=const ? - if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) { - result.add(compFilter.getColumn()); - } - } - } - - // expand derived - Set resultD = Sets.newHashSet(); - for (TblColRef col : result) { - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - if (hostInfo.isOneToOne) { - for (TblColRef hostCol : hostInfo.columns) { - resultD.add(hostCol); - } - } - //if not one2one, it will be pruned - } else { - resultD.add(col); - } - } - return resultD; - } - - private void collectNonEvaluable(TupleFilter filter, Set collector) { - if (filter == null) - return; - - if (filter.isEvaluable()) { - for (TupleFilter child : filter.getChildren()) - collectNonEvaluable(child, collector); - } else { - collectColumnsRecursively(filter, collector); - } - } - - private void collectColumnsRecursively(TupleFilter filter, Set collector) { - if (filter instanceof ColumnTupleFilter) { - collectColumns(((ColumnTupleFilter) filter).getColumn(), collector); - } - for (TupleFilter child : filter.getChildren()) { - collectColumnsRecursively(child, collector); - } - } - - private void collectColumns(TblColRef col, Set collector) { - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - for (TblColRef h : hostInfo.columns) - collector.add(h); - } else { - collector.add(col); - } - } - - @SuppressWarnings("unchecked") - private TupleFilter translateDerived(TupleFilter filter, Set collector) { - if (filter == null) - return filter; - - if (filter instanceof CompareTupleFilter) { - return translateDerivedInCompare((CompareTupleFilter) filter, collector); - } - - List children = (List) filter.getChildren(); - List newChildren = Lists.newArrayListWithCapacity(children.size()); - boolean modified = false; - for (TupleFilter child : children) { - TupleFilter translated = translateDerived(child, collector); - newChildren.add(translated); - if (child != translated) - modified = true; - } - if (modified) { - filter = replaceChildren(filter, newChildren); - } - return filter; - } - - private TupleFilter replaceChildren(TupleFilter filter, List newChildren) { - if (filter instanceof LogicalTupleFilter) { - LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator()); - r.addChildren(newChildren); - return r; - } else - throw new IllegalStateException("Cannot replaceChildren on " + filter); - } - - private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set collector) { - if (compf.getColumn() == null || compf.getValues().isEmpty()) - return compf; - - TblColRef derived = compf.getColumn(); - if (cubeDesc.isDerived(derived) == false) - return compf; - - DeriveInfo hostInfo = cubeDesc.getHostInfo(derived); - CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig()); - CubeSegment seg = cubeInstance.getLatestReadySegment(); - LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension); - Pair translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf); - TupleFilter translatedFilter = translated.getFirst(); - boolean loosened = translated.getSecond(); - if (loosened) { - collectColumnsRecursively(translatedFilter, collector); - } - return translatedFilter; - } - - private List translateAggregation(HBaseMappingDesc hbaseMapping, Collection metrics, // - StorageContext context) { - Map codecMap = Maps.newHashMap(); - for (FunctionDesc aggrFunc : metrics) { - Collection hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc); - if (hbCols.isEmpty()) { - throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression()); - } - HBaseColumnDesc bestHBCol = null; - int bestIndex = -1; - for (HBaseColumnDesc hbCol : hbCols) { - bestHBCol = hbCol; - bestIndex = hbCol.findMeasureIndex(aggrFunc); - MeasureDesc measure = hbCol.getMeasures()[bestIndex]; - // criteria for holistic measure: Exact Aggregation && Exact Cuboid - if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) { - logger.info("Holistic count distinct chosen for " + aggrFunc); - break; - } - } - - RowValueDecoder codec = codecMap.get(bestHBCol); - if (codec == null) { - codec = new RowValueDecoder(bestHBCol); - codecMap.put(bestHBCol, codec); - } - codec.setIndex(bestIndex); - } - return new ArrayList(codecMap.values()); - } - - private TupleFilter flattenToOrAndFilter(TupleFilter filter) { - if (filter == null) - return null; - - TupleFilter flatFilter = filter.flatFilter(); - - // normalize to OR-AND filter - if (flatFilter.getOperator() == FilterOperatorEnum.AND) { - LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR); - f.addChild(flatFilter); - flatFilter = f; - } - - if (flatFilter.getOperator() != FilterOperatorEnum.OR) - throw new IllegalStateException(); - - return flatFilter; - } - - private List buildScanRanges(TupleFilter flatFilter, Collection dimensionColumns) { - - List result = Lists.newArrayList(); - - logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all"); - List segs = cubeInstance.getSegments(SegmentStatusEnum.READY); - logger.info("READY segs count: " + segs.size()); - - // build row key range for each cube segment - StringBuilder sb = new StringBuilder("hbasekeyrange trace: "); - for (CubeSegment cubeSeg : segs) { - - // consider derived (lookup snapshot), filter on dimension may - // differ per segment - List> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg); - if (orAndDimRanges == null) { // has conflict - continue; - } - - List scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size()); - for (Collection andDimRanges : orAndDimRanges) { - HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc); - scanRanges.add(rowKeyRange); - } - - sb.append(scanRanges.size() + "=>"); - List mergedRanges = mergeOverlapRanges(scanRanges); - sb.append(mergedRanges.size() + "=>"); - mergedRanges = mergeTooManyRanges(mergedRanges); - sb.append(mergedRanges.size() + ", "); - result.addAll(mergedRanges); - } - - logger.info(sb.toString()); - - logger.info("hbasekeyrange count: " + result.size()); - dropUnhitSegments(result); - logger.info("hbasekeyrange count after dropping unhit :" + result.size()); - - return result; - } - - private List> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) { - List> result = Lists.newArrayList(); - - if (flatFilter == null) { - result.add(Collections. emptyList()); - return result; - } - - for (TupleFilter andFilter : flatFilter.getChildren()) { - if (andFilter.getOperator() != FilterOperatorEnum.AND) { - throw new IllegalStateException("Filter should be AND instead of " + andFilter); - } - - Collection andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment); - - result.add(andRanges); - } - - return preprocessConstantConditions(result); - } - - private List> preprocessConstantConditions(List> orAndRanges) { - boolean globalAlwaysTrue = false; - Iterator> iterator = orAndRanges.iterator(); - while (iterator.hasNext()) { - Collection andRanges = iterator.next(); - Iterator iterator2 = andRanges.iterator(); - boolean hasAlwaysFalse = false; - while (iterator2.hasNext()) { - ColumnValueRange range = iterator2.next(); - if (range.satisfyAll()) - iterator2.remove(); - else if (range.satisfyNone()) - hasAlwaysFalse = true; - } - if (hasAlwaysFalse) { - iterator.remove(); - } else if (andRanges.isEmpty()) { - globalAlwaysTrue = true; - break; - } - } - if (globalAlwaysTrue) { - orAndRanges.clear(); - orAndRanges.add(Collections. emptyList()); - } - return orAndRanges; - } - - // return empty collection to mean true; return null to mean false - @SuppressWarnings("unchecked") - private Collection translateToAndDimRanges(List andFilters, CubeSegment cubeSegment) { - Map rangeMap = new HashMap(); - for (TupleFilter filter : andFilters) { - if ((filter instanceof CompareTupleFilter) == false) { - continue; - } - - CompareTupleFilter comp = (CompareTupleFilter) filter; - if (comp.getColumn() == null) { - continue; - } - - ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection) comp.getValues(), comp.getOperator()); - andMerge(range, rangeMap); - } - - // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary - Iterator it = rangeMap.values().iterator(); - while (it.hasNext()) { - ColumnValueRange range = it.next(); - range.preEvaluateWithDict((Dictionary) cubeSegment.getDictionary(range.getColumn())); - if (range.satisfyAll()) - it.remove(); - else if (range.satisfyNone()) - return null; - } - - return rangeMap.values(); - } - - private void andMerge(ColumnValueRange range, Map rangeMap) { - ColumnValueRange columnRange = rangeMap.get(range.getColumn()); - if (columnRange == null) { - rangeMap.put(range.getColumn(), range); - } else { - columnRange.andMerge(range); - } - } - - private List mergeOverlapRanges(List keyRanges) { - if (keyRanges.size() <= 1) { - return keyRanges; - } - - if (logger.isDebugEnabled()) { - logger.debug("Merging key range from " + keyRanges.size()); - } - - // sort ranges by start key - Collections.sort(keyRanges); - - // merge the overlap range - List mergedRanges = new LinkedList(); - int beginIndex = 0; - byte[] maxStopKey = keyRanges.get(0).getStopKey(); - for (int index = 0; index < keyRanges.size(); index++) { - HBaseKeyRange keyRange = keyRanges.get(index); - if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) { - // merge the current key ranges - HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1); - mergedRanges.add(mergedRange); - // start new merge - beginIndex = index; - } - if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) { - // update the stop key - maxStopKey = keyRange.getStopKey(); - } - } - // merge last range - HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1); - mergedRanges.add(mergedRange); - if (logger.isDebugEnabled()) { - logger.debug("Merging key range to " + mergedRanges.size()); - } - return mergedRanges; - } - - private HBaseKeyRange mergeKeyRange(List keyRanges, int from, int to) { - HBaseKeyRange keyRange = keyRanges.get(from); - int mergeSize = to - from + 1; - if (mergeSize > 1) { - // merge range from mergeHeader to i - 1 - CubeSegment cubeSegment = keyRange.getCubeSegment(); - Cuboid cuboid = keyRange.getCuboid(); - byte[] startKey = keyRange.getStartKey(); - byte[] stopKey = keyRange.getStopKey(); - long partitionColumnStartDate = Long.MAX_VALUE; - long partitionColumnEndDate = 0; - List> newFuzzyKeys = new ArrayList>(mergeSize); - List> newFlatOrAndFilter = Lists.newLinkedList(); - - boolean hasNonFuzzyRange = false; - for (int k = from; k <= to; k++) { - HBaseKeyRange nextRange = keyRanges.get(k); - hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty(); - newFuzzyKeys.addAll(nextRange.getFuzzyKeys()); - newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter()); - if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) { - stopKey = nextRange.getStopKey(); - } - if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) { - partitionColumnStartDate = nextRange.getPartitionColumnStartDate(); - } - if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) { - partitionColumnEndDate = nextRange.getPartitionColumnEndDate(); - } - } - - // if any range is non-fuzzy, then all fuzzy keys must be cleared - if (hasNonFuzzyRange) { - newFuzzyKeys.clear(); - } - - partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate; - partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate; - keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, newFuzzyKeys, newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate); - } - return keyRange; - } - - private List mergeTooManyRanges(List keyRanges) { - if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) { - return keyRanges; - } - // TODO: check the distance between range. and merge the large distance range - List mergedRanges = new LinkedList(); - HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1); - mergedRanges.add(mergedRange); - return mergedRanges; - } - - private void dropUnhitSegments(List scans) { - if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) { - Iterator iterator = scans.iterator(); - while (iterator.hasNext()) { - HBaseKeyRange scan = iterator.next(); - if (scan.hitSegment() == false) { - iterator.remove(); - } - } - } - } - - private void setThreshold(Collection dimensions, List valueDecoders, StorageContext context) { - if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) { - return; - } - - int rowSizeEst = dimensions.size() * 3; - for (RowValueDecoder decoder : valueDecoders) { - MeasureDesc[] measures = decoder.getMeasures(); - BitSet projectionIndex = decoder.getProjectionIndex(); - for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { - FunctionDesc func = measures[i].getFunction(); - rowSizeEst += func.getReturnDataType().getSpaceEstimate(); - } - } - - long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst; - context.setThreshold((int) rowEst); - } - - private void setLimit(TupleFilter filter, StorageContext context) { - boolean goodAggr = context.isExactAggregation(); - boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled()); - boolean goodSort = context.hasSort() == false; - if (goodAggr && goodFilter && goodSort) { - logger.info("Enable limit " + context.getLimit()); - context.enableLimit(); - } - } - - private void setCoprocessor(Set groupsCopD, List valueDecoders, StorageContext context) { - ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java deleted file mode 100644 index 64d1951..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java +++ /dev/null @@ -1,208 +0,0 @@ -package org.apache.kylin.storage.hbase; - -import java.io.IOException; -import java.util.BitSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.RowKeyDecoder; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; -import org.apache.kylin.storage.tuple.Tuple; -import org.apache.kylin.storage.tuple.TupleInfo; - -import com.google.common.collect.Lists; - -public class CubeTupleConverter { - - final CubeSegment cubeSeg; - final Cuboid cuboid; - final TupleInfo tupleInfo; - final RowKeyDecoder rowKeyDecoder; - final List rowValueDecoders; - final List derivedColFillers; - - final int[] dimensionTupleIdx; - final int[][] metricsMeasureIdx; - final int[][] metricsTupleIdx; - - public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List rowValueDecoders, TupleInfo tupleInfo) { - this.cubeSeg = cubeSeg; - this.cuboid = cuboid; - this.tupleInfo = tupleInfo; - this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg); - this.rowValueDecoders = rowValueDecoders; - this.derivedColFillers = Lists.newArrayList(); - - List dimCols = cuboid.getColumns(); - - // pre-calculate dimension index mapping to tuple - dimensionTupleIdx = new int[dimCols.size()]; - for (int i = 0; i < dimCols.size(); i++) { - TblColRef col = dimCols.get(i); - dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; - } - - // pre-calculate metrics index mapping to tuple - metricsMeasureIdx = new int[rowValueDecoders.size()][]; - metricsTupleIdx = new int[rowValueDecoders.size()][]; - for (int i = 0; i < rowValueDecoders.size(); i++) { - RowValueDecoder decoder = rowValueDecoders.get(i); - MeasureDesc[] measures = decoder.getMeasures(); - BitSet selectedMeasures = decoder.getProjectionIndex(); - metricsMeasureIdx[i] = new int[selectedMeasures.cardinality()]; - metricsTupleIdx[i] = new int[selectedMeasures.cardinality()]; - for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) { - FunctionDesc aggrFunc = measures[mi].getFunction(); - int tupleIdx; - // a rewrite metrics is identified by its rewrite field name - if (aggrFunc.needRewrite()) { - String rewriteFieldName = aggrFunc.getRewriteFieldName(); - tupleIdx = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; - } - // a non-rewrite metrics (like sum, or dimension playing as metrics) is like a dimension column - else { - TblColRef col = aggrFunc.getParameter().getColRefs().get(0); - tupleIdx = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; - } - metricsMeasureIdx[i][j] = mi; - metricsTupleIdx[i][j] = tupleIdx; - } - } - - // prepare derived columns and filler - Map, List> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(dimCols, null); - for (Entry, List> entry : hostToDerivedInfo.entrySet()) { - TblColRef[] hostCols = entry.getKey().data; - for (DeriveInfo deriveInfo : entry.getValue()) { - IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo); - if (filler != null) { - derivedColFillers.add(filler); - } - } - } - } - - public void translateResult(Result hbaseRow, Tuple tuple) { - try { - byte[] rowkey = hbaseRow.getRow(); - rowKeyDecoder.decode(rowkey); - } catch (IOException ex) { - throw new RuntimeException("Cannot translate hbase result " + hbaseRow); - } - - // dimensions - List dimensionValues = rowKeyDecoder.getValues(); - for (int i = 0; i < dimensionValues.size(); i++) { - int tupleIdx = dimensionTupleIdx[i]; - if (tupleIdx >= 0) { - tuple.setDimensionValue(tupleIdx, dimensionValues.get(i)); - } - } - - // derived - for (IDerivedColumnFiller filler : derivedColFillers) { - filler.fillDerivedColumns(dimensionValues, tuple); - } - - // measures - for (int i = 0; i < rowValueDecoders.size(); i++) { - RowValueDecoder rowValueDecoder = rowValueDecoders.get(i); - rowValueDecoder.decode(hbaseRow); - Object[] measureValues = rowValueDecoder.getValues(); - - int[] measureIdx = metricsMeasureIdx[i]; - int[] tupleIdx = metricsTupleIdx[i]; - for (int j = 0; j < measureIdx.length; j++) { - tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]); - } - } - } - - private interface IDerivedColumnFiller { - public void fillDerivedColumns(List rowValues, Tuple tuple); - } - - private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) { - List rowColumns = cuboid.getColumns(); - - final int[] hostColIdx = new int[hostCols.length]; - for (int i = 0; i < hostCols.length; i++) { - hostColIdx[i] = rowColumns.indexOf(hostCols[i]); - } - - boolean needCopyDerived = false; - final int[] derivedTupleIdx = new int[deriveInfo.columns.length]; - for (int i = 0; i < deriveInfo.columns.length; i++) { - TblColRef col = deriveInfo.columns[i]; - derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; - needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0; - } - - if (needCopyDerived == false) - return null; - - switch (deriveInfo.type) { - case LOOKUP: - return new IDerivedColumnFiller() { - CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig()); - LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension); - int[] derivedColIdx = initDerivedColIdx(); - Array lookupKey = new Array(new String[hostColIdx.length]); - - private int[] initDerivedColIdx() { - int[] idx = new int[deriveInfo.columns.length]; - for (int i = 0; i < idx.length; i++) { - idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex(); - } - return idx; - } - - @Override - public void fillDerivedColumns(List rowValues, Tuple tuple) { - for (int i = 0; i < hostColIdx.length; i++) { - lookupKey.data[i] = rowValues.get(hostColIdx[i]); - } - - String[] lookupRow = lookupTable.getRow(lookupKey); - - if (lookupRow != null) { - for (int i = 0; i < derivedTupleIdx.length; i++) { - if (derivedTupleIdx[i] >= 0) { - String value = lookupRow[derivedColIdx[i]]; - tuple.setDimensionValue(derivedTupleIdx[i], value); - } - } - } else { - for (int i = 0; i < derivedTupleIdx.length; i++) { - if (derivedTupleIdx[i] >= 0) { - tuple.setDimensionValue(derivedTupleIdx[i], null); - } - } - } - } - }; - case PK_FK: - return new IDerivedColumnFiller() { - @Override - public void fillDerivedColumns(List rowValues, Tuple tuple) { - // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns() - tuple.setDimensionValue(derivedTupleIdx[0], rowValues.get(hostColIdx[0])); - } - }; - default: - throw new IllegalArgumentException(); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/DerivedFilterTranslator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/DerivedFilterTranslator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/DerivedFilterTranslator.java deleted file mode 100644 index 98103bd..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/DerivedFilterTranslator.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.util.List; -import java.util.Set; - -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.kv.RowKeyColumnOrder; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.cube.model.CubeDesc.DeriveType; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.ConstantTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.StringCodeSystem; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.IEvaluatableTuple; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * @author yangli9 - * - */ -public class DerivedFilterTranslator { - - private static final int IN_THRESHOLD = 5; - - public static Pair translate(LookupStringTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) { - - TblColRef derivedCol = compf.getColumn(); - TblColRef[] hostCols = hostInfo.columns; - TblColRef[] pkCols = hostInfo.dimension.getJoin().getPrimaryKeyColumns(); - - if (hostInfo.type == DeriveType.PK_FK) { - assert hostCols.length == 1; - CompareTupleFilter newComp = new CompareTupleFilter(compf.getOperator()); - newComp.addChild(new ColumnTupleFilter(hostCols[0])); - newComp.addChild(new ConstantTupleFilter(compf.getValues())); - return new Pair(newComp, false); - } - - assert hostInfo.type == DeriveType.LOOKUP; - assert hostCols.length == pkCols.length; - - int di = derivedCol.getColumnDesc().getZeroBasedIndex(); - int[] pi = new int[pkCols.length]; - int hn = hostCols.length; - for (int i = 0; i < hn; i++) { - pi[i] = pkCols[i].getColumnDesc().getZeroBasedIndex(); - } - - Set> satisfyingHostRecords = Sets.newHashSet(); - SingleColumnTuple tuple = new SingleColumnTuple(derivedCol); - for (String[] row : lookup.getAllRows()) { - tuple.value = row[di]; - if (compf.evaluate(tuple, StringCodeSystem.INSTANCE)) { - collect(row, pi, satisfyingHostRecords); - } - } - - TupleFilter translated; - boolean loosened; - if (satisfyingHostRecords.size() > IN_THRESHOLD) { - translated = buildRangeFilter(hostCols, satisfyingHostRecords); - loosened = true; - } else { - translated = buildInFilter(hostCols, satisfyingHostRecords); - loosened = false; - } - - return new Pair(translated, loosened); - } - - private static void collect(String[] row, int[] pi, Set> satisfyingHostRecords) { - // TODO when go beyond IN_THRESHOLD, only keep min/max is enough - String[] rec = new String[pi.length]; - for (int i = 0; i < pi.length; i++) { - rec[i] = row[pi[i]]; - } - satisfyingHostRecords.add(new Array(rec)); - } - - private static TupleFilter buildInFilter(TblColRef[] hostCols, Set> satisfyingHostRecords) { - if (satisfyingHostRecords.size() == 0) { - return ConstantTupleFilter.FALSE; - } - - int hn = hostCols.length; - if (hn == 1) { - CompareTupleFilter in = new CompareTupleFilter(FilterOperatorEnum.IN); - in.addChild(new ColumnTupleFilter(hostCols[0])); - in.addChild(new ConstantTupleFilter(asValues(satisfyingHostRecords))); - return in; - } else { - LogicalTupleFilter or = new LogicalTupleFilter(FilterOperatorEnum.OR); - for (Array rec : satisfyingHostRecords) { - LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND); - for (int i = 0; i < hn; i++) { - CompareTupleFilter eq = new CompareTupleFilter(FilterOperatorEnum.EQ); - eq.addChild(new ColumnTupleFilter(hostCols[i])); - eq.addChild(new ConstantTupleFilter(rec.data[i])); - and.addChild(eq); - } - or.addChild(and); - } - return or; - } - } - - private static List asValues(Set> satisfyingHostRecords) { - List values = Lists.newArrayListWithCapacity(satisfyingHostRecords.size()); - for (Array rec : satisfyingHostRecords) { - values.add(rec.data[0]); - } - return values; - } - - private static LogicalTupleFilter buildRangeFilter(TblColRef[] hostCols, Set> satisfyingHostRecords) { - int hn = hostCols.length; - String[] min = new String[hn]; - String[] max = new String[hn]; - findMinMax(satisfyingHostRecords, hostCols, min, max); - LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND); - for (int i = 0; i < hn; i++) { - CompareTupleFilter compMin = new CompareTupleFilter(FilterOperatorEnum.GTE); - compMin.addChild(new ColumnTupleFilter(hostCols[i])); - compMin.addChild(new ConstantTupleFilter(min[i])); - and.addChild(compMin); - CompareTupleFilter compMax = new CompareTupleFilter(FilterOperatorEnum.LTE); - compMax.addChild(new ColumnTupleFilter(hostCols[i])); - compMax.addChild(new ConstantTupleFilter(max[i])); - and.addChild(compMax); - } - return and; - } - - private static void findMinMax(Set> satisfyingHostRecords, TblColRef[] hostCols, String[] min, String[] max) { - - RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length]; - for (int i = 0; i < hostCols.length; i++) { - orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType()); - } - - for (Array rec : satisfyingHostRecords) { - String[] row = rec.data; - for (int i = 0; i < row.length; i++) { - min[i] = orders[i].min(min[i], row[i]); - max[i] = orders[i].max(max[i], row[i]); - } - } - } - - private static class SingleColumnTuple implements IEvaluatableTuple { - - private TblColRef col; - private String value; - - SingleColumnTuple(TblColRef col) { - this.col = col; - } - - @Override - public Object getValue(TblColRef col) { - if (this.col.equals(col)) - return value; - else - throw new IllegalArgumentException("unexpected column " + col); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/FuzzyValueCombination.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/FuzzyValueCombination.java b/storage/src/main/java/org/apache/kylin/storage/hbase/FuzzyValueCombination.java deleted file mode 100644 index 3081e1f..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/FuzzyValueCombination.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * @author yangli9 - * - */ -public class FuzzyValueCombination { - - private static class Dim { - TblColRef col; - Set values; - } - - private static final Set SINGLE_NULL_SET = Sets.newHashSet(); - static { - SINGLE_NULL_SET.add(null); - } - - public static List> calculate(Map> fuzzyValues, long cap) { - Dim[] dims = toDims(fuzzyValues); - capDims(dims, cap); - return combination(dims); - } - - @SuppressWarnings("unchecked") - private static List> combination(Dim[] dims) { - - List> result = Lists.newArrayList(); - - int emptyDims = 0; - for (Dim dim : dims) { - if (dim.values.isEmpty()) { - dim.values = SINGLE_NULL_SET; - emptyDims++; - } - } - if (emptyDims == dims.length) { - return result; - } - - Map r = Maps.newHashMap(); - Iterator[] iters = new Iterator[dims.length]; - int level = 0; - while (true) { - Dim dim = dims[level]; - if (iters[level] == null) { - iters[level] = dim.values.iterator(); - } - - Iterator it = iters[level]; - if (it.hasNext() == false) { - if (level == 0) - break; - r.remove(dim.col); - iters[level] = null; - level--; - continue; - } - - r.put(dim.col, it.next()); - if (level == dims.length - 1) { - result.add(new HashMap(r)); - } else { - level++; - } - } - return result; - } - - private static Dim[] toDims(Map> fuzzyValues) { - Dim[] dims = new Dim[fuzzyValues.size()]; - int i = 0; - for (Entry> entry : fuzzyValues.entrySet()) { - dims[i] = new Dim(); - dims[i].col = entry.getKey(); - dims[i].values = entry.getValue(); - if (dims[i].values == null) - dims[i].values = Collections.emptySet(); - i++; - } - return dims; - } - - private static void capDims(Dim[] dims, long cap) { - Arrays.sort(dims, new Comparator() { - @Override - public int compare(Dim o1, Dim o2) { - return -(o1.values.size() - o2.values.size()); - } - }); - - for (Dim dim : dims) { - if (combCount(dims) < cap) - break; - dim.values = Collections.emptySet(); - } - } - - private static long combCount(Dim[] dims) { - long count = 1; - for (Dim dim : dims) { - count *= Math.max(dim.values.size(), 1); - } - return count; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java deleted file mode 100644 index 6af8ec0..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIRow; - -/** - * @author yangli9 - * - */ -public class HBaseClientKVIterator implements Iterable, Closeable { - - byte[] family; - - HTableInterface table; - ResultScanner scanner; - Iterator iterator; - - public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family) throws IOException { - this.family = family; - - this.table = hconn.getTable(tableName); - this.scanner = table.getScanner(family); - this.iterator = scanner.iterator(); - } - - @Override - public void close() { - IOUtils.closeQuietly(scanner); - IOUtils.closeQuietly(table); - } - - @Override - public Iterator iterator() { - return new MyIterator(); - } - - private class MyIterator implements Iterator { - - ImmutableBytesWritable key = new ImmutableBytesWritable(); - ImmutableBytesWritable value = new ImmutableBytesWritable(); - ImmutableBytesWritable dict = new ImmutableBytesWritable(); - IIRow pair = new IIRow(key, value, dict); - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public IIRow next() { - Result r = iterator.next(); - Cell c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES); - key.set(c.getRowArray(), c.getRowOffset(), c.getRowLength()); - value.set(c.getValueArray(), c.getValueOffset(), c.getValueLength()); - c = r.getColumnLatestCell(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES); - dict.set(c.getValueArray(), c.getValueOffset(), c.getValueLength()); - return pair; - } - - public void remove() { - throw new UnsupportedOperationException(); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java deleted file mode 100644 index efd733c..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseKeyRange.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; -import org.apache.kylin.cube.kv.FuzzyKeyEncoder; -import org.apache.kylin.cube.kv.FuzzyMaskEncoder; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * - * @author xjiang - * - */ -public class HBaseKeyRange implements Comparable { - - private static final Logger logger = LoggerFactory.getLogger(HBaseKeyRange.class); - - private static final int FUZZY_VALUE_CAP = 20; - private static final byte[] ZERO_TAIL_BYTES = new byte[] { 0 }; - - private final CubeSegment cubeSeg; - private final Cuboid cuboid; - private final List> flatOrAndFilter; // OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR .. - - private byte[] startKey; - private byte[] stopKey; - private List> fuzzyKeys; - - private String startKeyString; - private String stopKeyString; - private String fuzzyKeyString; - - private long partitionColumnStartDate = Long.MIN_VALUE; - private long partitionColumnEndDate = Long.MAX_VALUE; - - public HBaseKeyRange(CubeSegment cubeSeg, Cuboid cuboid, byte[] startKey, byte[] stopKey, List> fuzzyKeys, List> flatColumnValueFilter, long partitionColumnStartDate, long partitionColumnEndDate) { - this.cubeSeg = cubeSeg; - this.cuboid = cuboid; - this.startKey = startKey; - this.stopKey = stopKey; - this.fuzzyKeys = fuzzyKeys; - this.flatOrAndFilter = flatColumnValueFilter; - this.partitionColumnStartDate = partitionColumnStartDate; - this.partitionColumnEndDate = partitionColumnEndDate; - initDebugString(); - } - - public HBaseKeyRange(Collection dimensionColumns, Collection andDimensionRanges, CubeSegment cubeSeg, CubeDesc cubeDesc) { - this.cubeSeg = cubeSeg; - long cuboidId = this.calculateCuboidID(cubeDesc, dimensionColumns); - this.cuboid = Cuboid.findById(cubeDesc, cuboidId); - this.flatOrAndFilter = Lists.newLinkedList(); - this.flatOrAndFilter.add(andDimensionRanges); - init(andDimensionRanges); - initDebugString(); - } - - private long calculateCuboidID(CubeDesc cube, Collection dimensions) { - long cuboidID = 0; - for (TblColRef column : dimensions) { - int index = cube.getRowkey().getColumnBitIndex(column); - cuboidID |= 1L << index; - } - return cuboidID; - } - - private void init(Collection andDimensionRanges) { - int size = andDimensionRanges.size(); - Map startValues = Maps.newHashMapWithExpectedSize(size); - Map stopValues = Maps.newHashMapWithExpectedSize(size); - Map> fuzzyValues = Maps.newHashMapWithExpectedSize(size); - for (ColumnValueRange dimRange : andDimensionRanges) { - TblColRef column = dimRange.getColumn(); - startValues.put(column, dimRange.getBeginValue()); - stopValues.put(column, dimRange.getEndValue()); - fuzzyValues.put(column, dimRange.getEqualValues()); - - TblColRef partitionDateColumnRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); - if (column.equals(partitionDateColumnRef)) { - initPartitionRange(dimRange); - } - } - - AbstractRowKeyEncoder encoder = AbstractRowKeyEncoder.createInstance(cubeSeg, cuboid); - - encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE); - - this.startKey = encoder.encode(startValues); - - encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE); - - // In order to make stopRow inclusive add a trailing 0 byte. #See - // Scan.setStopRow(byte [] stopRow) - this.stopKey = Bytes.add(encoder.encode(stopValues), ZERO_TAIL_BYTES); - - // restore encoder defaults for later reuse (note - // AbstractRowKeyEncoder.createInstance() caches instances) - encoder.setBlankByte(AbstractRowKeyEncoder.DEFAULT_BLANK_BYTE); - - // always fuzzy match cuboid ID to lock on the selected cuboid - this.fuzzyKeys = buildFuzzyKeys(fuzzyValues); - } - - private void initPartitionRange(ColumnValueRange dimRange) { - if (null != dimRange.getBeginValue()) { - this.partitionColumnStartDate = DateFormat.stringToMillis(dimRange.getBeginValue()); - } - if (null != dimRange.getEndValue()) { - this.partitionColumnEndDate = DateFormat.stringToMillis(dimRange.getEndValue()); - } - } - - private void initDebugString() { - this.startKeyString = BytesUtil.toHex(this.startKey); - this.stopKeyString = BytesUtil.toHex(this.stopKey); - StringBuilder buf = new StringBuilder(); - for (Pair fuzzyKey : this.fuzzyKeys) { - buf.append(BytesUtil.toHex(fuzzyKey.getFirst())); - buf.append(" "); - buf.append(BytesUtil.toHex(fuzzyKey.getSecond())); - buf.append(System.lineSeparator()); - } - this.fuzzyKeyString = buf.toString(); - } - - private List> buildFuzzyKeys(Map> fuzzyValueSet) { - ArrayList> result = new ArrayList>(); - - // debug/profiling purpose - if (BackdoorToggles.getDisableFuzzyKey()) { - logger.info("The execution of this query will not use fuzzy key"); - return result; - } - - FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid); - FuzzyMaskEncoder fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid); - - List> fuzzyValues = FuzzyValueCombination.calculate(fuzzyValueSet, FUZZY_VALUE_CAP); - for (Map fuzzyValue : fuzzyValues) { - result.add(new Pair(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue))); - } - return result; - } - - public CubeSegment getCubeSegment() { - return this.cubeSeg; - } - - public Cuboid getCuboid() { - return cuboid; - } - - public byte[] getStartKey() { - return startKey; - } - - public byte[] getStopKey() { - return stopKey; - } - - public List> getFuzzyKeys() { - return fuzzyKeys; - } - - public String getStartKeyAsString() { - return startKeyString; - } - - public String getStopKeyAsString() { - return stopKeyString; - } - - public String getFuzzyKeyAsString() { - return fuzzyKeyString; - } - - public List> getFlatOrAndFilter() { - return flatOrAndFilter; - } - - public long getPartitionColumnStartDate() { - return partitionColumnStartDate; - } - - public long getPartitionColumnEndDate() { - return partitionColumnEndDate; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((cubeSeg == null) ? 0 : cubeSeg.hashCode()); - result = prime * result + ((cuboid == null) ? 0 : cuboid.hashCode()); - result = prime * result + ((fuzzyKeyString == null) ? 0 : fuzzyKeyString.hashCode()); - result = prime * result + ((startKeyString == null) ? 0 : startKeyString.hashCode()); - result = prime * result + ((stopKeyString == null) ? 0 : stopKeyString.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HBaseKeyRange other = (HBaseKeyRange) obj; - if (cubeSeg == null) { - if (other.cubeSeg != null) - return false; - } else if (!cubeSeg.equals(other.cubeSeg)) - return false; - if (cuboid == null) { - if (other.cuboid != null) - return false; - } else if (!cuboid.equals(other.cuboid)) - return false; - if (fuzzyKeyString == null) { - if (other.fuzzyKeyString != null) - return false; - } else if (!fuzzyKeyString.equals(other.fuzzyKeyString)) - return false; - if (startKeyString == null) { - if (other.startKeyString != null) - return false; - } else if (!startKeyString.equals(other.startKeyString)) - return false; - if (stopKeyString == null) { - if (other.stopKeyString != null) - return false; - } else if (!stopKeyString.equals(other.stopKeyString)) - return false; - return true; - } - - @Override - public int compareTo(HBaseKeyRange other) { - return Bytes.compareTo(this.startKey, other.startKey); - } - - public boolean hitSegment() { - return cubeSeg.getDateRangeStart() <= getPartitionColumnEndDate() && cubeSeg.getDateRangeEnd() >= getPartitionColumnStartDate(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java deleted file mode 100644 index 9aa2baa..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageQuery.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.util.ArrayList; - -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.storage.ICachableStorageQuery; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointTupleIterator; -import org.apache.kylin.storage.tuple.TupleInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Range; - -/** - * @author yangli9 - */ -public class InvertedIndexStorageQuery implements ICachableStorageQuery { - - private static Logger logger = LoggerFactory.getLogger(InvertedIndexStorageQuery.class); - - private IISegment seg; - private String uuid; - private EndpointTupleIterator dataIterator; - - public InvertedIndexStorageQuery(IIInstance ii) { - this.seg = ii.getFirstSegment(); - this.uuid = ii.getUuid(); - } - - @Override - public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { - String tableName = seg.getStorageLocationIdentifier(); - - //HConnection is cached, so need not be closed - HConnection conn = HBaseConnection.get(context.getConnUrl()); - try { - dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo); - return dataIterator; - } catch (Throwable e) { - logger.error("Error when connecting to II htable " + tableName, e); - throw new IllegalStateException("Error when connecting to II htable " + tableName, e); - } - } - - @Override - public Range getVolatilePeriod() { - return dataIterator.getCacheExcludedPeriod(); - } - - @Override - public String getStorageUUID() { - return this.uuid; - } - - @Override - public boolean isDynamic() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java deleted file mode 100644 index b69d5b6..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.token.TokenUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.kylin.common.util.Bytes; - -/** - * @author yangli9 - * - */ -public class PingHBaseCLI { - - public static void main(String[] args) throws IOException { - String metadataUrl = args[0]; - String hbaseTable = args[1]; - - System.out.println("Hello friend."); - - Configuration hconf = HBaseConnection.newHBaseConfiguration(metadataUrl); - if (User.isHBaseSecurityEnabled(hconf)) { - try { - System.out.println("--------------Getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); - TokenUtil.obtainAndCacheToken(hconf, UserGroupInformation.getCurrentUser()); - } catch (InterruptedException e) { - System.out.println("--------------Error while getting kerberos credential for user " + UserGroupInformation.getCurrentUser().getUserName()); - } - } - - Scan scan = new Scan(); - int limit = 20; - - HConnection conn = null; - HTableInterface table = null; - ResultScanner scanner = null; - try { - conn = HConnectionManager.createConnection(hconf); - table = conn.getTable(hbaseTable); - scanner = table.getScanner(scan); - int count = 0; - for (Result r : scanner) { - byte[] rowkey = r.getRow(); - System.out.println(Bytes.toStringBinary(rowkey)); - count++; - if (count == limit) - break; - } - } finally { - if (scanner != null) { - scanner.close(); - } - if (table != null) { - table.close(); - } - if (conn != null) { - conn.close(); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java deleted file mode 100644 index 654dc34..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/RegionScannerAdapter.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; - -/** - * @author yangli9 - * - */ -public class RegionScannerAdapter implements RegionScanner { - - private ResultScanner scanner; - - public RegionScannerAdapter(ResultScanner scanner) { - this.scanner = scanner; - } - - @Override - public boolean next(List results) throws IOException { - Result result = scanner.next(); - if (result == null) // EOF - return false; - - results.addAll(result.listCells()); - return true; - } - - @Override - public boolean next(List result, int limit) throws IOException { - return next(result); - } - - @Override - public boolean nextRaw(List result) throws IOException { - return next(result); - } - - @Override - public boolean nextRaw(List result, int limit) throws IOException { - return next(result); - } - - @Override - public void close() throws IOException { - scanner.close(); - } - - @Override - public HRegionInfo getRegionInfo() { - return null; - } - - @Override - public long getMaxResultSize() { - return Long.MAX_VALUE; - } - - @Override - public boolean isFilterDone() throws IOException { - return false; - } - - @Override - public boolean reseek(byte[] row) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public long getMvccReadPoint() { - return Long.MAX_VALUE; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/ResultScannerAdapter.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/ResultScannerAdapter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/ResultScannerAdapter.java deleted file mode 100644 index a0e2104..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/ResultScannerAdapter.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; - -import com.google.common.collect.Lists; - -/** - * @author yangli9 - * - */ -public class ResultScannerAdapter implements ResultScanner { - - private RegionScanner scanner; - - public ResultScannerAdapter(RegionScanner scanner) { - this.scanner = scanner; - } - - @Override - public Iterator iterator() { - return new Iterator() { - - Result next = null; - - @Override - public boolean hasNext() { - if (next == null) { - try { - next = ResultScannerAdapter.this.next(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return next != null; - } - - @Override - public Result next() { - Result r = next; - next = null; - return r; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public Result next() throws IOException { - List cells = Lists.newArrayList(); - scanner.next(cells); - if (cells.isEmpty()) - return null; - else - return Result.create(cells); - } - - @Override - public Result[] next(int nbRows) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - try { - scanner.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage/src/main/java/org/apache/kylin/storage/hbase/ScanOutOfLimitException.java ---------------------------------------------------------------------- diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/ScanOutOfLimitException.java b/storage/src/main/java/org/apache/kylin/storage/hbase/ScanOutOfLimitException.java deleted file mode 100644 index e76b00d..0000000 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/ScanOutOfLimitException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase; - -/** - * @author ysong1 - * - */ -public class ScanOutOfLimitException extends RuntimeException { - private static final long serialVersionUID = 2045169570038227895L; - - public ScanOutOfLimitException(String message) { - super(message); - } -}