From: daijy@apache.org
Date: Tue, 22 Sep 2015 05:03:54 -0000
Subject: [23/50] [abbrv] hive git commit: HIVE-10289: Support filter on non-first partition key and non-string partition key (Daniel Dai reviewed by Alan Gates)

http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java index ec99685..9762309 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java @@ -20,15 +20,30 @@ package org.apache.hadoop.hive.metastore.hbase; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator; import org.apache.hadoop.hive.metastore.parser.ExpressionTree; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -147,7 +162,7 @@ class HBaseFilterPlanUtil { public static class ScanPlan extends FilterPlan { public static class ScanMarker { - final byte[] bytes; + final String value; /** * If inclusive = true, it means that the * marker includes those bytes. @@ -155,20 +170,24 @@ class HBaseFilterPlanUtil { * or ends at the next possible byte array */ final boolean isInclusive; - ScanMarker(byte [] b, boolean i){ - this.bytes = b; + final String type; + ScanMarker(String obj, boolean i, String type){ + this.value = obj; this.isInclusive = i; + this.type = type; } @Override public String toString() { - return "ScanMarker [bytes=" + Arrays.toString(bytes) + ", isInclusive=" + isInclusive + "]"; + return "ScanMarker [" + "value=" + value.toString() + ", isInclusive=" + isInclusive + + ", type=" + type + "]"; } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + Arrays.hashCode(bytes); + result = prime * result + value.hashCode(); result = prime * result + (isInclusive ? 1231 : 1237); + result = prime * result + type.hashCode(); return result; } @Override @@ -180,48 +199,118 @@ class HBaseFilterPlanUtil { if (getClass() != obj.getClass()) return false; ScanMarker other = (ScanMarker) obj; - if (!Arrays.equals(bytes, other.bytes)) + if (!value.equals(other.value)) return false; if (isInclusive != other.isInclusive) return false; + if (type != other.type) + return false; return true; } } - // represent Scan start - private ScanMarker startMarker = new ScanMarker(null, false); - // represent Scan end - private ScanMarker endMarker = new ScanMarker(null, false); - - private ScanFilter filter; - - public ScanFilter getFilter() { - return filter; + public static class ScanMarkerPair { + public ScanMarkerPair(ScanMarker startMarker, ScanMarker endMarker) { + this.startMarker = startMarker; + this.endMarker = endMarker; + } + ScanMarker startMarker; + ScanMarker endMarker; + } + // represent Scan start, partition key name -> scanMarkerPair + Map markers = new HashMap(); + List ops = new ArrayList(); + + // Get the number of partition key prefixes which can be used in the scan range. + // For example, if partition key is (year, month, state) + // 1. year = 2015 and month >= 1 and month < 5 + // year + month can be used in scan range, majorParts = 2 + // 2. year = 2015 and state = 'CA' + // only year can be used in scan range, majorParts = 1 + // 3. 
month = 10 and state = 'CA' + // nothing can be used in scan range, majorParts = 0 + private int getMajorPartsCount(List parts) { + int majorPartsCount = 0; + while (majorPartsCount parts) { + int majorPartsCount = getMajorPartsCount(parts); + Set majorKeys = new HashSet(); + for (int i=0;i names = HBaseUtils.getPartitionNames(parts); + List ranges = new ArrayList(); + for (Map.Entry entry : markers.entrySet()) { + if (names.contains(entry.getKey()) && !majorKeys.contains(entry.getKey())) { + PartitionKeyComparator.Mark startMark = null; + if (entry.getValue().startMarker != null) { + startMark = new PartitionKeyComparator.Mark(entry.getValue().startMarker.value, + entry.getValue().startMarker.isInclusive); + } + PartitionKeyComparator.Mark endMark = null; + if (entry.getValue().endMarker != null) { + startMark = new PartitionKeyComparator.Mark(entry.getValue().endMarker.value, + entry.getValue().endMarker.isInclusive); + } + PartitionKeyComparator.Range range = new PartitionKeyComparator.Range( + entry.getKey(), startMark, endMark); + ranges.add(range); + } + } - public ScanMarker getStartMarker() { - return startMarker; + if (ranges.isEmpty() && ops.isEmpty()) { + return null; + } else { + return new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator( + StringUtils.join(names, ","), StringUtils.join(HBaseUtils.getPartitionKeyTypes(parts), ","), + ranges, ops)); + } } - public void setStartMarker(ScanMarker startMarker) { - this.startMarker = startMarker; - } - public void setStartMarker(byte[] start, boolean isInclusive) { - setStartMarker(new ScanMarker(start, isInclusive)); + public void setStartMarker(String keyName, String keyType, String start, boolean isInclusive) { + if (markers.containsKey(keyName)) { + markers.get(keyName).startMarker = new ScanMarker(start, isInclusive, keyType); + } else { + ScanMarkerPair marker = new ScanMarkerPair(new ScanMarker(start, isInclusive, keyType), null); + markers.put(keyName, marker); + } } - public ScanMarker getEndMarker() { - return endMarker; + public ScanMarker getStartMarker(String keyName) { + if (markers.containsKey(keyName)) { + return markers.get(keyName).startMarker; + } else { + return null; + } } - public void setEndMarker(ScanMarker endMarker) { - this.endMarker = endMarker; + public void setEndMarker(String keyName, String keyType, String end, boolean isInclusive) { + if (markers.containsKey(keyName)) { + markers.get(keyName).endMarker = new ScanMarker(end, isInclusive, keyType); + } else { + ScanMarkerPair marker = new ScanMarkerPair(null, new ScanMarker(end, isInclusive, keyType)); + markers.put(keyName, marker); + } } - public void setEndMarker(byte[] end, boolean isInclusive) { - setEndMarker(new ScanMarker(end, isInclusive)); + + public ScanMarker getEndMarker(String keyName) { + if (markers.containsKey(keyName)) { + return markers.get(keyName).endMarker; + } else { + return null; + } } @Override @@ -236,28 +325,33 @@ class HBaseFilterPlanUtil { private ScanPlan and(ScanPlan other) { // create combined FilterPlan based on existing lhs and rhs plan ScanPlan newPlan = new ScanPlan(); + newPlan.markers.putAll(markers); + + for (String keyName : other.markers.keySet()) { + if (newPlan.markers.containsKey(keyName)) { + // create new scan start + ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(keyName), + other.getStartMarker(keyName), true); + if (greaterStartMarker != null) { + newPlan.setStartMarker(keyName, greaterStartMarker.type, greaterStartMarker.value, greaterStartMarker.isInclusive); 
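// Editor's sketch: a minimal, self-contained illustration of the "majorParts" rule
// described in the comment above. The class and Pair names are made up; this is not
// the committed getMajorPartsCount code (which works on ScanMarkerPair entries), only
// the rule it encodes: leading partition keys stay in the scan-range prefix while each
// preceding key is pinned by an inclusive equality; everything after goes to the filter.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MajorPrefixSketch {
  static class Pair {
    String start; String end; boolean startInc; boolean endInc;
    Pair(String s, String e, boolean si, boolean ei) { start = s; end = e; startInc = si; endInc = ei; }
  }

  static int majorPrefixCount(List<String> orderedKeys, Map<String, Pair> markers) {
    int count = 0;
    for (String key : orderedKeys) {
      Pair p = markers.get(key);
      if (p == null) break;          // unconstrained key ends the usable prefix
      count++;                       // this key can be folded into the scan range
      boolean equality = p.start != null && p.start.equals(p.end) && p.startInc && p.endInc;
      if (!equality) break;          // a range condition ends the prefix after this key
    }
    return count;
  }

  public static void main(String[] args) {
    List<String> keys = new ArrayList<String>();
    keys.add("year"); keys.add("month"); keys.add("state");

    Map<String, Pair> m = new HashMap<String, Pair>();
    m.put("year", new Pair("2015", "2015", true, true));
    m.put("month", new Pair("1", "5", true, false));
    System.out.println(majorPrefixCount(keys, m));   // year = 2015 and 1 <= month < 5  -> 2

    m.clear();
    m.put("year", new Pair("2015", "2015", true, true));
    m.put("state", new Pair("CA", "CA", true, true));
    System.out.println(majorPrefixCount(keys, m));   // year = 2015 and state = 'CA'    -> 1

    m.clear();
    m.put("month", new Pair("10", "10", true, true));
    m.put("state", new Pair("CA", "CA", true, true));
    System.out.println(majorPrefixCount(keys, m));   // month = 10 and state = 'CA'     -> 0
  }
}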
+ } + + // create new scan end + ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(keyName), other.getEndMarker(keyName), + false); + if (lesserEndMarker != null) { + newPlan.setEndMarker(keyName, lesserEndMarker.type, lesserEndMarker.value, lesserEndMarker.isInclusive); + } + } else { + newPlan.markers.put(keyName, other.markers.get(keyName)); + } + } - // create new scan start - ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(), - other.getStartMarker(), true); - newPlan.setStartMarker(greaterStartMarker); - - // create new scan end - ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(), other.getEndMarker(), - false); - newPlan.setEndMarker(lesserEndMarker); - - // create new filter plan - newPlan.setFilter(createCombinedFilter(this.getFilter(), other.getFilter())); - + newPlan.ops.addAll(ops); + newPlan.ops.addAll(other.ops); return newPlan; } - private ScanFilter createCombinedFilter(ScanFilter filter1, ScanFilter filter2) { - // TODO create combined filter - filter1 && filter2 - return null; - } - /** * @param lStartMarker * @param rStartMarker @@ -268,13 +362,23 @@ class HBaseFilterPlanUtil { static ScanMarker getComparedMarker(ScanMarker lStartMarker, ScanMarker rStartMarker, boolean getGreater) { // if one of them has null bytes, just return other - if(lStartMarker.bytes == null) { + if(lStartMarker == null) { return rStartMarker; - } else if (rStartMarker.bytes == null) { + } else if (rStartMarker == null) { return lStartMarker; } - - int compareRes = compare(lStartMarker.bytes, rStartMarker.bytes); + TypeInfo expectedType = + TypeInfoUtils.getTypeInfoFromTypeString(lStartMarker.type); + ObjectInspector outputOI = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType); + Converter lConverter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + Converter rConverter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + Comparable lValue = (Comparable)lConverter.convert(lStartMarker.value); + Comparable rValue = (Comparable)rConverter.convert(rStartMarker.value); + + int compareRes = lValue.compareTo(rValue); if (compareRes == 0) { // bytes are equal, now compare the isInclusive flags if (lStartMarker.isInclusive == rStartMarker.isInclusive) { @@ -287,7 +391,7 @@ class HBaseFilterPlanUtil { isInclusive = false; } // else - return new ScanMarker(lStartMarker.bytes, isInclusive); + return new ScanMarker(lStartMarker.value, isInclusive, lStartMarker.type); } if (getGreater) { return compareRes == 1 ? 
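// Editor's sketch: a standalone version of the typed comparison getComparedMarker
// performs above. Marker values travel as strings, are converted to the partition
// key's declared type with ObjectInspectorConverters, and compared as that type, so
// for an "int" key "9" sorts before "10" (plain string comparison gets this wrong).
// The class name and sample values are made up; two converters are used, as in the
// committed code, because primitive converters reuse their output object.
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

class TypedMarkerCompareSketch {
  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compareAs(String type, String left, String right) {
    TypeInfo ti = TypeInfoUtils.getTypeInfoFromTypeString(type);
    ObjectInspector oi = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(ti);
    Converter lConv = ObjectInspectorConverters.getConverter(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi);
    Converter rConv = ObjectInspectorConverters.getConverter(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi);
    Comparable l = (Comparable) lConv.convert(left);
    Comparable r = (Comparable) rConv.convert(right);
    return l.compareTo(r);
  }

  public static void main(String[] args) {
    System.out.println(compareAs("int", "9", "10"));   // negative: 9 < 10 as integers
    System.out.println("9".compareTo("10"));           // positive: "9" > "10" as strings
  }
}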
lStartMarker : rStartMarker; @@ -313,42 +417,74 @@ class HBaseFilterPlanUtil { /** * @return row suffix - This is appended to db + table, to generate start row for the Scan */ - public byte[] getStartRowSuffix() { - if (startMarker.isInclusive) { - return startMarker.bytes; - } else { - return HBaseUtils.getEndPrefix(startMarker.bytes); + public byte[] getStartRowSuffix(String dbName, String tableName, List parts) { + int majorPartsCount = getMajorPartsCount(parts); + List majorPartTypes = new ArrayList(); + List components = new ArrayList(); + boolean endPrefix = false; + for (int i=0;i parts) { + int majorPartsCount = getMajorPartsCount(parts); + List majorPartTypes = new ArrayList(); + List components = new ArrayList(); + boolean endPrefix = false; + for (int i=0;i entry : markers.entrySet()) { + sb.append("key=" + entry.getKey() + "[startMarker=" + entry.getValue().startMarker + + ", endMarker=" + entry.getValue().endMarker + "]"); + } + return sb.toString(); } } /** - * represent a plan that can be used to create a hbase filter and then set in - * Scan.setFilter() - */ - public static class ScanFilter { - // TODO: implement this - } - - /** * Visitor for ExpressionTree. * It first generates the ScanPlan for the leaf nodes. The higher level nodes are * either AND or OR operations. It then calls FilterPlan.and and FilterPlan.or with @@ -369,9 +505,12 @@ class HBaseFilterPlanUtil { // temporary params for current left and right side plans, for AND, OR private FilterPlan rPlan; - private final String firstPartcolumn; - public PartitionFilterGenerator(String firstPartitionColumn) { - this.firstPartcolumn = firstPartitionColumn; + private Map nameToType = new HashMap(); + + public PartitionFilterGenerator(List parts) { + for (FieldSchema part : parts) { + nameToType.put(part.getName(), part.getType()); + } } FilterPlan getPlan() { @@ -414,63 +553,37 @@ class HBaseFilterPlanUtil { public void visit(LeafNode node) throws MetaException { ScanPlan leafPlan = new ScanPlan(); curPlan = leafPlan; - if (!isFirstParitionColumn(node.keyName)) { - leafPlan.setFilter(generateScanFilter(node)); - return; - } - if (!(node.value instanceof String)) { - // only string type is supported currently - // treat conditions on other types as true - return; - } // this is a condition on first partition column, so might influence the // start and end of the scan final boolean INCLUSIVE = true; switch (node.operator) { case EQUALS: - leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE); - leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE); + leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); + leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); break; case GREATERTHAN: - leafPlan.setStartMarker(toBytes(node.value), !INCLUSIVE); + leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE); break; case GREATERTHANOREQUALTO: - leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE); + leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); break; case LESSTHAN: - leafPlan.setEndMarker(toBytes(node.value), !INCLUSIVE); + leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE); break; case LESSTHANOREQUALTO: - leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE); + leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); break; case LIKE: 
+ leafPlan.ops.add(new Operator(Operator.Type.LIKE, node.keyName, node.value.toString())); + break; case NOTEQUALS: case NOTEQUALS2: - // TODO: create filter plan for these - hasUnsupportedCondition = true; + leafPlan.ops.add(new Operator(Operator.Type.NOTEQUALS, node.keyName, node.value.toString())); break; } } - @VisibleForTesting - static byte[] toBytes(Object value) { - // TODO: actually implement this - // We need to determine the actual type and use appropriate - // serialization format for that type - return ((String) value).getBytes(HBaseUtils.ENCODING); - } - - private ScanFilter generateScanFilter(LeafNode node) { - // TODO Auto-generated method stub - hasUnsupportedCondition = true; - return null; - } - - private boolean isFirstParitionColumn(String keyName) { - return keyName.equalsIgnoreCase(firstPartcolumn); - } - private boolean hasUnsupportedCondition() { return hasUnsupportedCondition; } @@ -486,12 +599,12 @@ class HBaseFilterPlanUtil { } } - public static PlanResult getFilterPlan(ExpressionTree exprTree, String firstPartitionColumn) throws MetaException { + public static PlanResult getFilterPlan(ExpressionTree exprTree, List parts) throws MetaException { if (exprTree == null) { // TODO: if exprTree is null, we should do what ObjectStore does. See HIVE-10102 return new PlanResult(new ScanPlan(), true); } - PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(firstPartitionColumn); + PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(parts); exprTree.accept(pGenerator); return new PlanResult(pGenerator.getPlan(), pGenerator.hasUnsupportedCondition()); } http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index ca1582e..66c46a5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hive.metastore.hbase; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; @@ -51,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator; import org.apache.hive.common.util.BloomFilter; import java.io.IOException; @@ -493,12 +497,12 @@ public class HBaseReadWrite { * @return a 
list of partition objects. * @throws IOException */ - List getPartitions(String dbName, String tableName, List> partValLists) - throws IOException { + List getPartitions(String dbName, String tableName, List partTypes, + List> partValLists) throws IOException { List parts = new ArrayList<>(partValLists.size()); List gets = new ArrayList<>(partValLists.size()); for (List partVals : partValLists) { - byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals); + byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partTypes, partVals); Get get = new Get(key); get.addColumn(CATALOG_CF, CATALOG_COL); gets.add(get); @@ -526,7 +530,8 @@ public class HBaseReadWrite { */ void putPartition(Partition partition) throws IOException { byte[] hash = putStorageDescriptor(partition.getSd()); - byte[][] serialized = HBaseUtils.serializePartition(partition, hash); + byte[][] serialized = HBaseUtils.serializePartition(partition, + HBaseUtils.getPartitionKeyTypes(getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys()), hash); store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]); partCache.put(partition.getDbName(), partition.getTableName(), partition); } @@ -547,7 +552,8 @@ public class HBaseReadWrite { decrementStorageDescriptorRefCount(oldPart.getSd()); hash = putStorageDescriptor(newPart.getSd()); } - byte[][] serialized = HBaseUtils.serializePartition(newPart, hash); + byte[][] serialized = HBaseUtils.serializePartition(newPart, + HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash); store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]); partCache.put(newPart.getDbName(), newPart.getTableName(), newPart); if (!oldPart.getTableName().equals(newPart.getTableName())) { @@ -565,7 +571,9 @@ public class HBaseReadWrite { List puts = new ArrayList<>(partitions.size()); for (Partition partition : partitions) { byte[] hash = putStorageDescriptor(partition.getSd()); - byte[][] serialized = HBaseUtils.serializePartition(partition, hash); + List partTypes = HBaseUtils.getPartitionKeyTypes( + getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys()); + byte[][] serialized = HBaseUtils.serializePartition(partition, partTypes, hash); Put p = new Put(serialized[0]); p.add(CATALOG_CF, CATALOG_COL, serialized[1]); puts.add(p); @@ -591,7 +599,9 @@ public class HBaseReadWrite { decrementStorageDescriptorRefCount(oldParts.get(i).getSd()); hash = putStorageDescriptor(newParts.get(i).getSd()); } - byte[][] serialized = HBaseUtils.serializePartition(newParts.get(i), hash); + Partition newPart = newParts.get(i); + byte[][] serialized = HBaseUtils.serializePartition(newPart, + HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash); Put p = new Put(serialized[0]); p.add(CATALOG_CF, CATALOG_COL, serialized[1]); puts.add(p); @@ -624,8 +634,9 @@ public class HBaseReadWrite { ? 
new ArrayList<>(cached).subList(0, maxPartitions) : new ArrayList<>(cached); } - byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName); - List parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null); + byte[] keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName, new ArrayList(), + new ArrayList(), false); + List parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null); partCache.put(dbName, tableName, parts, true); return maxPartitions < parts.size() ? parts.subList(0, maxPartitions) : parts; } @@ -672,72 +683,68 @@ public class HBaseReadWrite { if (table == null) { throw new NoSuchObjectException("Unable to find table " + dbName + "." + tableName); } - if (partVals.size() == table.getPartitionKeys().size()) { - keyPrefix = HBaseUtils.buildKey(keyElements.toArray(new String[keyElements.size()])); - } else { - keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray( - new String[keyElements.size()])); - } + keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys().subList(0, keyElements.size()-2)), + keyElements.subList(0, keyElements.size()-2)); // Now, build a filter out of the remaining keys - String regex = null; + List ranges = new ArrayList(); + List ops = new ArrayList(); if (!(partVals.size() == table.getPartitionKeys().size() && firstStar == -1)) { - StringBuilder buf = new StringBuilder(".*"); + for (int i = Math.max(0, firstStar); i < table.getPartitionKeys().size() && i < partVals.size(); i++) { - buf.append(HBaseUtils.KEY_SEPARATOR); + if ("*".equals(partVals.get(i))) { - buf.append("[^"); - buf.append(HBaseUtils.KEY_SEPARATOR); - buf.append("]+"); + PartitionKeyComparator.Range range = new PartitionKeyComparator.Range( + table.getPartitionKeys().get(i).getName(), + new PartitionKeyComparator.Mark(partVals.get(i), true), + new PartitionKeyComparator.Mark(partVals.get(i), true)); + ranges.add(range); } else { - buf.append(partVals.get(i)); + PartitionKeyComparator.Operator op = new PartitionKeyComparator.Operator( + PartitionKeyComparator.Operator.Type.LIKE, + table.getPartitionKeys().get(i).getName(), + ".*"); } } - if (partVals.size() < table.getPartitionKeys().size()) { - buf.append(HBaseUtils.KEY_SEPARATOR); - buf.append(".*"); - } - regex = buf.toString(); } Filter filter = null; - if (regex != null) { - filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex)); + if (!ranges.isEmpty() || !ops.isEmpty()) { + filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator( + StringUtils.join(HBaseUtils.getPartitionNames(table.getPartitionKeys()), ","), + StringUtils.join(HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys()), ","), + ranges, ops)); } if (LOG.isDebugEnabled()) { LOG.debug("Scanning partitions with prefix <" + new String(keyPrefix) + "> and filter <" + - regex + ">"); + filter + ">"); } - List parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter); + List parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix, + HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter); partCache.put(dbName, tableName, parts, false); return parts; } List scanPartitions(String dbName, String tableName, byte[] keyStart, byte[] keyEnd, Filter filter, int maxPartitions) throws IOException, NoSuchObjectException { - List keyElements = new ArrayList<>(); - keyElements.add(dbName); - 
keyElements.add(tableName); - - byte[] keyPrefix = - HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray(new String[keyElements.size()])); - byte[] startRow = ArrayUtils.addAll(keyPrefix, keyStart); + byte[] startRow = keyStart; byte[] endRow; if (keyEnd == null || keyEnd.length == 0) { // stop when current db+table entries are over - endRow = HBaseUtils.getEndPrefix(keyPrefix); + endRow = HBaseUtils.getEndPrefix(startRow); } else { - endRow = ArrayUtils.addAll(keyPrefix, keyEnd); + endRow = keyEnd; } if (LOG.isDebugEnabled()) { LOG.debug("Scanning partitions with start row <" + new String(startRow) + "> and end row <" + new String(endRow) + ">"); } - return scanPartitionsWithFilter(startRow, endRow, maxPartitions, filter); + return scanPartitionsWithFilter(dbName, tableName, startRow, endRow, maxPartitions, filter); } @@ -762,7 +769,8 @@ public class HBaseReadWrite { Partition p = getPartition(dbName, tableName, partVals, false); decrementStorageDescriptorRefCount(p.getSd()); } - byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals); + byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals); delete(PART_TABLE, key, null, null); } @@ -770,7 +778,8 @@ public class HBaseReadWrite { boolean populateCache) throws IOException { Partition cached = partCache.get(dbName, tableName, partVals); if (cached != null) return cached; - byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals); + byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals); byte[] serialized = read(PART_TABLE, key, CATALOG_CF, CATALOG_COL); if (serialized == null) return null; HBaseUtils.StorageDescriptorParts sdParts = @@ -781,17 +790,18 @@ public class HBaseReadWrite { return sdParts.containingPartition; } - private List scanPartitionsWithFilter(byte[] startRow, byte [] endRow, - int maxResults, Filter filter) + private List scanPartitionsWithFilter(String dbName, String tableName, + byte[] startRow, byte [] endRow, int maxResults, Filter filter) throws IOException { Iterator iter = scan(PART_TABLE, startRow, endRow, CATALOG_CF, CATALOG_COL, filter); + List tablePartitions = getTable(dbName, tableName).getPartitionKeys(); List parts = new ArrayList<>(); int numToFetch = maxResults < 0 ? 
Integer.MAX_VALUE : maxResults; for (int i = 0; i < numToFetch && iter.hasNext(); i++) { Result result = iter.next(); - HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(result.getRow(), - result.getValue(CATALOG_CF, CATALOG_COL)); + HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(dbName, tableName, + tablePartitions, result.getRow(), result.getValue(CATALOG_CF, CATALOG_COL)); StorageDescriptor sd = getStorageDescriptor(sdParts.sdHash); HBaseUtils.assembleStorageDescriptor(sd, sdParts); parts.add(sdParts.containingPartition); @@ -1558,7 +1568,9 @@ public class HBaseReadWrite { for (int i = 0; i < partNames.size(); i++) { valToPartMap.put(partVals.get(i), partNames.get(i)); - byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(i)); + byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tblName).getPartitionKeys()), + partVals.get(i)); Get get = new Get(partKey); for (byte[] colName : colNameBytes) { get.addColumn(STATS_CF, colName); @@ -1690,9 +1702,11 @@ public class HBaseReadWrite { return keys; } - private byte[] getStatisticsKey(String dbName, String tableName, List partVals) { + private byte[] getStatisticsKey(String dbName, String tableName, List partVals) throws IOException { return partVals == null ? HBaseUtils.buildKey(dbName, tableName) : HBaseUtils - .buildPartitionKey(dbName, tableName, partVals); + .buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), + partVals); } private String getStatisticsTable(List partVals) { http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 0204f37..717e094 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -541,7 +541,8 @@ public class HBaseStore implements RawStore { boolean commit = false; openTransaction(); try { - List oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list); + List oldParts = getHBase().getPartitions(db_name, tbl_name, + HBaseUtils.getPartitionKeyTypes(getTable(db_name, tbl_name).getPartitionKeys()), part_vals_list); getHBase().replacePartitions(oldParts, new_parts); for (List part_vals : part_vals_list) { getHBase().getStatsCache().invalidate(db_name, tbl_name, @@ -634,10 +635,8 @@ public class HBaseStore implements RawStore { if (table == null) { throw new NoSuchObjectException("Unable to find table " + dbName + "." 
+ tblName); } - String firstPartitionColumn = table.getPartitionKeys().get(0).getName(); // general hbase filter plan from expression tree - PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, firstPartitionColumn); - + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, table.getPartitionKeys()); if (LOG.isDebugEnabled()) { LOG.debug("Hbase Filter Plan generated : " + planRes.plan); } @@ -648,7 +647,9 @@ public class HBaseStore implements RawStore { for (ScanPlan splan : planRes.plan.getPlans()) { try { List parts = getHBase().scanPartitions(dbName, tblName, - splan.getStartRowSuffix(), splan.getEndRowSuffix(), null, -1); + splan.getStartRowSuffix(dbName, tblName, table.getPartitionKeys()), + splan.getEndRowSuffix(dbName, tblName, table.getPartitionKeys()), + splan.getFilter(table.getPartitionKeys()), -1); boolean reachedMax = false; for (Partition part : parts) { mergedParts.put(part.getValues(), part); http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 62bb4de..b6fa591 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -18,11 +18,14 @@ */ package org.apache.hadoop.hive.metastore.hbase; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; @@ -50,6 +53,19 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDeWithEndPrefix; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; import org.apache.hive.common.util.BloomFilter; import java.io.IOException; @@ -63,6 +79,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.SortedMap; import java.util.SortedSet; import 
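// Editor's sketch: the call sequence from HBaseStore.getPartitionsByExprInternal above,
// pulled out for readability. It assumes same-package access (HBaseFilterPlanUtil and
// scanPartitions are package-visible); "hbase" stands for the HBaseReadWrite instance,
// error handling is omitted, and the class/method names here are made up. One ScanPlan
// is produced per OR-branch; each contributes a typed start/end row suffix plus an
// optional PartitionKeyComparator-based filter for the leftover conditions.
import java.util.List;

import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;

class ScanPlanUsageSketch {
  static List<Partition> scanOnePlan(HBaseReadWrite hbase, Table table, String dbName,
      String tblName, HBaseFilterPlanUtil.ScanPlan splan) throws Exception {
    byte[] startRow = splan.getStartRowSuffix(dbName, tblName, table.getPartitionKeys());
    byte[] endRow = splan.getEndRowSuffix(dbName, tblName, table.getPartitionKeys());
    Filter filter = splan.getFilter(table.getPartitionKeys());  // null when nothing is left over
    return hbase.scanPartitions(dbName, tblName, startRow, endRow, filter, -1);
  }
}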
java.util.TreeMap; @@ -712,15 +729,31 @@ class HBaseUtils { return sd; } + static List getPartitionKeyTypes(List parts) { + com.google.common.base.Function fieldSchemaToType = + new com.google.common.base.Function() { + public String apply(FieldSchema fs) { return fs.getType(); } + }; + return Lists.transform(parts, fieldSchemaToType); + } + + static List getPartitionNames(List parts) { + com.google.common.base.Function fieldSchemaToName = + new com.google.common.base.Function() { + public String apply(FieldSchema fs) { return fs.getName(); } + }; + return Lists.transform(parts, fieldSchemaToName); + } + /** * Serialize a partition * @param part partition object * @param sdHash hash that is being used as a key for the enclosed storage descriptor * @return First element is the key, second is the serialized partition */ - static byte[][] serializePartition(Partition part, byte[] sdHash) { + static byte[][] serializePartition(Partition part, List partTypes, byte[] sdHash) { byte[][] result = new byte[2][]; - result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), part.getValues()); + result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), partTypes, part.getValues()); HbaseMetastoreProto.Partition.Builder builder = HbaseMetastoreProto.Partition.newBuilder(); builder .setCreateTime(part.getCreateTime()) @@ -735,11 +768,54 @@ class HBaseUtils { return result; } - static byte[] buildPartitionKey(String dbName, String tableName, List partVals) { - Deque keyParts = new ArrayDeque<>(partVals); - keyParts.addFirst(tableName); - keyParts.addFirst(dbName); - return buildKey(keyParts.toArray(new String[keyParts.size()])); + static byte[] buildPartitionKey(String dbName, String tableName, List partTypes, List partVals) { + return buildPartitionKey(dbName, tableName, partTypes, partVals, false); + } + + static byte[] buildPartitionKey(String dbName, String tableName, List partTypes, List partVals, boolean endPrefix) { + Object[] components = new Object[partVals.size()]; + for (int i=0;i partTypes, Object[] components, boolean endPrefix) { + ObjectInspector javaStringOI = + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING); + Object[] data = new Object[components.length+2]; + List fois = new ArrayList(components.length+2); + boolean[] endPrefixes = new boolean[components.length+2]; + + data[0] = dbName; + fois.add(javaStringOI); + endPrefixes[0] = false; + data[1] = tableName; + fois.add(javaStringOI); + endPrefixes[1] = false; + + for (int i = 0; i < components.length; i++) { + data[i+2] = components[i]; + TypeInfo expectedType = + TypeInfoUtils.getTypeInfoFromTypeString(partTypes.get(i)); + ObjectInspector outputOI = + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType); + fois.add(outputOI); + } + Output output = new Output(); + try { + BinarySortableSerDeWithEndPrefix.serializeStruct(output, data, fois, endPrefix); + } catch (SerDeException e) { + throw new RuntimeException("Cannot serialize partition " + StringUtils.join(components, ",")); + } + return Arrays.copyOf(output.getData(), output.getLength()); } static class StorageDescriptorParts { @@ -771,11 +847,10 @@ class HBaseUtils { * @param serialized the value fetched from HBase * @return A struct that contains the partition plus parts of the storage descriptor */ - static StorageDescriptorParts deserializePartition(byte[] key, byte[] serialized) - throws InvalidProtocolBufferException { - String[] keys = deserializeKey(key); - return deserializePartition(keys[0], 
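// Editor's sketch: the row-key layout buildPartitionKey produces above, demonstrated
// with the stock BinarySortableSerDe rather than the new end-prefix variant. The key is
// (dbName, tableName, partition values in their declared types) serialized in a
// sort-order-preserving binary form, so byte order matches the typed value order.
// Database/table names and values here are made up.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;

class PartitionKeyEncodingSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty(serdeConstants.LIST_COLUMNS, "dbName,tableName,year,state");
    props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string,int,string");
    BinarySortableSerDe serDe = new BinarySortableSerDe();
    serDe.initialize(new Configuration(), props);

    List<ObjectInspector> fieldOIs = Arrays.asList(
        (ObjectInspector) PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.javaIntObjectInspector,
        PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    ObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("dbName", "tableName", "year", "state"), fieldOIs);

    Object row9 = new ArrayList<Object>(Arrays.asList("db", "tbl", 9, "CA"));
    Object row10 = new ArrayList<Object>(Arrays.asList("db", "tbl", 10, "CA"));
    byte[] key9 = copy((BytesWritable) serDe.serialize(row9, rowOI));   // copy: serde reuses its buffer
    byte[] key10 = copy((BytesWritable) serDe.serialize(row10, rowOI));

    // year is encoded as an int, so the key for 9 sorts before the key for 10
    System.out.println(compareUnsigned(key9, key10) < 0);               // true
    System.out.println(serDe.deserialize(new BytesWritable(key10)));    // [db, tbl, 10, CA]
  }

  static byte[] copy(BytesWritable bw) { return Arrays.copyOf(bw.getBytes(), bw.getLength()); }

  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int d = (a[i] & 0xff) - (b[i] & 0xff);
      if (d != 0) return d;
    }
    return a.length - b.length;
  }
}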
keys[1], - Arrays.asList(Arrays.copyOfRange(keys, 2, keys.length)), serialized); + static StorageDescriptorParts deserializePartition(String dbName, String tableName, List partitions, + byte[] key, byte[] serialized) throws InvalidProtocolBufferException { + List keys = deserializePartitionKey(partitions, key); + return deserializePartition(dbName, tableName, keys, serialized); } /** @@ -811,6 +886,36 @@ class HBaseUtils { return k.split(KEY_SEPARATOR_STR); } + private static List deserializePartitionKey(List partitions, byte[] key) { + StringBuffer names = new StringBuffer(); + names.append("dbName,tableName,"); + StringBuffer types = new StringBuffer(); + types.append("string,string,"); + for (int i=0;i partitionKeys = new ArrayList(); + for (Object deserializedKey : deserializedkeys) { + partitionKeys.add(deserializedKey.toString()); + } + return partitionKeys; + } catch (SerDeException e) { + throw new RuntimeException("Error when deserialize key", e); + } + } + /** * Serialize a table * @param table table object http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java new file mode 100644 index 0000000..01fe403 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hive.metastore.hbase.HbaseMetastoreProto.PartitionKeyComparator.Operator.Type; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; + +import com.google.protobuf.InvalidProtocolBufferException; + +public class PartitionKeyComparator extends ByteArrayComparable { + private static final Log LOG = LogFactory.getLog(PartitionKeyComparator.class); + static class Mark { + Mark(String value, boolean inclusive) { + this.value = value; + this.inclusive = inclusive; + } + String value; + boolean inclusive; + public String toString() { + return value + (inclusive?"_":""); + } + } + static class Range { + Range(String keyName, Mark start, Mark end) { + this.keyName = keyName; + this.start = start; + this.end = end; + } + String keyName; + Mark start; + Mark end; + public String toString() { + return "" + keyName + ":" + (start!=null?start.toString():"") + (end!=null?end.toString():""); + } + } + // Cache the information derived from ranges for performance, including + // range in native datatype + static class NativeRange { + int pos; + Comparable start; + Comparable end; + } + static class Operator { + public Operator(Type type, String keyName, String val) { + this.type = type; + this.keyName = keyName; + this.val = val; + } + enum Type { + LIKE, NOTEQUALS + }; + Type type; + String keyName; + String val; + } + static class NativeOperator { + int pos; + Comparable val; + } + String names; + String types; + List ranges; + List nativeRanges; + List ops; + List nativeOps; + Properties serdeProps; + public PartitionKeyComparator(String names, String types, List ranges, List ops) { + super(null); + this.names = names; + this.types = types; + this.ranges = ranges; + this.ops = ops; + serdeProps = new Properties(); + serdeProps.setProperty(serdeConstants.LIST_COLUMNS, "dbName,tableName," + names); + serdeProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string," + types); + + this.nativeRanges = new ArrayList(this.ranges.size()); + for (int i=0;i(this.ops.size()); + for (int i=0;i ranges = new ArrayList(); + for (HbaseMetastoreProto.PartitionKeyComparator.Range range : proto.getRangeList()) { + Mark start = null; + if (range.hasStart()) { + start = new Mark(range.getStart().getValue(), range.getStart().getInclusive()); + } + Mark end = null; + if (range.hasEnd()) { + end = new Mark(range.getEnd().getValue(), range.getEnd().getInclusive()); + } + ranges.add(new Range(range.getKey(), start, end)); + } + List ops = new ArrayList(); + for 
(HbaseMetastoreProto.PartitionKeyComparator.Operator op : proto.getOpList()) { + ops.add(new Operator(Operator.Type.valueOf(op.getType().name()), op.getKey(), + op.getVal())); + } + return new PartitionKeyComparator(proto.getNames(), proto.getTypes(), ranges, ops); + } + + @Override + public byte[] toByteArray() { + HbaseMetastoreProto.PartitionKeyComparator.Builder builder = + HbaseMetastoreProto.PartitionKeyComparator.newBuilder(); + builder.setNames(names); + builder.setTypes(types); + for (int i=0;i=0 || + !range.start.inclusive && partVal.compareTo(nativeRange.start)>0) { + if (range.end == null || range.end.inclusive && partVal.compareTo(nativeRange.end)<=0 || + !range.end.inclusive && partVal.compareTo(nativeRange.end)<0) { + continue; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Fail to match range " + range.keyName + "-" + partVal + "[" + nativeRange.start + + "," + nativeRange.end + "]"); + } + return 1; + } + + for (int i=0;i parts = new ArrayList(); + parts.add(new FieldSchema(KEY, "int", null)); + parts.add(new FieldSchema(OTHERKEY, "int", null)); l.operator = Operator.EQUALS; - verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), new ScanMarker(VAL_BYTES, INCLUSIVE)); + verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), new ScanMarker(VAL, INCLUSIVE, "int")); l.operator = Operator.GREATERTHAN; - verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER); + verifyPlan(l, parts, KEY, new ScanMarker(VAL, !INCLUSIVE, "int"), DEFAULT_SCANMARKER); l.operator = Operator.GREATERTHANOREQUALTO; - verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), DEFAULT_SCANMARKER); + verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), DEFAULT_SCANMARKER); l.operator = Operator.LESSTHAN; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, !INCLUSIVE)); + verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, !INCLUSIVE, "int")); l.operator = Operator.LESSTHANOREQUALTO; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, INCLUSIVE)); - - // following leaf node plans should currently have true for 'has unsupported condition', - // because of the unsupported operator - l.operator = Operator.NOTEQUALS; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); - - l.operator = Operator.NOTEQUALS2; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); - - l.operator = Operator.LIKE; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); + verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, INCLUSIVE, "int")); // following leaf node plans should currently have true for 'has unsupported condition', // because of the condition is not on first key l.operator = Operator.EQUALS; - verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); - - l.operator = Operator.NOTEQUALS; - verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); + verifyPlan(l, parts, OTHERKEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, false); // if tree is null, it should return equivalent of full scan, and true // for 'has unsupported condition' - verifyPlan(null, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); + verifyPlan(null, parts, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); } - private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker) + private void verifyPlan(TreeNode l, List parts, String keyName, ScanMarker startMarker, ScanMarker endMarker) throws MetaException { - verifyPlan(l, keyName, 
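// Editor's sketch: how the conditions that cannot be folded into the scan range become
// an HBase RowFilter, mirroring ScanPlan.getFilter earlier in this commit. It assumes
// same-package placement (Mark/Range/Operator are package-visible); the key names,
// types and the 'CA' value are made-up examples.
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;

class MinorKeyFilterSketch {
  static RowFilter stateEqualsCA() {
    // state = 'CA' expressed as an inclusive [CA, CA] range on the minor key "state"
    List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>();
    ranges.add(new PartitionKeyComparator.Range("state",
        new PartitionKeyComparator.Mark("CA", true),
        new PartitionKeyComparator.Mark("CA", true)));

    // LIKE and <>/!= conditions travel as Operator entries instead of ranges
    List<PartitionKeyComparator.Operator> ops = new ArrayList<PartitionKeyComparator.Operator>();

    // the comparator receives every partition key name and type, in declaration order
    return new RowFilter(CompareFilter.CompareOp.EQUAL,
        new PartitionKeyComparator("year,month,state", "string,string,string", ranges, ops));
  }
}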
startMarker, endMarker, false); + verifyPlan(l, parts, keyName, startMarker, endMarker, false); } - private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker, + private void verifyPlan(TreeNode l, List parts, String keyName, ScanMarker startMarker, ScanMarker endMarker, boolean hasUnsupportedCondition) throws MetaException { ExpressionTree e = null; if (l != null) { e = new ExpressionTree(); e.setRootForTest(l); } - PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, keyName); + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); FilterPlan plan = planRes.plan; Assert.assertEquals("Has unsupported condition", hasUnsupportedCondition, planRes.hasUnsupportedCondition); Assert.assertEquals(1, plan.getPlans().size()); ScanPlan splan = plan.getPlans().get(0); - Assert.assertEquals(startMarker, splan.getStartMarker()); - Assert.assertEquals(endMarker, splan.getEndMarker()); + if (startMarker != null) { + Assert.assertEquals(startMarker, splan.markers.get(keyName).startMarker); + } else { + Assert.assertTrue(splan.markers.get(keyName)==null || + splan.markers.get(keyName).startMarker==null); + } + if (endMarker != null) { + Assert.assertEquals(endMarker, splan.markers.get(keyName).endMarker); + } else { + Assert.assertTrue(splan.markers.get(keyName)==null || + splan.markers.get(keyName).endMarker==null); + } } /** @@ -302,12 +299,13 @@ public class TestHBaseFilterPlanUtil { final String KEY = "k1"; final String VAL1 = "10"; final String VAL2 = "11"; - final byte[] VAL1_BYTES = PartitionFilterGenerator.toBytes(VAL1); - final byte[] VAL2_BYTES = PartitionFilterGenerator.toBytes(VAL2); LeafNode l = new LeafNode(); l.keyName = KEY; l.value = VAL1; - final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false); + final ScanMarker DEFAULT_SCANMARKER = null; + + List parts = new ArrayList(); + parts.add(new FieldSchema("k1", "int", null)); LeafNode r = new LeafNode(); r.keyName = KEY; @@ -318,19 +316,19 @@ public class TestHBaseFilterPlanUtil { // verify plan for - k1 >= '10' and k1 < '11' l.operator = Operator.GREATERTHANOREQUALTO; r.operator = Operator.LESSTHAN; - verifyPlan(tn, KEY, new ScanMarker(VAL1_BYTES, INCLUSIVE), new ScanMarker(VAL2_BYTES, - !INCLUSIVE)); + verifyPlan(tn, parts, KEY, new ScanMarker(VAL1, INCLUSIVE, "int"), new ScanMarker(VAL2, + !INCLUSIVE, "int")); // verify plan for - k1 >= '10' and k1 > '11' l.operator = Operator.GREATERTHANOREQUALTO; r.operator = Operator.GREATERTHAN; - verifyPlan(tn, KEY, new ScanMarker(VAL2_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER); + verifyPlan(tn, parts, KEY, new ScanMarker(VAL2, !INCLUSIVE, "int"), DEFAULT_SCANMARKER); // verify plan for - k1 >= '10' or k1 > '11' tn = new TreeNode(l, LogicalOperator.OR, r); ExpressionTree e = new ExpressionTree(); e.setRootForTest(tn); - PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); Assert.assertEquals(2, planRes.plan.getPlans().size()); Assert.assertEquals(false, planRes.hasUnsupportedCondition); @@ -338,7 +336,7 @@ public class TestHBaseFilterPlanUtil { TreeNode tn2 = new TreeNode(l, LogicalOperator.AND, tn); e = new ExpressionTree(); e.setRootForTest(tn2); - planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); Assert.assertEquals(2, planRes.plan.getPlans().size()); Assert.assertEquals(false, planRes.hasUnsupportedCondition); @@ -351,11 +349,135 @@ public class TestHBaseFilterPlanUtil { TreeNode tn3 = new 
TreeNode(tn2, LogicalOperator.OR, klike); e = new ExpressionTree(); e.setRootForTest(tn3); - planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); Assert.assertEquals(3, planRes.plan.getPlans().size()); - Assert.assertEquals(true, planRes.hasUnsupportedCondition); + Assert.assertEquals(false, planRes.hasUnsupportedCondition); + + } + @Test + public void testPartitionKeyScannerAllString() throws Exception { + List parts = new ArrayList(); + parts.add(new FieldSchema("year", "string", null)); + parts.add(new FieldSchema("month", "string", null)); + parts.add(new FieldSchema("state", "string", null)); + + // One prefix key and one minor key range + ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree; + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + ScanPlan sp = planRes.plan.getPlans().get(0); + byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + RowFilter filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key year, rowfilter contains minor key state + Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes())); + Assert.assertFalse(Bytes.contains(startRowSuffix, "CA".getBytes())); + Assert.assertFalse(Bytes.contains(endRowSuffix, "CA".getBytes())); + + PartitionKeyComparator comparator = (PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ranges.size(), 1); + Assert.assertEquals(comparator.ranges.get(0).keyName, "state"); + + // Two prefix key and one LIKE operator + exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and month > 10 " + + "and month <= 11 and state like 'C%'").tree; + planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + sp = planRes.plan.getPlans().get(0); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key value year/month, rowfilter contains LIKE operator + Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes())); + Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "11".getBytes())); + + comparator = (PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ops.size(), 1); + Assert.assertEquals(comparator.ops.get(0).keyName, "state"); + + // One prefix key, one minor key range and one LIKE operator + exprTree = PartFilterExprUtil.getFilterParser("year >= 2014 and month > 10 " + + "and month <= 11 and state like 'C%'").tree; + planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + sp = planRes.plan.getPlans().get(0); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key value year (low bound), rowfilter contains minor key state + // and LIKE operator + Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes())); + + 
comparator = (PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ranges.size(), 1); + Assert.assertEquals(comparator.ranges.get(0).keyName, "month"); + Assert.assertEquals(comparator.ops.size(), 1); + Assert.assertEquals(comparator.ops.get(0).keyName, "state"); + + // Condition contains or + exprTree = PartFilterExprUtil.getFilterParser("year = 2014 and (month > 10 " + + "or month < 3)").tree; + planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + sp = planRes.plan.getPlans().get(0); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // The first ScanPlan contains year = 2014 and month > 10 + Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes())); + + sp = planRes.plan.getPlans().get(1); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // The first ScanPlan contains year = 2014 and month < 3 + Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "3".getBytes())); } + @Test + public void testPartitionKeyScannerMixedType() throws Exception { + List parts = new ArrayList(); + parts.add(new FieldSchema("year", "int", null)); + parts.add(new FieldSchema("month", "int", null)); + parts.add(new FieldSchema("state", "string", null)); + + // One prefix key and one minor key range + ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree; + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + ScanPlan sp = planRes.plan.getPlans().get(0); + byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + RowFilter filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key year, rowfilter contains minor key state + Assert.assertTrue(Bytes.contains(startRowSuffix, Shorts.toByteArray((short)2015))); + Assert.assertTrue(Bytes.contains(endRowSuffix, Shorts.toByteArray((short)2016))); + + PartitionKeyComparator comparator = (PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ranges.size(), 1); + Assert.assertEquals(comparator.ranges.get(0).keyName, "state"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java new file mode 100644 index 0000000..ec43ae3 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.binarysortable; + +import java.util.List; + +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +public class BinarySortableSerDeWithEndPrefix extends BinarySortableSerDe { + public static void serializeStruct(Output byteStream, Object[] fieldData, + List fieldOis, boolean endPrefix) throws SerDeException { + for (int i = 0; i < fieldData.length; i++) { + serialize(byteStream, fieldData[i], fieldOis.get(i), false); + } + if (endPrefix) { + if (fieldData[fieldData.length-1]!=null) { + byteStream.getData()[byteStream.getLength()-1]++; + } else { + byteStream.getData()[byteStream.getLength()-1]+=2; + } + } + } +} \ No newline at end of file
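A closing sketch of why the endPrefix bump in serializeStruct (and HBaseUtils.getEndPrefix for whole keys) yields a usable scan stop row: every key that extends a serialized prefix keeps the prefix bytes and only appends more, so it sorts at or after the prefix and strictly before a copy of the prefix whose final byte is incremented. The byte values below are made up; only the ordering argument matters.

import java.util.Arrays;

import org.apache.hadoop.io.WritableComparator;

class EndPrefixSketch {
  public static void main(String[] args) {
    byte[] prefix = {0x01, 0x64, 0x62, 0x00, 0x05};         // pretend serialized (db, tbl, 2015)
    byte[] longer = {0x01, 0x64, 0x62, 0x00, 0x05, 0x43};   // a full key extending that prefix
    byte[] stopRow = Arrays.copyOf(prefix, prefix.length);
    stopRow[stopRow.length - 1]++;                           // the endPrefix bump

    // unsigned lexicographic order: prefix <= longer < stopRow,
    // so a scan over [prefix, stopRow) covers exactly the keys sharing the prefix
    System.out.println(WritableComparator.compareBytes(prefix, 0, prefix.length, longer, 0, longer.length) < 0);   // true
    System.out.println(WritableComparator.compareBytes(longer, 0, longer.length, stopRow, 0, stopRow.length) < 0); // true
  }
}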