Subject: svn commit: r1546425 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/io/h...
Date: Thu, 28 Nov 2013 18:13:20 -0000
To: commits@hbase.apache.org
From: liyin@apache.org

Author: liyin
Date: Thu Nov 28 18:13:19 2013
New Revision: 1546425

URL: http://svn.apache.org/r1546425
Log:
[HBASE-9815] Add Histogram representative of row key distribution inside a region.

Author: manukranthk

Summary:
Using a histogram of the row key distribution inside a region, we can estimate the cost of various scan operations and proactively optimize their parallelism.
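To make the cost-estimation idea concrete, the sketch below groups the equi-depth buckets returned for a region into roughly equal-weight key ranges that parallel scan workers could each cover. It is only an illustration: the Bucket type and its getStartRow()/getEndRow()/getCount() accessors are the ones added by this change (see HFileHistogram.java below), while the HistogramSplitExample class, the splitForParallelScan helper, and the parallelism parameter are hypothetical and not part of the patch.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;

public class HistogramSplitExample {

  /**
   * Groups contiguous equi-depth buckets into at most 'parallelism' key ranges
   * of roughly equal row count. Each returned byte[][] pair is a
   * {startRow, endRow} range that a single scan worker could cover.
   */
  public static List<byte[][]> splitForParallelScan(List<Bucket> buckets,
      int parallelism) {
    List<byte[][]> ranges = new ArrayList<byte[][]>();
    if (buckets == null || buckets.isEmpty() || parallelism <= 0) {
      return ranges;
    }
    double totalRows = 0;
    for (Bucket b : buckets) {
      totalRows += b.getCount();
    }
    double rowsPerRange = totalRows / parallelism;
    byte[] rangeStart = buckets.get(0).getStartRow();
    double accumulated = 0;
    for (Bucket b : buckets) {
      accumulated += b.getCount();
      if (accumulated >= rowsPerRange) {
        // Close the current range at this bucket's end row and start a new one.
        ranges.add(new byte[][] { rangeStart, b.getEndRow() });
        rangeStart = b.getEndRow();
        accumulated = 0;
      }
    }
    if (accumulated > 0) {
      // Whatever weight is left over goes into one final range.
      ranges.add(new byte[][] { rangeStart,
          buckets.get(buckets.size() - 1).getEndRow() });
    }
    return ranges;
  }
}

Because the buckets are equi-depth, simply assigning an equal number of buckets to each worker would give a similar split; summing getCount() just keeps the grouping robust when bucket weights are slightly uneven.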
Test Plan: Unit Tests

Reviewers: rshroff, aaiyer, liyintang

Reviewed By: liyintang

CC: hbase-eng@, san, liyintang, adela, gauravm

Differential Revision: https://phabricator.fb.com/D1004829

Task ID: 2905536

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/NumericHistogram.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHiveBasedNumericHistogram.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestUniformSplitHistogram.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1546425&r1=1546424&r2=1546425&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Nov 28 18:13:19 2013
@@ -900,6 +900,8 @@ public final class HConstants {
   public static final String CLIENT_SIDE_SCAN = "hbase.client.side.scan";
   public static final boolean DEFAULT_CLIENT_SIDE_SCAN = false;
 
+  public static final String USE_HFILEHISTOGRAM = "hbase.client.hfilehistogram.enabled";
+  public static final boolean DEFAULT_USE_HFILEHISTOGRAM = true;
 
   private HConstants() {
     // Can't be instantiated with this constructor.
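The HTable changes in the next hunk expose the per-region histogram to clients. A minimal, hedged usage sketch follows; it assumes an already-opened HTable from this branch and calls only APIs introduced by this patch (getHistogram, getHistogramForColumnFamily, and Bucket.print()), while the printHistogram helper itself and its arguments are illustrative placeholders.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;

public class HistogramClientExample {

  /**
   * Prints the equi-depth histogram of the region containing 'row', then the
   * histogram restricted to one column family (a single Store). Histogram meta
   * blocks are only written when hbase.client.hfilehistogram.enabled (the
   * USE_HFILEHISTOGRAM key above, default true) is set on the server side.
   */
  public static void printHistogram(HTable table, byte[] row, byte[] family)
      throws IOException {
    // Buckets for the whole region containing 'row'; may be null per the
    // getHistogram javadoc, otherwise contains at least one element.
    List<Bucket> regionBuckets = table.getHistogram(row);
    if (regionBuckets != null) {
      for (Bucket b : regionBuckets) {
        System.out.println(b.print());
      }
    }
    // Same idea, restricted to one column family (a single Store).
    List<Bucket> familyBuckets = table.getHistogramForColumnFamily(row, family);
    if (familyBuckets != null) {
      System.out.println("buckets for one store: " + familyBuckets.size());
    }
  }
}

These are the same Bucket lists that the region server assembles in getHistogram(byte[] regionName) and getHistogramForStore(byte[] regionName, byte[] family) further down, built from the histogram meta blocks written at flush and compaction time.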
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Nov 28 18:13:19 2013 @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.UnknownSc import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.io.hfile.Compression; import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket; import org.apache.hadoop.hbase.ipc.HBaseRPCOptions; import org.apache.hadoop.hbase.ipc.ProfilingData; import org.apache.hadoop.hbase.util.Bytes; @@ -1490,4 +1491,53 @@ public class HTable implements HTableInt public void endBatchedLoad() throws IOException { connection.endBatchedLoad(tableName, this.options); } + + /** + * Returns the List of buckets which represent the histogram for the region + * the row belongs to. + * Some notes regarding the buckets : + * The Bucket boundaries may not align with the boundaries of the Region. + * The Bucket Boundaries will look as follows : + * [0x00,0x00, ... 0x00] -> [some byte array] -> ... -> [some byte array] + * -> [0xff, 0xff, ... 0xff] + * + * @param row + * @return will be either null or at least will contain + * one element + * @throws IOException + */ + public List getHistogram(final byte[] row) throws IOException { + return this.getConnectionAndResetOperationContext() + .getRegionServerWithRetries( + new ServerCallable>(connection, + tableName, row, this.options) { + public List call() throws IOException { + return server.getHistogram( + location.getRegionInfo().getRegionName()); + } + } + ); + } + + /** + * Returns the List of buckets which represent the histogram for the column + * family in the region the row belongs to. 
+ * Also see {@link #getHistogram(byte[])} + * @param row + * @return + * @throws IOException + */ + public List getHistogramForColumnFamily(final byte[] row, + final byte[] cf) throws IOException { + return this.getConnectionAndResetOperationContext() + .getRegionServerWithRetries( + new ServerCallable>(connection, + tableName, row, this.options) { + public List call() throws IOException { + return server.getHistogramForStore( + location.getRegionInfo().getRegionName(), cf); + } + } + ); + } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Thu Nov 28 18:13:19 2013 @@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Thu Nov 28 18:13:19 2013 @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.filter.Sk import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram; import org.apache.hadoop.hbase.ipc.HBaseRPCOptions; import org.apache.hadoop.hbase.ipc.ProfilingData; import org.apache.hadoop.hbase.master.AssignmentPlan; @@ -216,6 +217,7 @@ public class HbaseObjectWritable impleme addToMap(MultiAction.class, code++); addToMap(MultiResponse.class, code++); + addToMap(HFileHistogram.Bucket.class, code++); } private Class declaredClass; Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Nov 28 18:13:19 2013 @@ -27,7 +27,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -392,6 +391,8 @@ public class HFile { /** The configuration key for HFile version to use for 
new files */ public static final String FORMAT_VERSION_KEY = "hfile.format.version"; + public static final String HFILEHISTOGRAM_METABLOCK = "hfile.histogram.metaentry"; + public static int getFormatVersion(Configuration conf) { int version = conf.getInt(FORMAT_VERSION_KEY, MAX_FORMAT_VERSION); checkFormatVersion(version); Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java?rev=1546425&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HFileHistogram.java Thu Nov 28 18:13:19 2013 @@ -0,0 +1,249 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.histogram; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +/** + * Captures histogram of statistics about the distribution of rows in the HFile. + * This needs to be serialized onto the HFile. + */ +public interface HFileHistogram { + /** + * This enum provides the set of additional stats the Histogram can store. + * (TODO) manukranthk : Integrate HFileStats. + */ + public static enum HFileStat { + KEYVALUECOUNT + } + + public final String HFILEHISTOGRAM_BINCOUNT = "hfile.histogram.bin.count"; + public final int DEFAULT_HFILEHISTOGRAM_BINCOUNT = 100; + + public static class Bucket implements Writable { + private byte[] startRow; + private byte[] endRow; + private double numRows; + private Map hfileStats; + + private Bucket(byte[] startRow, byte[] endRow, double numRows, + Map hfileStats) { + this.startRow = startRow; + this.endRow = endRow; + this.numRows = numRows; + this.hfileStats = hfileStats; + } + + public Bucket() { + } + + public double getCount() { + return numRows; + } + + /** + * Returns the number of key values that this bucket holds. 
+ * + * @return + */ + public double getNumKvs() { + return this.hfileStats.get(HFileStat.KEYVALUECOUNT); + } + + /** + * @return returns a copy of the endRow + */ + public byte[] getEndRow() { + return Bytes.copyOfByteArray(this.endRow); + } + + /** + * @return returns a copy of last + */ + public byte[] getStartRow() { + return Bytes.copyOfByteArray(this.startRow); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.startRow = Bytes.readByteArray(in); + this.endRow = Bytes.readByteArray(in); + this.numRows = in.readDouble(); + int numStats = in.readInt(); + this.hfileStats = new TreeMap(); + for (int i = 0; i < numStats; i++) { + String ordinal = Bytes.toString(Bytes.readByteArray(in)); + double val = in.readDouble(); + hfileStats.put(HFileStat.valueOf(ordinal), val); + } + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, this.startRow); + Bytes.writeByteArray(out, this.endRow); + out.writeDouble(numRows); + out.writeInt(this.hfileStats.size()); + for (Entry entry : hfileStats.entrySet()) { + Bytes.writeByteArray(out, Bytes.toBytes(entry.getKey().name())); + out.writeDouble(entry.getValue()); + } + } + + public String print() { + StringBuilder sb = new StringBuilder(3 * this.startRow.length); + sb.append("Bucket : "); + sb.append(" , startRow : "); + sb.append(Bytes.toStringBinary(this.startRow)); + sb.append(" , endRow : "); + sb.append(Bytes.toStringBinary(this.endRow)); + sb.append(" , count : "); + sb.append(this.numRows); + return sb.toString(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(endRow); + long temp; + temp = Double.doubleToLongBits(numRows); + result = prime * result + (int) (temp ^ (temp >>> 32)); + result = prime * result + Arrays.hashCode(startRow); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Bucket other = (Bucket) obj; + if (!Arrays.equals(endRow, other.endRow)) + return false; + if (Double.doubleToLongBits(numRows) != Double + .doubleToLongBits(other.numRows)) + return false; + if (!Arrays.equals(startRow, other.startRow)) + return false; + return true; + } + + public static class Builder { + private byte[] startRow; + private byte[] endRow; + private double numRows; + private Map hfileStats; + + public Builder() { + this.hfileStats = new HashMap(); + } + + public Builder setStartRow(byte[] startRow) { + this.startRow = Bytes.copyOfByteArray(startRow); + return this; + } + + public Builder setEndRow(byte[] endRow) { + this.endRow = Bytes.copyOfByteArray(endRow); + return this; + } + + public Builder setNumRows(double numRows) { + this.numRows = numRows; + return this; + } + + public Builder addHFileStat(HFileStat stat, Double count) { + this.hfileStats.put(stat, count); + return this; + } + + public Bucket create() { + return new Bucket(this.startRow, this.endRow, this.numRows, + this.hfileStats); + } + } + } + + /** + * Adds a row to the Histogram. + * + * @param kv + */ + public void add(KeyValue kv); + + /** + * Gets the set of Buckets from the Histogram. The buckets will be a + * representation of the Equi-Depth histogram stored inside the Region. + * + * @return + */ + public List getUniformBuckets(); + + /** + * Serializes the Histogram to he written onto the HFile and stored in the + * meta block. 
+ * + * @return + */ + public Writable serialize(); + + /** + * Composes a list of HFileHistograms and returns a HFileHistogram which is a + * merge of all the given Histograms. Assumes that the HFileHistogram objects + * in the list are of the same type as this object. + * + * @param histograms + * @return + */ + public HFileHistogram compose(List histograms); + + /** + * Method to deserialize the histogram from the HFile. This is the inverse of + * the serialize function. + * + * @param buf + * @return + * @throws IOException + */ + public HFileHistogram deserialize(ByteBuffer buf) throws IOException; + + public HFileHistogram merge(HFileHistogram h2); + + public int getBinCount(); +} Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java?rev=1546425&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/HiveBasedNumericHistogram.java Thu Nov 28 18:13:19 2013 @@ -0,0 +1,465 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.histogram; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Adapted from NumericHistogram from hive + * Reference : hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/ + * generic/NumericHistogram.java + * + */ + +/** + * A generic, re-usable histogram class that supports partial aggregations. The + * algorithm is a heuristic adapted from the following paper: Yael Ben-Haim and + * Elad Tom-Tov, "A streaming parallel decision tree algorithm", J. Machine + * Learning Research 11 (2010), pp. 849--872. Although there are no + * approximation guarantees, it appears to work well with adequate data and a + * large (e.g., 20-80) number of histogram bins. + */ +public class HiveBasedNumericHistogram implements NumericHistogram { + /** + * The Coord class defines a histogram bin, which is just an (x,y) pair. 
+ */ + protected static class Coord implements Comparable { + double x; + double y; + Map, Double> stats; + + public int compareTo(Coord o) { + if (x < o.x) { + return -1; + } + if (x > o.x) { + return 1; + } + return 0; + } + }; + + // Class variables + private int nbins; + private int nusedbins; + private ArrayList bins; + private Random prng; + private final double minusInfinity; + private final double infinity; + + /** + * Creates a new histogram object. Note that the allocate() or merge() method + * must be called before the histogram can be used. + */ + public HiveBasedNumericHistogram(double minusInfinity, double infinity) { + nbins = 0; + nusedbins = 0; + bins = null; + this.minusInfinity = minusInfinity; + this.infinity = infinity; + + // init the RNG for breaking ties in histogram merging. A fixed seed is + // specified here + // to aid testing, but can be eliminated to use a time-based seed (which + // would + // make the algorithm non-deterministic). + prng = new Random(31183); + } + + protected HiveBasedNumericHistogram(double minusInfinity2, double infinity2, + List nbuckets) { + this.minusInfinity = minusInfinity2; + this.infinity = infinity2; + + } + + /** + * Resets a histogram object to its initial state. allocate() or merge() must + * be called again before use. + */ + public void reset() { + bins = null; + nbins = nusedbins = 0; + } + + /** + * Returns the number of bins currently being used by the histogram. + */ + public int getUsedBins() { + return nusedbins; + } + + /** + * Returns true if this histogram object has been initialized by calling + * merge() or allocate(). + */ + public boolean isReady() { + return nbins != 0; + } + + /** + * Returns a particular histogram bin. + */ + public Coord getBin(int b) { + return bins.get(b); + } + + /** + * Sets the number of histogram bins to use for approximating data. + * + * @param num_bins + * Number of non-uniform-width histogram bins to use + */ + public void allocate(int numBins) { + nbins = numBins; + bins = new ArrayList(); + nusedbins = 0; + } + + /** + * Takes a serialized histogram created by the serialize() method and merges + * it with the current histogram object. + * + * @param other + * A serialized histogram created by the serialize() method + * @see #merge + */ + public NumericHistogram merge(NumericHistogram hist) { + Preconditions.checkNotNull(hist); + Preconditions.checkArgument(hist instanceof HiveBasedNumericHistogram); + HiveBasedNumericHistogram other = (HiveBasedNumericHistogram)hist; + + if (nbins == 0 || nusedbins == 0) { + // Our aggregation buffer has nothing in it, so just copy over 'other' + // by deserializing the ArrayList of (x,y) pairs into an array of Coord + // objects + nbins = other.nbins; + nusedbins = other.nusedbins; + bins = new ArrayList(nusedbins); + for (int i = 0; i < other.nusedbins; i++) { + Coord bin = new Coord(); + bin.x = other.bins.get(i).x; + bin.y = other.bins.get(i).y; + bins.add(bin); + } + } else { + // The aggregation buffer already contains a partial histogram. Therefore, + // we need + // to merge histograms using Algorithm #2 from the Ben-Haim and Tom-Tov + // paper. 
+ + ArrayList tmp_bins = new ArrayList(nusedbins + + other.nusedbins); + // Copy all the histogram bins from us and 'other' into an overstuffed + // histogram + for (int i = 0; i < nusedbins; i++) { + Coord bin = new Coord(); + bin.x = bins.get(i).x; + bin.y = bins.get(i).y; + tmp_bins.add(bin); + } + for (int j = 0; j < other.nusedbins; j++) { + Coord bin = new Coord(); + bin.x = other.bins.get(j).x; + bin.y = other.bins.get(j).y; + tmp_bins.add(bin); + } + Collections.sort(tmp_bins); + + // Now trim the overstuffed histogram down to the correct number of bins + bins = tmp_bins; + nusedbins += other.nusedbins; + trim(); + } + return this; + } + + /** + * Adds a new data point to the histogram approximation. Make sure you have + * called either allocate() or merge() first. This method implements Algorithm + * #1 from Ben-Haim and Tom-Tov, + * "A Streaming Parallel Decision Tree Algorithm", JMLR 2010. + * + * @param v + * The data point to add to the histogram approximation. + */ + public void add(double v) { + // Binary search to find the closest bucket that v should go into. + // 'bin' should be interpreted as the bin to shift right in order to + // accomodate + // v. As a result, bin is in the range [0,N], where N means that the value v + // is + // greater than all the N bins currently in the histogram. It is also + // possible that + // a bucket centered at 'v' already exists, so this must be checked in the + // next step. + int bin = 0; + for (int l = 0, r = nusedbins; l < r;) { + bin = (l + r) / 2; + if (bins.get(bin).x > v) { + r = bin; + } else { + if (bins.get(bin).x < v) { + l = ++bin; + } else { + break; // break loop on equal comparator + } + } + } + + // If we found an exact bin match for value v, then just increment that + // bin's count. + // Otherwise, we need to insert a new bin and trim the resulting histogram + // back to size. + // A possible optimization here might be to set some threshold under which + // 'v' is just + // assumed to be equal to the closest bin -- if fabs(v-bins[bin].x) < + // THRESHOLD, then + // just increment 'bin'. This is not done now because we don't want to make + // any + // assumptions about the range of numeric data being analyzed. + if (bin < nusedbins && bins.get(bin).x == v) { + bins.get(bin).y++; + } else { + Coord newBin = new Coord(); + newBin.x = v; + newBin.y = 1; + bins.add(bin, newBin); + + // Trim the bins down to the correct number of bins. + if (++nusedbins > nbins) { + trim(); + } + } + + } + + /** + * Trims a histogram down to 'nbins' bins by iteratively merging the closest + * bins. If two pairs of bins are equally close to each other, decide + * uniformly at random which pair to merge, based on a PRNG. + */ + private void trim() { + while (nusedbins > nbins) { + // Find the closest pair of bins in terms of x coordinates. + // Break ties randomly. + double smallestdiff = bins.get(1).x - bins.get(0).x; + int smallestdiffloc = 0, smallestdiffcount = 1; + for (int i = 1; i < nusedbins - 1; i++) { + double diff = bins.get(i + 1).x - bins.get(i).x; + if (diff < smallestdiff) { + smallestdiff = diff; + smallestdiffloc = i; + smallestdiffcount = 1; + } else { + if (diff == smallestdiff + && prng.nextDouble() <= (1.0 / ++smallestdiffcount)) { + smallestdiffloc = i; + } + } + } + + // Merge the two closest bins into their average x location, weighted by + // their heights. + // The height of the new bin is the sum of the heights of the old bins. 
+ // double d = bins[smallestdiffloc].y + bins[smallestdiffloc+1].y; + // bins[smallestdiffloc].x *= bins[smallestdiffloc].y / d; + // bins[smallestdiffloc].x += bins[smallestdiffloc+1].x / d * + // bins[smallestdiffloc+1].y; + // bins[smallestdiffloc].y = d; + + double d = bins.get(smallestdiffloc).y + bins.get(smallestdiffloc + 1).y; + Coord smallestdiffbin = bins.get(smallestdiffloc); + smallestdiffbin.x *= smallestdiffbin.y / d; + smallestdiffbin.x += bins.get(smallestdiffloc + 1).x / d + * bins.get(smallestdiffloc + 1).y; + smallestdiffbin.y = d; + // Shift the remaining bins left one position + bins.remove(smallestdiffloc + 1); + nusedbins--; + } + } + + /** + * Gets an approximate quantile value from the current histogram. Some popular + * quantiles are 0.5 (median), 0.95, and 0.98. + * + * @param q + * The requested quantile, must be strictly within the range (0,1). + * @return The quantile value. + */ + public double quantile(double q) { + assert (bins != null && nusedbins > 0 && nbins > 0); + double sum = 0, csum = 0; + int b; + for (b = 0; b < nusedbins; b++) { + sum += bins.get(b).y; + } + for (b = 0; b < nusedbins; b++) { + csum += bins.get(b).y; + if (csum / sum >= q) { + if (b == 0) { + return bins.get(b).x; + } + + csum -= bins.get(b).y; + double r = bins.get(b - 1).x + (q * sum - csum) + * (bins.get(b).x - bins.get(b - 1).x) / (bins.get(b).y); + return r; + } + } + return -1; // for Xlint, code will never reach here + } + + public int getNumBins() { + return bins == null ? 0 : bins.size(); + } + + /** + * Gives the sum for points from [-infinity, pi] + * + * @param i + * @return + */ + public double sum(int i) { + double sum = 0; + for (int j = i - 1; j >= 0; j--) { + sum += this.bins.get(j).y; + } + return sum + (bins.get(i).y / 2); + } + + /** + * Following the Algorithm 4 : Uniform Procedure in the paper. Returns a + * HiveBasedNumericHistogram which has B - 1 points such that each range + * [-infinity, p1], [p1, p2] .. [pB, infinity] have the same number of + * elements. + * + * @return + */ + public HiveBasedNumericHistogram uniform(int B) { + double sum = 0.0; + HiveBasedNumericHistogram hist = new HiveBasedNumericHistogram( + this.minusInfinity, this.infinity); + for (int j = 0; j < this.nusedbins; j++) { + sum += bins.get(j).y; + } + if (this.nusedbins == 0) + return hist; + double[] partialSums = new double[this.nusedbins]; + partialSums[0] = this.bins.get(0).y / 2; + for (int j = 1; j < this.nusedbins; j++) { + partialSums[j] = partialSums[j - 1] + + (this.bins.get(j - 1).y + this.bins.get(j).y) / 2; + } + hist.allocate(this.nusedbins); + hist.bins = new ArrayList(); + for (int j = 0; j < (B - 1); j++) { + double s = sum * (j + 1) / B; + int i = 0; + double d = s - partialSums[this.nusedbins - 1]; + for (; i < (this.nusedbins - 1); i++) { + if (s >= partialSums[i] && s < partialSums[i + 1]) { + d = s - partialSums[i]; + break; + } + } + double endVal = 0; + double endKey = this.infinity; + if (i < (this.nusedbins - 1)) { + endVal = this.bins.get(i + 1).y; + endKey = this.bins.get(i + 1).x; + } + double a = endVal - this.bins.get(i).y; + a = (a == 0) ? 
1 : a; + double b = 2 * this.bins.get(i).y; + double c = -2 * d; + double det = b * b - 2 * a * c; + double z = (-b + Math.sqrt(det)) / (2 * a); + Coord newBin = new Coord(); + newBin.x = this.bins.get(i).x + (endKey - this.bins.get(i).x) * z; + newBin.y = sum / this.nusedbins; + hist.bins.add(newBin); + } + hist.nusedbins = hist.bins.size(); + hist.nbins = hist.nusedbins; + return hist; + } + + private List getBuckets(HiveBasedNumericHistogram hist) { + List buckets = Lists.newArrayList(); + if (hist.bins.size() == 0) { + return buckets; + } + buckets.add(new Bucket(this.minusInfinity, hist.bins.get(0).x, hist.bins + .get(0).y / 2, hist.bins.get(0).stats)); + if (hist.bins.size() != 1) { + for (int i = 1; i < hist.bins.size(); i++) { + buckets.add(new Bucket(hist.bins.get(i - 1).x, hist.bins.get(i).x, + (hist.bins.get(i - 1).y + hist.bins.get(i).y) / 2, + hist.bins.get(i).stats)); + } + } + buckets.add(new Bucket(hist.bins.get(hist.bins.size() - 1).x, + this.infinity, hist.bins.get(hist.bins.size() - 1).y / 2, hist.bins + .get(hist.bins.size() - 1).stats)); + return buckets; + } + + public static HiveBasedNumericHistogram getHistogram(List buckets) { + if (buckets.size() <= 1) + return null; + double min = buckets.get(0).getStart(); + double max = buckets.get(buckets.size() - 1).getEnd(); + HiveBasedNumericHistogram ret = new HiveBasedNumericHistogram(min, max); + ret.allocate(buckets.size() - 1); + for (int i = 1; i < buckets.size(); i++) { + Coord c = new Coord(); + Bucket b = buckets.get(i); + Bucket prevB = buckets.get(i - 1); + c.x = b.getStart(); + c.y = (prevB.getCount() + b.getCount()) / 2; + ret.bins.add(c); + } + ret.nusedbins = ret.bins.size(); + return ret; + } + + /** + * Constructs a uniform histogram and returns the list of buckets. + */ + public List getUniformBuckets() { + return getBuckets(uniform(this.getUsedBins())); + } + + public List getOriginalBuckets() { + return getBuckets(this); + } + + @Override + public int getBinCount() { + return this.nbins; + } +} Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/NumericHistogram.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/NumericHistogram.java?rev=1546425&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/NumericHistogram.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/NumericHistogram.java Thu Nov 28 18:13:19 2013 @@ -0,0 +1,129 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.histogram; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.HFileStat; + +public interface NumericHistogram { + public static class Bucket { + private double count; + private double start; + private double end; + private Map, Double> stats; + + public Bucket(double start, double end, double count, + Map, Double> stats) { + this.count = count; + this.start = start; + this.end = end; + this.stats = stats; + } + + public double getCount() { + return count; + } + + public double getStart() { + return start; + } + + public double getEnd() { + return end; + } + + public double getStat(HFileStat stat) { + return this.stats.get(stat); + } + + public static class Builder { + private double startRow; + private double endRow; + private double numRows; + private Map, Double> hfileStats; + + public Builder() { + this.hfileStats = new HashMap, Double>(); + } + + public Builder setStartRow(double startRow) { + this.startRow = startRow; + return this; + } + + public Builder setEndRow(double endRow) { + this.endRow = endRow; + return this; + } + + public Builder setNumRows(double numRows) { + this.numRows = numRows; + return this; + } + + public Builder addHFileStat(HFileStat stat, Double count) { + this.hfileStats.put(stat, count); + return this; + } + + public Bucket create() { + return new Bucket(this.startRow, this.endRow, this.numRows, + this.hfileStats); + } + } + + public String print() { + StringBuilder sb = new StringBuilder(); + sb.append("Bucket : "); + sb.append(" start: "); + sb.append(this.start); + sb.append(" end: "); + sb.append(this.end); + sb.append(" count: "); + sb.append(this.count); + return sb.toString(); + } + } + + /** + * Adds a double value into the histogram. + * @param dataPoint + */ + public void add(double dataPoint); + + /** + * Returns the list of buckets which represent the equi-depth histogram. + * @return + */ + public List getUniformBuckets(); + + /** + * Clears the state and allocates the histogram. + * @param num_bins + */ + public void allocate(int num_bins); + + public int getBinCount(); + + public NumericHistogram merge(NumericHistogram hist); +} Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java?rev=1546425&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/histogram/UniformSplitHFileHistogram.java Thu Nov 28 18:13:19 2013 @@ -0,0 +1,252 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.hfile.histogram; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Uniform split histogram splits the range uniformly while creating the + * Histogram i.e. if we split range [a00, z00] into 25 parts, we should get + * [a00, b00], [b00, c00], ... , [y00, z00] + * + * This layer is provided to be able to easily swap other underlyingHistogram + * implementations in the future. + */ +public class UniformSplitHFileHistogram implements HFileHistogram { + protected NumericHistogram underlyingHistogram; + // TODO manukranthk : make this configurable. + int padding = 8; + + public UniformSplitHFileHistogram(int binCount) { + this.underlyingHistogram = new HiveBasedNumericHistogram( + getMinusInfinity(), getInfinity()); + this.underlyingHistogram.allocate(binCount); + } + + private UniformSplitHFileHistogram(List buckets) { + List nbuckets = Lists.newArrayList(); + for (Bucket b : buckets) { + nbuckets.add(this.getFromHFileHistogramBucket(b)); + } + this.underlyingHistogram = HiveBasedNumericHistogram.getHistogram(nbuckets); + } + + @Override + public void add(KeyValue kv) { + double val = convertBytesToDouble(kv.getRow()); + underlyingHistogram.add(val); + } + + private double getInfinity() { + return new BigInteger(getInfinityArr()).doubleValue(); + } + + /** + * This returns the maximum number that we can represent using padding bytes. + * Returns {0x00, 0xff, 0xff .... 0xff } + * <---- padding ----> + * @return + */ + private byte[] getInfinityArr() { + byte[] row = new byte[1]; + row[0] = (byte) 0; + return Bytes.appendToTail(row, padding, (byte)0xFF); + } + + private double getMinusInfinity() { + return 0.0; + } + + /** + * Bytes are are sorted lexicographically, so for the purposes of + * HFileHistogram, we need to convert a byte[] to a double so that it still + * compares correctly. + * + * We initially take the first 'padding' amount of bytes and convert the bytes + * into a BigInteger assuming the byte[] was in 2's complement representation + * + * We will add an extra 0 at the start so that we don't have to deal with -ve + * numbers. + * + * @param row + * @return + */ + protected double convertBytesToDouble(byte[] row) { + byte[] tmpRow = Bytes.head(row, Math.min(row.length, padding)); + byte[] newRow = Bytes.padTail(tmpRow, padding - tmpRow.length); + // To avoid messing with 2's complement. + newRow = Bytes.padHead(newRow, 1); + return new BigInteger(newRow).doubleValue(); + } + + /** + * Double is converted to Bytes in a similar manner. 
+ * + * @param d + * @return + */ + protected byte[] convertDoubleToBytes(double d) { + BigDecimal tmpDecimal = new BigDecimal(d); + BigInteger tmp = tmpDecimal.toBigInteger(); + byte[] arr = tmp.toByteArray(); + if (arr[0] == 0) { + // to represent {0xff, 0xff}, big integer uses {0x00, 0xff, 0xff} + // due to the one's compliment representation. + Preconditions.checkArgument(arr.length == 1 || arr[1] != 0); + arr = Bytes.tail(arr, arr.length - 1); + } + if (arr.length > padding) { + // Can happen due to loose precision guarentee in double. + // while doing the conversion, + // {0x00, 0xff, ... , 0xff, 0xff}=>double=>{0x01, 0x00, ... , 0x00, 0x00} + // might happen. + arr = Bytes.tail(getInfinityArr(), padding); + } + return Bytes.padHead(arr, padding - arr.length); + } + + @Override + public List getUniformBuckets() { + List buckets = this.underlyingHistogram + .getUniformBuckets(); + List ret = Lists.newArrayList(); + for (NumericHistogram.Bucket b : buckets) { + ret.add(getFromNumericHistogramBucket(b)); + } + return ret; + } + + public static HFileHistogram getHistogram(List buckets) { + HFileHistogram ret = new UniformSplitHFileHistogram(buckets); + return ret; + } + + private NumericHistogram.Bucket getFromHFileHistogramBucket( + HFileHistogram.Bucket bucket) { + NumericHistogram.Bucket b = (new NumericHistogram.Bucket.Builder()) + .setStartRow(convertBytesToDouble(bucket.getStartRow())) + .setEndRow(convertBytesToDouble(bucket.getEndRow())) + .setNumRows(bucket.getCount()).create(); + return b; + } + + private HFileHistogram.Bucket getFromNumericHistogramBucket( + NumericHistogram.Bucket bucket) { + Bucket b = (new Bucket.Builder()) + .setStartRow(this.convertDoubleToBytes(bucket.getStart())) + .setEndRow(this.convertDoubleToBytes(bucket.getEnd())) + .setNumRows(bucket.getCount()).create(); + return b; + } + + @Override + public Writable serialize() { + return new Writable() { + List buckets; + + public Writable setVal(List buckets) { + this.buckets = buckets; + return this; + } + + @Override + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + buckets = new ArrayList(len); + for (int i = 0; i < len; i++) { + Bucket b = new Bucket(); + b.readFields(in); + buckets.add(b); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(buckets.size()); + for (Bucket bucket : buckets) { + bucket.write(out); + } + } + }.setVal(getUniformBuckets()); + } + + /** + * Modifies the elements in the list of histograms. 
+ */ + @Override + public HFileHistogram compose(List histograms) { + if (histograms.size() <= 0) + return null; + HFileHistogram h = histograms.get(0); + int binCnt = h.getBinCount(); + HFileHistogram ret = new UniformSplitHFileHistogram(binCnt); + for (HFileHistogram h2 : histograms) { + ret = ret.merge(h2); + } + return ret; + } + + @Override + public HFileHistogram deserialize(ByteBuffer buf) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(), + buf.arrayOffset(), buf.limit()); + DataInput in = new DataInputStream(bais); + int len = in.readInt(); + List buckets = new ArrayList(len); + for (int i = 0; i < len; i++) { + Bucket b = new Bucket(); + b.readFields(in); + buckets.add(b); + } + bais.close(); + if (buckets.size() == 0) return null; + HFileHistogram ret = getHistogram(buckets); + return ret; + } + + @Override + public HFileHistogram merge(HFileHistogram h2) { + Preconditions.checkNotNull(h2); + Preconditions.checkArgument(h2 instanceof UniformSplitHFileHistogram); + UniformSplitHFileHistogram h = (UniformSplitHFileHistogram) h2; + this.underlyingHistogram.merge(h.underlyingHistogram); + return this; + } + + @Override + public int getBinCount() { + return this.underlyingHistogram.getBinCount(); + } +} Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Nov 28 18:13:19 2013 @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Pu import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket; import org.apache.hadoop.hbase.master.AssignmentPlan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.io.MapWritable; @@ -435,4 +436,24 @@ public interface HRegionInterface extend * */ public void setHDFSQuorumReadTimeoutMillis(long timeoutMillis); + + /** + * Returns the list of buckets which represent the uniform depth histogram + * for a given region. + * @param regionName + * @return + * @throws IOException + */ + public List getHistogram(byte[] regionName) throws IOException; + + /** + * Returns the list of buckets which represent the uniform depth histogram + * for a given store. 
+ * @param regionName + * @param family + * @return + * @throws IOException + */ + public List getHistogramForStore(byte[] regionName, byte[] family) + throws IOException; } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Nov 28 18:13:19 2013 @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.io.Refere import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.L2Cache; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -4033,6 +4034,15 @@ public class HRegion implements HeapSize } }; + public HFileHistogram getHistogram() throws IOException { + List histograms = new ArrayList(); + if (stores.size() == 0) return null; + for (Store s : stores.values()) { + histograms.add(s.getHistogram()); + } + HFileHistogram h = histograms.get(0).compose(histograms); + return h; + } /** * Facility for dumping and compacting catalog tables. Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Nov 28 18:13:19 2013 @@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.io.hfile. 
import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats; import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCOptions; @@ -3814,6 +3815,21 @@ public class HRegionServer implements HR } return 0; } + + @Override + public List getHistogram(byte[] regionName) throws IOException { + checkOpen(); + HRegion region = getRegion(regionName); + return region.getHistogram().getUniformBuckets(); + } + + @Override + public List getHistogramForStore(byte[] regionName, byte[] family) + throws IOException { + checkOpen(); + HRegion region = getRegion(regionName); + return region.getStore(family).getHistogram().getUniformBuckets(); + } } boolean origProfiling = enableServerSideProfilingForAllCalls.get(); boolean newProfiling = conf.getBoolean( Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Nov 28 18:13:19 2013 @@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.io.hfile. import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram; +import org.apache.hadoop.hbase.io.hfile.histogram.UniformSplitHFileHistogram; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.regionserver.compactionhook.CompactionHook; @@ -162,6 +164,7 @@ public class Store extends SchemaConfigu private CompactionHook compactHook = null; private final HRegionInfo info; + private boolean writeHFileHistogram = false; // This should account for the Store's non static variables. 
So, when there // is an addition to the member variables to Store, this value should be @@ -274,6 +277,9 @@ public class Store extends SchemaConfigu Store.closeCheckInterval = conf.getInt( "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); } + + writeHFileHistogram = conf.getBoolean(HConstants.USE_HFILEHISTOGRAM, + HConstants.DEFAULT_USE_HFILEHISTOGRAM); } /** * Constructor @@ -762,7 +768,9 @@ public class Store extends SchemaConfigu this.region.getSmallestReadPoint(), Long.MIN_VALUE, getAggregator(), flashBackQueryLimit); // include all deletes - + HFileHistogram hist = new UniformSplitHFileHistogram( + this.conf.getInt(HFileHistogram.HFILEHISTOGRAM_BINCOUNT, + HFileHistogram.DEFAULT_HFILEHISTOGRAM_BINCOUNT)); String fileName; try { // TODO: We can fail in the below block before we complete adding this @@ -776,12 +784,20 @@ public class Store extends SchemaConfigu writer.setTimeRangeTracker(snapshotTimeRangeTracker); fileName = writer.getPath().getName(); try { + byte[] lastRow = new byte[0]; final List kvs = new ArrayList(); boolean hasMore; do { hasMore = scanner.next(kvs); if (!kvs.isEmpty()) { for (KeyValue kv : kvs) { + if (writeHFileHistogram) { + byte[] thisRow = kv.getRow(); + if (!Bytes.equals(lastRow, thisRow)) { + hist.add(kv); + } + lastRow = thisRow; + } // If we know that this KV is going to be included always, then let us // set its memstoreTS to 0. This will help us save space when writing to disk. if (kv.getMemstoreTS() <= smallestReadPoint) { @@ -802,6 +818,9 @@ public class Store extends SchemaConfigu // hfile. The hfile is current up to and including logCacheFlushId. status.setStatus("Flushing " + this + ": appending metadata"); writer.appendMetadata(EnvironmentEdgeManager.currentTimeMillis(), logCacheFlushId, false); + if (writeHFileHistogram) { + writer.appendHFileHistogram(hist); + } status.setStatus("Flushing " + this + ": closing flushed file"); writer.close(); InjectionHandler.processEventIO(InjectionEvent.STOREFILE_AFTER_WRITE_CLOSE, writer.getPath()); @@ -1331,6 +1350,9 @@ public class Store extends SchemaConfigu // Find the smallest read point across all the Scanners. 
long smallestReadPoint = region.getSmallestReadPoint(); MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); + HFileHistogram hist = new UniformSplitHFileHistogram( + this.conf.getInt(HFileHistogram.HFILEHISTOGRAM_BINCOUNT, + HFileHistogram.DEFAULT_HFILEHISTOGRAM_BINCOUNT)); try { InternalScanner scanner = null; try { @@ -1356,6 +1378,7 @@ public class Store extends SchemaConfigu } KeyValueContext kvContext = new KeyValueContext(); + byte[] lastRow = new byte[0]; do { hasMore = scanner.next(kvs, 1, kvContext); if (!kvs.isEmpty()) { @@ -1364,6 +1387,13 @@ public class Store extends SchemaConfigu } // output to writer: for (KeyValue kv : kvs) { + if (writeHFileHistogram) { + byte[] thisRow = kv.getRow(); + if (!Bytes.equals(lastRow, thisRow)) { + hist.add(kv); + } + lastRow = thisRow; + } if (kv.getMemstoreTS() <= smallestReadPoint) { kv.setMemstoreTS(0); } @@ -1411,12 +1441,29 @@ public class Store extends SchemaConfigu minFlushTime = HConstants.NO_MIN_FLUSH_TIME; } writer.appendMetadata(minFlushTime, maxCompactingSequcenceId, majorCompaction); + if (writeHFileHistogram) { + writer.appendHFileHistogram(hist); + } writer.close(); } } return writer; } + private HFileHistogram hist = null; + public HFileHistogram getHistogram() throws IOException { + if (hist != null) return hist; + List histograms = new ArrayList(); + if (storefiles.size() == 0) return null; + for (StoreFile file : this.storefiles) { + HFileHistogram hist = file.getHistogram(); + if (hist != null) histograms.add(hist); + } + HFileHistogram h = histograms.get(0).compose(histograms); + this.hist = h; + return hist; + } + /** * Validates a store file by opening and closing it. In HFileV2 this should * not be an expensive operation. Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Thu Nov 28 18:13:19 2013 @@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.io.hfile. import org.apache.hadoop.hbase.io.hfile.HFileWriterV1; import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram; +import org.apache.hadoop.hbase.io.hfile.histogram.UniformSplitHFileHistogram; import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.BloomFilter; @@ -216,6 +218,8 @@ public class StoreFile extends SchemaCon // the last modification time stamp private long modificationTimeStamp = 0L; + private HFileHistogram histogram = null; + /** * Constructor, loads a reader and it's indices, etc. May allocate a * substantial amount of ram depending on the underlying files (10-20MB?). @@ -1146,6 +1150,15 @@ public class StoreFile extends SchemaCon includeInTimeRangeTracker(kv); } + /** + * Appends HFileHistogram to the HFile. This function is to be called only + * once with the Histogram that is constructed after compaction. 
+ */ + public void appendHFileHistogram(HFileHistogram histogram) { + writer.appendMetaBlock(HFile.HFILEHISTOGRAM_METABLOCK, + histogram.serialize()); + } + public Path getPath() { return this.writer.getPath(); } @@ -1832,4 +1845,15 @@ public class StoreFile extends SchemaCon }); } + public HFileHistogram getHistogram() throws IOException { + if (histogram != null) return histogram; + ByteBuffer buf = this.reader.reader.getMetaBlock( + HFile.HFILEHISTOGRAM_METABLOCK, false); + histogram = new UniformSplitHFileHistogram( + this.conf.getInt(HFileHistogram.HFILEHISTOGRAM_BINCOUNT, + HFileHistogram.DEFAULT_HFILEHISTOGRAM_BINCOUNT)); + histogram = histogram.deserialize(buf); + return histogram; + } + } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1546425&r1=1546424&r2=1546425&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Thu Nov 28 18:13:19 2013 @@ -1102,9 +1102,22 @@ public class Bytes { * @return Value in a plus length appended 0 bytes */ public static byte [] padTail(final byte [] a, final int length) { + return appendToTail(a, length, (byte)0); + } + + /** + * Appends length bytes to the end of the array and returns the new array + * Fills byte b in the newly allocated space in the byte[]. + * @param a array + * @param length new array size + * @param b byte to write to the tail. + * @return Value in a plus length appended 0 bytes + */ + public static byte [] appendToTail(final byte [] a, final int length, byte b) + { byte [] padding = new byte[length]; for (int i = 0; i < length; i++) { - padding[i] = 0; + padding[i] = b; } return add(a,padding); } @@ -1477,4 +1490,10 @@ public class Bytes { public static boolean isNonEmpty(ByteBuffer b) { return b != null && b.remaining() > 0; } + + public static byte[] copyOfByteArray(byte[] arr) { + byte[] tmp = new byte[arr.length]; + System.arraycopy(arr, 0, tmp, 0, arr.length); + return tmp; + } } Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java?rev=1546425&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java Thu Nov 28 18:13:19 2013 @@ -0,0 +1,141 @@ +package org.apache.hadoop.hbase.io.hfile.histogram; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket; +import org.apache.hadoop.hbase.regionserver.HRegion; 
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java?rev=1546425&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHFileHistogramE2E.java Thu Nov 28 18:13:19 2013
@@ -0,0 +1,141 @@
+package org.apache.hadoop.hbase.io.hfile.histogram;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHFileHistogramE2E {
+  private static final byte[] TABLE =
+      Bytes.toBytes("TestHFileHistogramE2ESingleStore");
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] TABLE2 =
+      Bytes.toBytes("TestHistogramSerDeE2E");
+  private static final Log LOG = LogFactory.getLog(TestHFileHistogramE2E.class);
+  private HBaseTestingUtility util = new HBaseTestingUtility();
+  private final int numBuckets = 100;
+
+  @Before
+  public void setUp() throws Exception {
+    util.getConfiguration().setInt(HFileHistogram.HFILEHISTOGRAM_BINCOUNT,
+        numBuckets);
+    util.startMiniCluster(3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testSingleStore() throws IOException {
+    HTable table = util.createTable(TABLE, FAMILY);
+    util.loadTable(table, FAMILY);
+    util.flush(TABLE);
+    assertTrue(util.getHBaseCluster().getRegions(TABLE).size() == 1);
+    HRegion region = util.getHBaseCluster().getRegions(TABLE).get(0);
+    HFileHistogram hist = region.getHistogram();
+    assertTrue(hist != null);
+    boolean first = true;
+    List<Bucket> buckets = hist.getUniformBuckets();
+    assertTrue(buckets != null);
+    assertTrue(buckets.size() > 0);
+    Bucket prevBucket = buckets.get(0);
+    for (Bucket b : buckets) {
+      if (first) {
+        first = false;
+        prevBucket = b;
+        continue;
+      }
+      assertTrue(Bytes.compareTo(b.getStartRow(), prevBucket.getEndRow()) >= 0);
+      assertTrue(Bytes.compareTo(b.getEndRow(), prevBucket.getStartRow()) > 0);
+    }
+  }
+
+  @Test
+  public void testHistogramSerDeE2E() throws IOException {
+    HTable table = util.createTable(TABLE2, FAMILY);
+    util.loadTable(table, FAMILY);
+    util.flush(TABLE2);
+    assertTrue(util.getHBaseCluster().getRegions(TABLE2).size() == 1);
+    HRegion region = util.getHBaseCluster().getRegions(TABLE2).get(0);
+    List<Bucket> buckets = region.getHistogram().getUniformBuckets();
+    assertTrue(buckets != null);
+    assertTrue(buckets.size() > 0);
+    List<Bucket> serBuckets = table.getHistogramForColumnFamily(
+        region.getStartKey(), FAMILY);
+    assertTrue(serBuckets != null);
+    assertTrue(serBuckets.size() > 0);
+    assertTrue(compareBuckets(buckets, serBuckets));
+  }
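testSingleStore above asserts a simple ordering invariant on the uniform buckets: every bucket must start at or after the previous bucket's end, and must end after the previous bucket's start. The same check, reduced to integer ranges so it runs without a mini-cluster, is sketched below; the Bucket class here is a stand-in for HFileHistogram.Bucket, not the patched type.

import java.util.Arrays;
import java.util.List;

public class BucketOrderSketch {
  static class Bucket {
    final int start, end;               // models [startRow, endRow)
    Bucket(int start, int end) { this.start = start; this.end = end; }
  }

  static void assertOrdered(List<Bucket> buckets) {
    Bucket prev = null;
    for (Bucket b : buckets) {
      if (prev != null) {
        if (b.start < prev.end) throw new AssertionError("buckets overlap");
        if (b.end <= prev.start) throw new AssertionError("buckets out of order");
      }
      prev = b;
    }
  }

  public static void main(String[] args) {
    assertOrdered(Arrays.asList(
        new Bucket(0, 10), new Bucket(10, 25), new Bucket(25, 40)));
    System.out.println("bucket ordering invariant holds");
  }
}

The remaining helpers and tests of TestHFileHistogramE2E.java continue in the diff below.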
+
+  public boolean compareBuckets(List<Bucket> buckets1, List<Bucket> buckets2) {
+    int len1 = buckets1.size();
+    int len2 = buckets2.size();
+    assertTrue(len1 == len2);
+    for (int i=0; i
[... rest of compareBuckets missing from this copy of the patch ...]
+  public List<byte[]> putRandomKVs(HTable table, int numEntries, int rowSize)
+      throws IOException {
+    List<byte[]> inputList = new ArrayList<byte[]>();
+    // The error estimation holds for more than 10000 entries.
+    // We wouldn't be using this feature if it weren't bigger than that.
+    Random r = new Random();
+    for (int i = 0; i < numEntries; i++) {
+      byte[] arr = new byte[rowSize];
+      r.nextBytes(arr);
+      KeyValue kv = new KeyValue(arr, (long)0);
+      inputList.add(kv.getRow());
+      table.put(new Put(kv.getRow()).add(FAMILY, null, kv.getRow()));
+      if (i%10000 == 0) {
+        table.flushCommits();
+        util.flush();
+      }
+    }
+    return inputList;
+  }
+
+  @Test
+  public void testHistogramError() throws IOException {
+    byte[] TABLE3 = Bytes.toBytes("testHistogramError");
+    HTable table = util.createTable(TABLE3, FAMILY);
+    util.flush(TABLE2);
+    Random r = new Random();
+    int numEntries = 100000 + r.nextInt(100000);
+    int expectedBucketCnt = numEntries/numBuckets;
+    List<byte[]> inputList = putRandomKVs(table, numEntries, 15);
+    Collections.sort(inputList, Bytes.BYTES_COMPARATOR);
+    List<HRegion> regions = util.getHBaseCluster().getRegions(TABLE3);
+    assertTrue(regions.size() == 1);
+    HRegion region = regions.get(0);
+    List<Bucket> lst = table.getHistogram(region.getStartKey());
+    assertTrue(lst.size() > 0);
+
+    TestUniformSplitHistogram.checkError(inputList, lst,
+        0.2, expectedBucketCnt);
+  }
+}
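testHistogramError loads 100k-200k uniformly random 15-byte rows and then has TestUniformSplitHistogram.checkError verify that each of the 100 buckets holds roughly numEntries/numBuckets rows, within a 20% tolerance. With 1000-2000 entries expected per bucket, the binomial standard deviation is only about 40, so 20% is a wide margin. The stand-alone simulation below models that acceptance criterion with uniform doubles in place of row keys; it only illustrates the statistical check, and does not use the HBase test APIs.

import java.util.Random;

public class HistogramErrorSketch {
  public static void main(String[] args) {
    int numBuckets = 100;
    Random r = new Random();
    int numEntries = 100000 + r.nextInt(100000);
    int expectedBucketCnt = numEntries / numBuckets;

    // Uniformly random keys in [0, 1), assigned to equal-width buckets,
    // mirroring uniformly random row keys split into uniform key ranges.
    int[] counts = new int[numBuckets];
    for (int i = 0; i < numEntries; i++) {
      counts[(int) (r.nextDouble() * numBuckets)]++;
    }

    double errorPct = 0.2;   // same 20% tolerance the test uses
    for (int b = 0; b < numBuckets; b++) {
      int deviation = Math.abs(counts[b] - expectedBucketCnt);
      if (deviation > errorPct * expectedBucketCnt) {
        throw new AssertionError("bucket " + b + " off by " + deviation);
      }
    }
    System.out.println("all " + numBuckets + " buckets within 20% of " + expectedBucketCnt);
  }
}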
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHiveBasedNumericHistogram.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHiveBasedNumericHistogram.java?rev=1546425&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHiveBasedNumericHistogram.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/histogram/TestHiveBasedNumericHistogram.java Thu Nov 28 18:13:19 2013
@@ -0,0 +1,82 @@
+package org.apache.hadoop.hbase.io.hfile.histogram;
+
+import static org.junit.Assert.*;
+
+import java.util.Random;
+
+import org.apache.hadoop.hbase.io.hfile.histogram.HiveBasedNumericHistogram.Coord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHiveBasedNumericHistogram {
+  private HiveBasedNumericHistogram hist;
+  double min = 0;
+  double max = 10000;
+  int count = 1000;
+  int buckets = 100;
+  Random r;
+
+  @Before
+  public void setUp() throws Exception {
+    setup(min, max, buckets, count);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+
+  }
+
+  private void setup(double min, double max, int buckets, int count) {
+    hist = new HiveBasedNumericHistogram(min, max);
+    r = new Random();
+    // Inserting elements into the histogram.
+    hist.allocate(buckets);
+    for (int i = 0; i
[... remainder of TestHiveBasedNumericHistogram.java and the start of the added TestUniformSplitHistogram.java are missing from this copy; the lines below are from TestUniformSplitHistogram.java ...]
+    List<Bucket> lst = hist.getUniformBuckets();
+    assertTrue(lst.size() > 0);
+    Bucket prevBucket = null;
+    for (Bucket b : lst) {
+      if (prevBucket != null) {
+        assertTrue(Bytes.toStringBinary(b.getStartRow())
+            + " not greater than "
+            + Bytes.toStringBinary(prevBucket.getStartRow()),
+            Bytes.compareTo(b.getStartRow(), prevBucket.getStartRow()) > 0);
+        assertTrue(Bytes.toStringBinary(b.getEndRow())
+            + " not greater than "
+            + Bytes.toStringBinary(prevBucket.getEndRow()),
+            Bytes.compareTo(b.getEndRow(), prevBucket.getEndRow()) >= 0);
+        assertTrue(Bytes.toStringBinary(b.getEndRow())
+            + " not greater than "
+            + Bytes.toStringBinary(prevBucket.getStartRow()),
+            Bytes.compareTo(b.getEndRow(), prevBucket.getStartRow()) >= 0);
+      }
+      prevBucket = b;
+    }
+  }
+
+  @Test
+  public void testUniformHistogramError() {
+    for (int numRuns = 0; numRuns < 100; numRuns++) {
+      int numBuckets = 100;
+      UniformSplitHFileHistogram hist = new UniformSplitHFileHistogram(numBuckets);
+      Random r = new Random();
+      int size = 10;
+      List<byte[]> inputList = new ArrayList<byte[]>();
+      // The error estimation holds for more than 10000 entries.
+      // We wouldn't be using this feature if it weren't bigger than that.
+      int numEntries = 10000 + r.nextInt(10000);
+      int expectedBucketCnt = numEntries/numBuckets;
+      for (int i = 0; i < numEntries; i++) {
+        byte[] arr = new byte[size];
+        r.nextBytes(arr);
+        KeyValue kv = new KeyValue(arr, (long)0);
+        inputList.add(kv.getRow());
+        hist.add(kv);
+      }
+      List<Bucket> lst = hist.getUniformBuckets();
+
+      // 20% error is an observation; this test gives an estimate of how much
+      // error you can expect.
+      checkError(inputList, lst, 0.2, expectedBucketCnt);
+    }
+  }
+
+  public static void checkError(List<byte[]> inputList, List<Bucket> lst,
+      double errorPct, int expectedBucketCnt) {
+    Collections.sort(inputList, Bytes.BYTES_COMPARATOR);
+    assertTrue(lst.size() > 0);
+    int numEntries = inputList.size();
+    int i = 0;
+    int j = i;
+    int bucketIndex = 0;
+    int error = 0;
+    for (Bucket b : lst) {
+      while (i
[... body of checkError missing from this copy of the patch ...]
+          >= 0);
+  }
+}
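Going by its name, HiveBasedNumericHistogram appears to follow the approximate streaming-histogram approach used by Hive's NumericHistogram (in the style of Ben-Haim and Tom-Tov): keep at most a fixed number of (centroid, count) bins and, whenever one bin too many accumulates, merge the two adjacent bins whose centroids lie closest together into a single weighted-average bin. The sketch below implements that general algorithm from scratch for readers who want to follow the tests; class and method names are illustrative, and it deliberately omits the fixed min/max endpoints that the patched constructor takes, so it is not the committed implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class StreamingHistogramSketch {
  static class Bin {
    double x;       // centroid of the bin
    double count;   // weight of the bin
    Bin(double x, double count) { this.x = x; this.count = count; }
  }

  private final int maxBins;
  private final List<Bin> bins = new ArrayList<Bin>();

  StreamingHistogramSketch(int maxBins) { this.maxBins = maxBins; }

  void add(double v) {
    // Insert a unit-weight bin at its sorted position.
    int pos = 0;
    while (pos < bins.size() && bins.get(pos).x < v) pos++;
    if (pos < bins.size() && bins.get(pos).x == v) {
      bins.get(pos).count++;
    } else {
      bins.add(pos, new Bin(v, 1));
    }
    // Over capacity: merge the two adjacent bins that are closest together.
    if (bins.size() > maxBins) {
      int best = 0;
      double bestGap = Double.MAX_VALUE;
      for (int i = 0; i + 1 < bins.size(); i++) {
        double gap = bins.get(i + 1).x - bins.get(i).x;
        if (gap < bestGap) { bestGap = gap; best = i; }
      }
      Bin a = bins.get(best), b = bins.get(best + 1);
      double total = a.count + b.count;
      a.x = (a.x * a.count + b.x * b.count) / total;   // weighted centroid
      a.count = total;
      bins.remove(best + 1);
    }
  }

  public static void main(String[] args) {
    StreamingHistogramSketch h = new StreamingHistogramSketch(10);
    Random r = new Random(42);
    for (int i = 0; i < 10000; i++) h.add(r.nextDouble() * 10000);
    for (Bin b : h.bins) System.out.printf("centroid=%.1f count=%.0f%n", b.x, b.count);
  }
}

For roughly uniform input the merged bins end up with comparable counts, which is what lets getUniformBuckets-style queries cut the key space into near-equal-weight ranges for scan parallelization.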