Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5CAE41172B for ; Wed, 9 Apr 2014 17:59:30 +0000 (UTC) Received: (qmail 38314 invoked by uid 500); 9 Apr 2014 17:58:04 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 37907 invoked by uid 500); 9 Apr 2014 17:57:49 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 37238 invoked by uid 99); 9 Apr 2014 17:57:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Apr 2014 17:57:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 269B194F8A8; Wed, 9 Apr 2014 17:57:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Wed, 09 Apr 2014 17:57:42 -0000 Message-Id: <21240640800748e2b4c997a5f5b30167@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/64] [abbrv] Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/data/Range.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/data/Range.java index 65873c3,0000000..122436b mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/data/Range.java +++ b/core/src/main/java/org/apache/accumulo/core/data/Range.java @@@ -1,906 -1,0 +1,899 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.data; + +import java.io.DataInput; +import java.io.DataOutput; - import java.io.InvalidObjectException; +import java.io.IOException; ++import java.io.InvalidObjectException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.data.thrift.TRange; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +/** + * This class is used to specify a range of Accumulo Keys. 
+ * + */ + +public class Range implements WritableComparable { + + private Key start; + private Key stop; + private boolean startKeyInclusive; + private boolean stopKeyInclusive; + private boolean infiniteStartKey; + private boolean infiniteStopKey; + + /** + * Creates a range that goes from negative to positive infinity + */ + + public Range() { + this((Key) null, true, (Key) null, true); + } + + /** + * Creates a range from startKey inclusive to endKey inclusive + * + * @param startKey + * set this to null when negative infinity is needed + * @param endKey + * set this to null when positive infinity is needed + */ + public Range(Key startKey, Key endKey) { + this(startKey, true, endKey, true); + } + + /** + * Creates a range that covers an entire row + * + * @param row + * set this to null to cover all rows + */ + public Range(CharSequence row) { + this(row, true, row, true); + } + + /** + * Creates a range that covers an entire row + * + * @param row + * set this to null to cover all rows + */ + public Range(Text row) { + this(row, true, row, true); + } + + /** + * Creates a range from startRow inclusive to endRow inclusive + * + * @param startRow + * set this to null when negative infinity is needed + * @param endRow + * set this to null when positive infinity is needed + */ + public Range(Text startRow, Text endRow) { + this(startRow, true, endRow, true); + } + + /** + * Creates a range from startRow inclusive to endRow inclusive + * + * @param startRow + * set this to null when negative infinity is needed + * @param endRow + * set this to null when positive infinity is needed + */ + public Range(CharSequence startRow, CharSequence endRow) { + this(startRow, true, endRow, true); + } + + /** + * Creates a range from startRow to endRow + * + * @param startRow + * set this to null when negative infinity is needed + * @param startRowInclusive + * determines if the start row is skipped + * @param endRow + * set this to null when positive infinity is needed + * 
@param endRowInclusive + * determines if the endRow is included + */ + + public Range(Text startRow, boolean startRowInclusive, Text endRow, boolean endRowInclusive) { + this((startRow == null ? null : (startRowInclusive ? new Key(startRow) : new Key(startRow).followingKey(PartialKey.ROW))), true, (endRow == null ? null + : (endRowInclusive ? new Key(endRow).followingKey(PartialKey.ROW) : new Key(endRow))), false); + } + + /** + * Creates a range from startRow to endRow + * + * @param startRow + * set this to null when negative infinity is needed + * @param startRowInclusive + * determines if the start row is skipped + * @param endRow + * set this to null when positive infinity is needed + * @param endRowInclusive + * determines if the endRow is included + */ + + public Range(CharSequence startRow, boolean startRowInclusive, CharSequence endRow, boolean endRowInclusive) { + this(startRow == null ? null : new Text(startRow.toString()), startRowInclusive, endRow == null ? null : new Text(endRow.toString()), endRowInclusive); + } + + /** + * Creates a range from startKey to endKey + * + * @param startKey + * set this to null when negative infinity is needed + * @param startKeyInclusive + * determines if the ranges includes the start key + * @param endKey + * set this to null when infinity is needed + * @param endKeyInclusive + * determines if the range includes the end key + */ + public Range(Key startKey, boolean startKeyInclusive, Key endKey, boolean endKeyInclusive) { + this.start = startKey; + this.startKeyInclusive = startKeyInclusive; + this.infiniteStartKey = startKey == null; + this.stop = endKey; + this.stopKeyInclusive = endKeyInclusive; + this.infiniteStopKey = stop == null; + + if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(endKey)) { + throw new IllegalArgumentException("Start key must be less than end key in range (" + startKey + ", " + endKey + ")"); + } + } + + /** + * Copies a range + */ + public Range(Range range) { + this(range.start, 
range.startKeyInclusive, range.infiniteStartKey, range.stop, range.stopKeyInclusive, range.infiniteStopKey); + } + + /** + * Creates a range from start to stop. + * + * @param start + * set this to null when negative infinity is needed + * @param stop + * set this to null when infinity is needed + * @param startKeyInclusive + * determines if the ranges includes the start key + * @param stopKeyInclusive + * determines if the range includes the end key + * @param infiniteStartKey + * true if start key is negative infinity (null) + * @param infiniteStopKey + * true if stop key is positive infinity (null) + * @throws IllegalArgumentException if stop is before start, or infiniteStartKey is true but start is not null, or infiniteStopKey is true but stop is not + * null + */ + public Range(Key start, Key stop, boolean startKeyInclusive, boolean stopKeyInclusive, boolean infiniteStartKey, boolean infiniteStopKey) { + this(start, startKeyInclusive, infiniteStartKey, stop, stopKeyInclusive, infiniteStopKey); + if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) { + throw new IllegalArgumentException("Start key must be less than end key in range (" + start + ", " + stop + ")"); + } + } + + /** + * Creates a range from start to stop. Unlike the public six-argument method, + * this one does not assure that stop is after start, which helps performance + * in cases where that assurance is already in place. 
+ * + * @param start + * set this to null when negative infinity is needed + * @param startKeyInclusive + * determines if the ranges includes the start key + * @param infiniteStartKey + * true if start key is negative infinity (null) + * @param stop + * set this to null when infinity is needed + * @param stopKeyInclusive + * determines if the range includes the end key + * @param infiniteStopKey + * true if stop key is positive infinity (null) + * @throws IllegalArgumentException if infiniteStartKey is true but start is not null, or infiniteStopKey is true but stop is not null + */ + protected Range(Key start, boolean startKeyInclusive, boolean infiniteStartKey, Key stop, boolean stopKeyInclusive, boolean infiniteStopKey) { + if (infiniteStartKey && start != null) + throw new IllegalArgumentException(); + + if (infiniteStopKey && stop != null) + throw new IllegalArgumentException(); + + this.start = start; + this.stop = stop; + this.startKeyInclusive = startKeyInclusive; + this.stopKeyInclusive = stopKeyInclusive; + this.infiniteStartKey = infiniteStartKey; + this.infiniteStopKey = infiniteStopKey; + } + + public Range(TRange trange) { + this(trange.start == null ? null : new Key(trange.start), trange.startKeyInclusive, trange.infiniteStartKey, + trange.stop == null ? 
null : new Key(trange.stop), trange.stopKeyInclusive, trange.infiniteStopKey); + if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) { + throw new IllegalArgumentException("Start key must be less than end key in range (" + start + ", " + stop + ")"); + } + } + + /** + * @return the first key in the range, null if the key is infinite + */ + public Key getStartKey() { + if (infiniteStartKey) { + return null; + } + return start; + } + + /** + * - * @param key + * @return true if the given key is before the range, otherwise false + */ + public boolean beforeStartKey(Key key) { + if (infiniteStartKey) { + return false; + } + + if (startKeyInclusive) + return key.compareTo(start) < 0; + return key.compareTo(start) <= 0; + } + + /** + * @return the last key in the range, null if the end key is infinite + */ + + public Key getEndKey() { + if (infiniteStopKey) { + return null; + } + return stop; + } + + /** - * @param key + * @return true if the given key is after the range, otherwise false + */ + + public boolean afterEndKey(Key key) { + if (infiniteStopKey) + return false; + + if (stopKeyInclusive) + return stop.compareTo(key) < 0; + return stop.compareTo(key) <= 0; + } + + @Override + public int hashCode() { + int startHash = infiniteStartKey ? 0 : start.hashCode() + (startKeyInclusive ? 1 : 0); + int stopHash = infiniteStopKey ? 0 : stop.hashCode() + (stopKeyInclusive ? 1 : 0); + + return startHash + stopHash; + } + + @Override + public boolean equals(Object o) { + if (o instanceof Range) + return equals((Range) o); + return false; + } + + public boolean equals(Range otherRange) { + + return compareTo(otherRange) == 0; + } + + /** + * Compares this range to another range. Compares in the order start key, inclusiveness of start key, end key, inclusiveness of end key. Infinite keys sort + * first, and non-infinite keys are compared with {@link Key#compareTo(Key)}. Inclusive sorts before non-inclusive. 
+ */ ++ @Override + public int compareTo(Range o) { + int comp; + + if (infiniteStartKey) + if (o.infiniteStartKey) + comp = 0; + else + comp = -1; + else if (o.infiniteStartKey) + comp = 1; + else { + comp = start.compareTo(o.start); + if (comp == 0) + if (startKeyInclusive && !o.startKeyInclusive) + comp = -1; + else if (!startKeyInclusive && o.startKeyInclusive) + comp = 1; + + } + + if (comp == 0) + if (infiniteStopKey) + if (o.infiniteStopKey) + comp = 0; + else + comp = 1; + else if (o.infiniteStopKey) + comp = -1; + else { + comp = stop.compareTo(o.stop); + if (comp == 0) + if (stopKeyInclusive && !o.stopKeyInclusive) + comp = 1; + else if (!stopKeyInclusive && o.stopKeyInclusive) + comp = -1; + } + + return comp; + } + + /** + * - * @param key + * @return true if the given key falls within the range + */ + public boolean contains(Key key) { + return !beforeStartKey(key) && !afterEndKey(key); + } + + /** + * Takes a collection of ranges and merges overlapping and adjacent ranges. For example, given the following input + * + *
 +   * [a,c], (c, d], (g,m), (j,t]
 +   * 
+ * + * the following ranges would be returned + * + *
 +   * [a,d], (g,t]
 +   * 
+ * - * @param ranges + * @return list of merged ranges + */ + + public static List mergeOverlapping(Collection ranges) { + if (ranges.size() == 0) + return Collections.emptyList(); + + List ral = new ArrayList(ranges); + Collections.sort(ral); + + ArrayList ret = new ArrayList(ranges.size()); + + Range currentRange = ral.get(0); + boolean currentStartKeyInclusive = ral.get(0).startKeyInclusive; + + for (int i = 1; i < ral.size(); i++) { + // because of inclusive switch, equal keys may not be seen + + if (currentRange.infiniteStopKey) { + // this range has the minimal start key and + // an infinite end key so it will contain all + // other ranges + break; + } + + Range range = ral.get(i); + + boolean startKeysEqual; + if (range.infiniteStartKey) { + // previous start key must be infinite because it is sorted + assert (currentRange.infiniteStartKey); + startKeysEqual = true; + } else if (currentRange.infiniteStartKey) { + startKeysEqual = false; + } else if (currentRange.start.equals(range.start)) { + startKeysEqual = true; + } else { + startKeysEqual = false; + } + + if (startKeysEqual || currentRange.contains(range.start) + || (!currentRange.stopKeyInclusive && range.startKeyInclusive && range.start.equals(currentRange.stop))) { + int cmp; + + if (range.infiniteStopKey || (cmp = range.stop.compareTo(currentRange.stop)) > 0 || (cmp == 0 && range.stopKeyInclusive)) { + currentRange = new Range(currentRange.getStartKey(), currentStartKeyInclusive, range.getEndKey(), range.stopKeyInclusive); + }/* else currentRange contains ral.get(i) */ + } else { + ret.add(currentRange); + currentRange = range; + currentStartKeyInclusive = range.startKeyInclusive; + } + } + + ret.add(currentRange); + + return ret; + } + + /** + * Creates a range which represents the intersection of this range and the passed in range. The following example will print true. + * + *
 +   * Range range1 = new Range("a", "f");
 +   * Range range2 = new Range("c", "n");
 +   * Range range3 = range1.clip(range2);
 +   * System.out.println(range3.equals(new Range("c", "f")));
 +   * 
+ * - * @param range + * @return the intersection + * @throws IllegalArgumentException + * if the ranges do not overlap + */ + + public Range clip(Range range) { + return clip(range, false); + } + + /** + * Same as the other clip function, except it gives the option to return null if the ranges do not overlap instead of throwing an exception. + * + * @see Range#clip(Range) - * @param range + * @param returnNullIfDisjoint + * If the ranges do not overlap and true is passed, then null is returned; otherwise an exception is thrown. + * @return the intersection + */ + + public Range clip(Range range, boolean returnNullIfDisjoint) { + + Key sk = range.getStartKey(); + boolean ski = range.isStartKeyInclusive(); + + Key ek = range.getEndKey(); + boolean eki = range.isEndKeyInclusive(); + + if (range.getStartKey() == null) { + if (getStartKey() != null) { + sk = getStartKey(); + ski = isStartKeyInclusive(); + } + } else if (afterEndKey(range.getStartKey()) + || (getEndKey() != null && range.getStartKey().equals(getEndKey()) && !(range.isStartKeyInclusive() && isEndKeyInclusive()))) { + if (returnNullIfDisjoint) + return null; + throw new IllegalArgumentException("Range " + range + " does not overlap " + this); + } else if (beforeStartKey(range.getStartKey())) { + sk = getStartKey(); + ski = isStartKeyInclusive(); + } + + if (range.getEndKey() == null) { + if (getEndKey() != null) { + ek = getEndKey(); + eki = isEndKeyInclusive(); + } + } else if (beforeStartKey(range.getEndKey()) + || (getStartKey() != null && range.getEndKey().equals(getStartKey()) && !(range.isEndKeyInclusive() && isStartKeyInclusive()))) { + if (returnNullIfDisjoint) + return null; + throw new IllegalArgumentException("Range " + range + " does not overlap " + this); + } else if (afterEndKey(range.getEndKey())) { + ek = getEndKey(); + eki = isEndKeyInclusive(); + } + + return new Range(sk, ski, ek, eki); + } + + /** + * Creates a new range that is bounded by the columns passed in.
The start key in the returned range will have a column >= the minimum column. The end key + * in the returned range will have a column <= the max column. + * - * - * @param min - * @param max + * @return a column bounded range + * @throws IllegalArgumentException + * if min > max + */ + + public Range bound(Column min, Column max) { + + if (min.compareTo(max) > 0) { + throw new IllegalArgumentException("min column > max column " + min + " " + max); + } + + Key sk = getStartKey(); + boolean ski = isStartKeyInclusive(); + + if (sk != null) { + + ByteSequence cf = sk.getColumnFamilyData(); + ByteSequence cq = sk.getColumnQualifierData(); + + ByteSequence mincf = new ArrayByteSequence(min.columnFamily); + ByteSequence mincq; + + if (min.columnQualifier != null) + mincq = new ArrayByteSequence(min.columnQualifier); + else + mincq = new ArrayByteSequence(new byte[0]); + + int cmp = cf.compareTo(mincf); + + if (cmp < 0 || (cmp == 0 && cq.compareTo(mincq) < 0)) { + ski = true; + sk = new Key(sk.getRowData().toArray(), mincf.toArray(), mincq.toArray(), new byte[0], Long.MAX_VALUE, true); + } + } + + Key ek = getEndKey(); + boolean eki = isEndKeyInclusive(); + + if (ek != null) { + ByteSequence row = ek.getRowData(); + ByteSequence cf = ek.getColumnFamilyData(); + ByteSequence cq = ek.getColumnQualifierData(); + ByteSequence cv = ek.getColumnVisibilityData(); + + ByteSequence maxcf = new ArrayByteSequence(max.columnFamily); + ByteSequence maxcq = null; + if (max.columnQualifier != null) + maxcq = new ArrayByteSequence(max.columnQualifier); + + boolean set = false; + + int comp = cf.compareTo(maxcf); + + if (comp > 0) { + set = true; + } else if (comp == 0 && maxcq != null && cq.compareTo(maxcq) > 0) { + set = true; + } else if (!eki && row.length() > 0 && row.byteAt(row.length() - 1) == 0 && cf.length() == 0 && cq.length() == 0 && cv.length() == 0 + && ek.getTimestamp() == Long.MAX_VALUE) { + row = row.subSequence(0, row.length() - 1); + set = true; + } + + if (set) { +
eki = false; + if (maxcq == null) + ek = new Key(row.toArray(), maxcf.toArray(), new byte[0], new byte[0], 0, false).followingKey(PartialKey.ROW_COLFAM); + else + ek = new Key(row.toArray(), maxcf.toArray(), maxcq.toArray(), new byte[0], 0, false).followingKey(PartialKey.ROW_COLFAM_COLQUAL); + } + } + + return new Range(sk, ski, ek, eki); + } + ++ @Override + public String toString() { + return ((startKeyInclusive && start != null) ? "[" : "(") + (start == null ? "-inf" : start) + "," + (stop == null ? "+inf" : stop) + + ((stopKeyInclusive && stop != null) ? "]" : ")"); + } + ++ @Override + public void readFields(DataInput in) throws IOException { + infiniteStartKey = in.readBoolean(); + infiniteStopKey = in.readBoolean(); + if (!infiniteStartKey) { + start = new Key(); + start.readFields(in); + } else { + start = null; + } + + if (!infiniteStopKey) { + stop = new Key(); + stop.readFields(in); + } else { + stop = null; + } + + startKeyInclusive = in.readBoolean(); + stopKeyInclusive = in.readBoolean(); + + if (!infiniteStartKey && !infiniteStopKey && beforeStartKey(stop)) { + throw new InvalidObjectException("Start key must be less than end key in range (" + start + ", " + stop + ")"); + } + } + ++ @Override + public void write(DataOutput out) throws IOException { + out.writeBoolean(infiniteStartKey); + out.writeBoolean(infiniteStopKey); + if (!infiniteStartKey) + start.write(out); + if (!infiniteStopKey) + stop.write(out); + out.writeBoolean(startKeyInclusive); + out.writeBoolean(stopKeyInclusive); + } + + public boolean isStartKeyInclusive() { + return startKeyInclusive; + } + + public boolean isEndKeyInclusive() { + return stopKeyInclusive; + } + + public TRange toThrift() { + return new TRange(start == null ? null : start.toThrift(), stop == null ? 
null : stop.toThrift(), startKeyInclusive, stopKeyInclusive, infiniteStartKey, + infiniteStopKey); + } + + public boolean isInfiniteStartKey() { + return infiniteStartKey; + } + + public boolean isInfiniteStopKey() { + return infiniteStopKey; + } + + /** + * Creates a range that covers an exact row Returns the same Range as new Range(row) + * + * @param row + * all keys in the range will have this row + */ + public static Range exact(Text row) { + return new Range(row); + } + + /** + * Creates a range that covers an exact row and column family + * + * @param row + * all keys in the range will have this row + * + * @param cf + * all keys in the range will have this column family + */ + public static Range exact(Text row, Text cf) { + Key startKey = new Key(row, cf); + return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM), false); + } + + /** + * Creates a range that covers an exact row, column family, and column qualifier + * + * @param row + * all keys in the range will have this row + * + * @param cf + * all keys in the range will have this column family + * + * @param cq + * all keys in the range will have this column qualifier + */ + public static Range exact(Text row, Text cf, Text cq) { + Key startKey = new Key(row, cf, cq); + return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL), false); + } + + /** + * Creates a range that covers an exact row, column family, column qualifier, and visibility + * + * @param row + * all keys in the range will have this row + * + * @param cf + * all keys in the range will have this column family + * + * @param cq + * all keys in the range will have this column qualifier + * + * @param cv + * all keys in the range will have this column visibility + */ + public static Range exact(Text row, Text cf, Text cq, Text cv) { + Key startKey = new Key(row, cf, cq, cv); + return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS), false); + } + + /** + 
* Creates a range that covers an exact row, column family, column qualifier, visibility, and timestamp + * + * @param row + * all keys in the range will have this row + * + * @param cf + * all keys in the range will have this column family + * + * @param cq + * all keys in the range will have this column qualifier + * + * @param cv + * all keys in the range will have this column visibility + * + * @param ts + * all keys in the range will have this timestamp + */ + public static Range exact(Text row, Text cf, Text cq, Text cv, long ts) { + Key startKey = new Key(row, cf, cq, cv, ts); + return new Range(startKey, true, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL_COLVIS_TIME), false); + } + + /** + * Returns a Text that sorts just after all Texts beginning with a prefix - * - * @param prefix + */ + public static Text followingPrefix(Text prefix) { + byte[] prefixBytes = prefix.getBytes(); + + // find the last byte in the array that is not 0xff + int changeIndex = prefix.getLength() - 1; + while (changeIndex >= 0 && prefixBytes[changeIndex] == (byte) 0xff) + changeIndex--; + if (changeIndex < 0) + return null; + + // copy prefix bytes into new array + byte[] newBytes = new byte[changeIndex + 1]; + System.arraycopy(prefixBytes, 0, newBytes, 0, changeIndex + 1); + + // increment the selected byte + newBytes[changeIndex]++; + return new Text(newBytes); + } + + /** + * Returns a Range that covers all rows beginning with a prefix + * + * @param rowPrefix + * all keys in the range will have rows that begin with this prefix + */ + public static Range prefix(Text rowPrefix) { + Text fp = followingPrefix(rowPrefix); + return new Range(new Key(rowPrefix), true, fp == null ? 
null : new Key(fp), false); + } + + /** + * Returns a Range that covers all column families beginning with a prefix within a given row + * + * @param row + * all keys in the range will have this row + * + * @param cfPrefix + * all keys in the range will have column families that begin with this prefix + */ + public static Range prefix(Text row, Text cfPrefix) { + Text fp = followingPrefix(cfPrefix); + return new Range(new Key(row, cfPrefix), true, fp == null ? new Key(row).followingKey(PartialKey.ROW) : new Key(row, fp), false); + } + + /** + * Returns a Range that covers all column qualifiers beginning with a prefix within a given row and column family + * + * @param row + * all keys in the range will have this row + * + * @param cf + * all keys in the range will have this column family + * + * @param cqPrefix + * all keys in the range will have column qualifiers that begin with this prefix + */ + public static Range prefix(Text row, Text cf, Text cqPrefix) { + Text fp = followingPrefix(cqPrefix); + return new Range(new Key(row, cf, cqPrefix), true, fp == null ? new Key(row, cf).followingKey(PartialKey.ROW_COLFAM) : new Key(row, cf, fp), false); + } + + /** + * Returns a Range that covers all column visibilities beginning with a prefix within a given row, column family, and column qualifier + * + * @param row + * all keys in the range will have this row + * + * @param cf + * all keys in the range will have this column family + * + * @param cq + * all keys in the range will have this column qualifier + * + * @param cvPrefix + * all keys in the range will have column visibilities that begin with this prefix + */ + public static Range prefix(Text row, Text cf, Text cq, Text cvPrefix) { + Text fp = followingPrefix(cvPrefix); + return new Range(new Key(row, cf, cq, cvPrefix), true, fp == null ? 
new Key(row, cf, cq).followingKey(PartialKey.ROW_COLFAM_COLQUAL) : new Key(row, cf, cq, + fp), false); + } + + /** + * Creates a range that covers an exact row + * + * @see Range#exact(Text) + */ + public static Range exact(CharSequence row) { + return Range.exact(new Text(row.toString())); + } + + /** + * Creates a range that covers an exact row and column family + * + * @see Range#exact(Text, Text) + */ + public static Range exact(CharSequence row, CharSequence cf) { + return Range.exact(new Text(row.toString()), new Text(cf.toString())); + } + + /** + * Creates a range that covers an exact row, column family, and column qualifier + * + * @see Range#exact(Text, Text, Text) + */ + public static Range exact(CharSequence row, CharSequence cf, CharSequence cq) { + return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString())); + } + + /** + * Creates a range that covers an exact row, column family, column qualifier, and visibility + * + * @see Range#exact(Text, Text, Text, Text) + */ + public static Range exact(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv) { + return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString())); + } + + /** + * Creates a range that covers an exact row, column family, column qualifier, visibility, and timestamp + * + * @see Range#exact(Text, Text, Text, Text, long) + */ + public static Range exact(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cv, long ts) { + return Range.exact(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cv.toString()), ts); + } + + /** + * Returns a Range that covers all rows beginning with a prefix + * + * @see Range#prefix(Text) + */ + public static Range prefix(CharSequence rowPrefix) { + return Range.prefix(new Text(rowPrefix.toString())); + } + + /** + * Returns a Range that covers all column families beginning with a prefix within a given row + * + 
* @see Range#prefix(Text, Text) + */ + public static Range prefix(CharSequence row, CharSequence cfPrefix) { + return Range.prefix(new Text(row.toString()), new Text(cfPrefix.toString())); + } + + /** + * Returns a Range that covers all column qualifiers beginning with a prefix within a given row and column family + * + * @see Range#prefix(Text, Text, Text) + */ + public static Range prefix(CharSequence row, CharSequence cf, CharSequence cqPrefix) { + return Range.prefix(new Text(row.toString()), new Text(cf.toString()), new Text(cqPrefix.toString())); + } + + /** + * Returns a Range that covers all column visibilities beginning with a prefix within a given row, column family, and column qualifier + * + * @see Range#prefix(Text, Text, Text, Text) + */ + public static Range prefix(CharSequence row, CharSequence cf, CharSequence cq, CharSequence cvPrefix) { + return Range.prefix(new Text(row.toString()), new Text(cf.toString()), new Text(cq.toString()), new Text(cvPrefix.toString())); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java index 2b3cdf5,0000000..0ac5308 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java @@@ -1,181 -1,0 +1,176 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.rfile; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.blockfile.ABlockReader; +import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry; + +/** + * + */ +public class BlockIndex { + + public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException { + + BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class); + + int accessCount = blockIndex.accessCount.incrementAndGet(); + + // 1 is a power of two, but do not care about it + if (accessCount >= 2 && isPowerOfTwo(accessCount)) { + blockIndex.buildIndex(accessCount, cacheBlock, indexEntry); + } + + if (blockIndex.blockIndex != null) + return blockIndex; + + return null; + } + + private static boolean isPowerOfTwo(int x) { + return ((x > 0) && (x & (x - 1)) == 0); + } + + private AtomicInteger accessCount = new AtomicInteger(0); + private volatile BlockIndexEntry[] blockIndex = null; + + public static class BlockIndexEntry implements Comparable { + + private Key prevKey; + private int entriesLeft; + private int pos; + + public BlockIndexEntry(int pos, int entriesLeft, Key prevKey) { + this.pos = pos; + this.entriesLeft = entriesLeft; + this.prevKey = prevKey; + } + - /** - * @param key - */ + public BlockIndexEntry(Key key) { + this.prevKey = key; + } - - + + public int getEntriesLeft() { + 
return entriesLeft; + } + + @Override + public int compareTo(BlockIndexEntry o) { + return prevKey.compareTo(o.prevKey); + } + + @Override + public String toString() { + return prevKey + " " + entriesLeft + " " + pos; + } + + public Key getPrevKey() { + return prevKey; + } + } + + public BlockIndexEntry seekBlock(Key startKey, ABlockReader cacheBlock) { + + // get a local ref to the index, another thread could change it + BlockIndexEntry[] blockIndex = this.blockIndex; + + int pos = Arrays.binarySearch(blockIndex, new BlockIndexEntry(startKey)); + + int index; + + if (pos < 0) { + if (pos == -1) + return null; // less than the first key in index, did not index the first key in block so just return null... code calling this will scan from beginning + // of block + index = (pos * -1) - 2; + } else { + // found exact key in index + index = pos; + while (index > 0) { + if (blockIndex[index].getPrevKey().equals(startKey)) + index--; + else + break; + } + } + + // handle case where multiple keys in block are exactly the same, want to find the earliest key in the index + while (index - 1 > 0) { + if (blockIndex[index].getPrevKey().equals(blockIndex[index - 1].getPrevKey())) + index--; + else + break; + + } + + if (index == 0 && blockIndex[index].getPrevKey().equals(startKey)) + return null; + + BlockIndexEntry bie = blockIndex[index]; + cacheBlock.seek(bie.pos); + return bie; + } + + private synchronized void buildIndex(int indexEntries, ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException { + cacheBlock.seek(0); + + RelativeKey rk = new RelativeKey(); + Value val = new Value(); + + int interval = indexEntry.getNumEntries() / indexEntries; + + if (interval <= 32) + return; + + // multiple threads could try to create the index with different sizes, do not replace a large index with a smaller one + if (this.blockIndex != null && this.blockIndex.length > indexEntries - 1) + return; + + int count = 0; + + ArrayList index = new ArrayList(indexEntries - 1); + + 
while (count < (indexEntry.getNumEntries() - interval + 1)) { + + Key myPrevKey = rk.getKey(); + int pos = cacheBlock.getPosition(); + rk.readFields(cacheBlock); + val.readFields(cacheBlock); + + if (count > 0 && count % interval == 0) { + index.add(new BlockIndexEntry(pos, indexEntry.getNumEntries() - count, myPrevKey)); + } + + count++; + } + + this.blockIndex = index.toArray(new BlockIndexEntry[index.size()]); + + cacheBlock.seek(0); + } + + BlockIndexEntry[] getIndexEntries() { + return blockIndex; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java index 7277c65,0000000..7d15851 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java @@@ -1,971 -1,0 +1,965 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.accumulo.core.file.rfile.bcfile; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; +import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead; +import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.Scalar; +import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarComparator; +import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.ScalarLong; +import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm; +import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; + +/** + * Block Compressed file, the underlying physical storage layer for TFile. BCFile provides the basic block level compression for the data block and meta blocks. + * It is separated from TFile as it may be used for other block-compressed file implementation. + */ +public final class BCFile { + // the current version of BCFile impl, increment them (major or minor) made + // enough changes + static final Version API_VERSION = new Version((short) 1, (short) 0); + static final Log LOG = LogFactory.getLog(BCFile.class); + + /** + * Prevent the instantiation of BCFile objects. 
+ */ + private BCFile() { + // nothing + } + + /** + * BCFile writer, the entry point for creating a new BCFile. + */ + static public class Writer implements Closeable { + private final FSDataOutputStream out; + private final Configuration conf; + // the single meta block containing index of compressed data blocks + final DataIndex dataIndex; + // index for meta blocks + final MetaIndex metaIndex; + boolean blkInProgress = false; + private boolean metaBlkSeen = false; + private boolean closed = false; + long errorCount = 0; + // reusable buffers. + private BytesWritable fsOutputBuffer; + + /** + * Call-back interface to register a block after a block is closed. + */ + private static interface BlockRegister { + /** + * Register a block that is fully closed. + * + * @param raw + * The size of block in terms of uncompressed bytes. + * @param offsetStart + * The start offset of the block. + * @param offsetEnd + * One byte after the end of the block. Compressed block size is offsetEnd - offsetStart. + */ + public void register(long raw, long offsetStart, long offsetEnd); + } + + /** + * Intermediate class that maintain the state of a Writable Compression Block. + */ + private static final class WBlockState { + private final Algorithm compressAlgo; + private Compressor compressor; // !null only if using native + // Hadoop compression + private final FSDataOutputStream fsOut; + private final long posStart; + private final SimpleBufferedOutputStream fsBufferedOutput; + private OutputStream out; + + /** + * @param compressionAlgo + * The compression algorithm to be used to for compression. 
- * @throws IOException + */ + public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf) throws IOException { + this.compressAlgo = compressionAlgo; + this.fsOut = fsOut; + this.posStart = fsOut.getPos(); + + fsOutputBuffer.setCapacity(TFile.getFSOutputBufferSize(conf)); + + this.fsBufferedOutput = new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes()); + this.compressor = compressAlgo.getCompressor(); + + try { + this.out = compressionAlgo.createCompressionStream(fsBufferedOutput, compressor, 0); + } catch (IOException e) { + compressAlgo.returnCompressor(compressor); + throw e; + } + } + + /** + * Get the output stream for BlockAppender's consumption. + * + * @return the output stream suitable for writing block data. + */ + OutputStream getOutputStream() { + return out; + } + + /** + * Get the current position in file. + * + * @return The current byte offset in underlying file. + * @throws IOException + */ + long getCurrentPos() throws IOException { + return fsOut.getPos() + fsBufferedOutput.size(); + } + + long getStartPos() { + return posStart; + } + + /** + * Current size of compressed data. + * + * @throws IOException + */ + long getCompressedSize() throws IOException { + long ret = getCurrentPos() - posStart; + return ret; + } + + /** + * Finishing up the current block. + */ + public void finish() throws IOException { + try { + if (out != null) { + out.flush(); + out = null; + } + } finally { + compressAlgo.returnCompressor(compressor); + compressor = null; + } + } + } + + /** + * Access point to stuff data into a block. + * + */ + public class BlockAppender extends DataOutputStream { + private final BlockRegister blockRegister; + private final WBlockState wBlkState; + private boolean closed = false; + + /** + * Constructor + * + * @param register + * the block register, which is called when the block is closed. + * @param wbs + * The writable compression block state. 
+ */ + BlockAppender(BlockRegister register, WBlockState wbs) { + super(wbs.getOutputStream()); + this.blockRegister = register; + this.wBlkState = wbs; + } + + /** + * Get the raw size of the block. + * + * @return the number of uncompressed bytes written through the BlockAppender so far. - * @throws IOException + */ + public long getRawSize() throws IOException { + /** + * Expecting the size() of a block not exceeding 4GB. Assuming the size() will wrap to negative integer if it exceeds 2GB. + */ + return size() & 0x00000000ffffffffL; + } + + /** + * Get the compressed size of the block in progress. + * + * @return the number of compressed bytes written to the underlying FS file. The size may be smaller than actual need to compress the all data written due + * to internal buffering inside the compressor. - * @throws IOException + */ + public long getCompressedSize() throws IOException { + return wBlkState.getCompressedSize(); + } + + public long getStartPos() { + return wBlkState.getStartPos(); + } + + @Override + public void flush() { + // The down stream is a special kind of stream that finishes a + // compression block upon flush. So we disable flush() here. + } + + /** + * Signaling the end of write to the block. The block register will be called for registering the finished block. + */ + @Override + public void close() throws IOException { + if (closed == true) { + return; + } + try { + ++errorCount; + wBlkState.finish(); + blockRegister.register(getRawSize(), wBlkState.getStartPos(), wBlkState.getCurrentPos()); + --errorCount; + } finally { + closed = true; + blkInProgress = false; + } + } + } + + /** + * Constructor + * + * @param fout + * FS output stream. + * @param compressionName + * Name of the compression algorithm, which will be used for all data blocks. 
- * @throws IOException + * @see Compression#getSupportedAlgorithms + */ + public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks) throws IOException { + if (fout.getPos() != 0) { + throw new IOException("Output file not at zero offset."); + } + + this.out = fout; + this.conf = conf; + dataIndex = new DataIndex(compressionName, trackDataBlocks); + metaIndex = new MetaIndex(); + fsOutputBuffer = new BytesWritable(); + Magic.write(fout); + } + + /** + * Close the BCFile Writer. Attempting to use the Writer after calling close is not allowed and may lead to undetermined results. + */ ++ @Override + public void close() throws IOException { + if (closed == true) { + return; + } + + try { + if (errorCount == 0) { + if (blkInProgress == true) { + throw new IllegalStateException("Close() called with active block appender."); + } + + // add metaBCFileIndex to metaIndex as the last meta block + BlockAppender appender = prepareMetaBlock(DataIndex.BLOCK_NAME, getDefaultCompressionAlgorithm()); + try { + dataIndex.write(appender); + } finally { + appender.close(); + } + + long offsetIndexMeta = out.getPos(); + metaIndex.write(out); + + // Meta Index and the trailing section are written out directly. 
+ out.writeLong(offsetIndexMeta); + + API_VERSION.write(out); + Magic.write(out); + out.flush(); + } + } finally { + closed = true; + } + } + + private Algorithm getDefaultCompressionAlgorithm() { + return dataIndex.getDefaultCompressionAlgorithm(); + } + + private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo) throws IOException, MetaBlockAlreadyExists { + if (blkInProgress == true) { + throw new IllegalStateException("Cannot create Meta Block until previous block is closed."); + } + + if (metaIndex.getMetaByName(name) != null) { + throw new MetaBlockAlreadyExists("name=" + name); + } + + MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo); + WBlockState wbs = new WBlockState(compressAlgo, out, fsOutputBuffer, conf); + BlockAppender ba = new BlockAppender(mbr, wbs); + blkInProgress = true; + metaBlkSeen = true; + return ba; + } + + /** + * Create a Meta Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Regular + * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation. + * + * @param name + * The name of the Meta Block. The name must not conflict with existing Meta Blocks. + * @param compressionName + * The name of the compression algorithm to be used. + * @return The BlockAppender stream - * @throws IOException + * @throws MetaBlockAlreadyExists + * If the meta block with the name already exists. + */ + public BlockAppender prepareMetaBlock(String name, String compressionName) throws IOException, MetaBlockAlreadyExists { + return prepareMetaBlock(name, Compression.getCompressionAlgorithmByName(compressionName)); + } + + /** + * Create a Meta Block and obtain an output stream for adding data into the block. The Meta Block will be compressed with the same compression algorithm as + * data blocks. There can only be one BlockAppender stream active at any time. 
Regular Blocks may not be created after the first Meta Blocks. The caller + * must call BlockAppender.close() to conclude the block creation. + * + * @param name + * The name of the Meta Block. The name must not conflict with existing Meta Blocks. + * @return The BlockAppender stream + * @throws MetaBlockAlreadyExists + * If the meta block with the name already exists. - * @throws IOException + */ + public BlockAppender prepareMetaBlock(String name) throws IOException, MetaBlockAlreadyExists { + return prepareMetaBlock(name, getDefaultCompressionAlgorithm()); + } + + /** + * Create a Data Block and obtain an output stream for adding data into the block. There can only be one BlockAppender stream active at any time. Data + * Blocks may not be created after the first Meta Blocks. The caller must call BlockAppender.close() to conclude the block creation. + * + * @return The BlockAppender stream - * @throws IOException + */ + public BlockAppender prepareDataBlock() throws IOException { + if (blkInProgress == true) { + throw new IllegalStateException("Cannot create Data Block until previous block is closed."); + } + + if (metaBlkSeen == true) { + throw new IllegalStateException("Cannot create Data Block after Meta Blocks."); + } + + DataBlockRegister dbr = new DataBlockRegister(); + + WBlockState wbs = new WBlockState(getDefaultCompressionAlgorithm(), out, fsOutputBuffer, conf); + BlockAppender ba = new BlockAppender(dbr, wbs); + blkInProgress = true; + return ba; + } + + /** + * Callback to make sure a meta block is added to the internal list when its stream is closed. 
+ */ + private class MetaBlockRegister implements BlockRegister { + private final String name; + private final Algorithm compressAlgo; + + MetaBlockRegister(String name, Algorithm compressAlgo) { + this.name = name; + this.compressAlgo = compressAlgo; + } + ++ @Override + public void register(long raw, long begin, long end) { + metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo, new BlockRegion(begin, end - begin, raw))); + } + } + + /** + * Callback to make sure a data block is added to the internal list when it's being closed. + * + */ + private class DataBlockRegister implements BlockRegister { + DataBlockRegister() { + // do nothing + } + ++ @Override + public void register(long raw, long begin, long end) { + dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw)); + } + } + } + + /** + * BCFile Reader, interface to read the file's data and meta blocks. + */ + static public class Reader implements Closeable { + private static final String META_NAME = "BCFile.metaindex"; + private final FSDataInputStream in; + private final Configuration conf; + final DataIndex dataIndex; + // Index for meta blocks + final MetaIndex metaIndex; + final Version version; + + /** + * Intermediate class that maintain the state of a Readable Compression Block. 
+ */ + static private final class RBlockState { + private final Algorithm compressAlgo; + private Decompressor decompressor; + private final BlockRegion region; + private final InputStream in; + + public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf) throws IOException { + this.compressAlgo = compressionAlgo; + this.region = region; + this.decompressor = compressionAlgo.getDecompressor(); + + try { + this.in = compressAlgo.createDecompressionStream(new BoundedRangeFileInputStream(fsin, this.region.getOffset(), this.region.getCompressedSize()), + decompressor, TFile.getFSInputBufferSize(conf)); + } catch (IOException e) { + compressAlgo.returnDecompressor(decompressor); + throw e; + } + } + + /** + * Get the output stream for BlockAppender's consumption. + * + * @return the output stream suitable for writing block data. + */ + public InputStream getInputStream() { + return in; + } + + public String getCompressionName() { + return compressAlgo.getName(); + } + + public BlockRegion getBlockRegion() { + return region; + } + + public void finish() throws IOException { + try { + in.close(); + } finally { + compressAlgo.returnDecompressor(decompressor); + decompressor = null; + } + } + } + + /** + * Access point to read a block. + */ + public static class BlockReader extends DataInputStream { + private final RBlockState rBlkState; + private boolean closed = false; + + BlockReader(RBlockState rbs) { + super(rbs.getInputStream()); + rBlkState = rbs; + } + + /** + * Finishing reading the block. Release all resources. + */ + @Override + public void close() throws IOException { + if (closed == true) { + return; + } + try { + // Do not set rBlkState to null. People may access stats after calling + // close(). + rBlkState.finish(); + } finally { + closed = true; + } + } + + /** + * Get the name of the compression algorithm used to compress the block. + * + * @return name of the compression algorithm. 
+ */ + public String getCompressionName() { + return rBlkState.getCompressionName(); + } + + /** + * Get the uncompressed size of the block. + * + * @return uncompressed size of the block. + */ + public long getRawSize() { + return rBlkState.getBlockRegion().getRawSize(); + } + + /** + * Get the compressed size of the block. + * + * @return compressed size of the block. + */ + public long getCompressedSize() { + return rBlkState.getBlockRegion().getCompressedSize(); + } + + /** + * Get the starting position of the block in the file. + * + * @return the starting position of the block in the file. + */ + public long getStartPos() { + return rBlkState.getBlockRegion().getOffset(); + } + } + + /** + * Constructor + * + * @param fin + * FS input stream. + * @param fileLength + * Length of the corresponding file - * @throws IOException + */ + public Reader(FSDataInputStream fin, long fileLength, Configuration conf) throws IOException { + this.in = fin; + this.conf = conf; + + // move the cursor to the beginning of the tail, containing: offset to the + // meta block index, version and magic + fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE); + long offsetIndexMeta = fin.readLong(); + version = new Version(fin); + Magic.readAndVerify(fin); + + if (!version.compatibleWith(BCFile.API_VERSION)) { + throw new RuntimeException("Incompatible BCFile fileBCFileVersion."); + } + + // read meta index + fin.seek(offsetIndexMeta); + metaIndex = new MetaIndex(fin); + + // read data:BCFile.index, the data block index + BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME); + try { + dataIndex = new DataIndex(blockR); + } finally { + blockR.close(); + } + } + + public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf) throws IOException { + this.in = fin; + this.conf = conf; + + BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME); + BlockRead cachedDataIndex = 
cache.getCachedMetaBlock(DataIndex.BLOCK_NAME); + + if (cachedMetaIndex == null || cachedDataIndex == null) { + // move the cursor to the beginning of the tail, containing: offset to the + // meta block index, version and magic + fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE / Byte.SIZE); + long offsetIndexMeta = fin.readLong(); + version = new Version(fin); + Magic.readAndVerify(fin); + + if (!version.compatibleWith(BCFile.API_VERSION)) { + throw new RuntimeException("Incompatible BCFile fileBCFileVersion."); + } + + // read meta index + fin.seek(offsetIndexMeta); + metaIndex = new MetaIndex(fin); + if (cachedMetaIndex == null) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + metaIndex.write(dos); + dos.close(); + cache.cacheMetaBlock(META_NAME, baos.toByteArray()); + } + + // read data:BCFile.index, the data block index + if (cachedDataIndex == null) { + BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME); + cachedDataIndex = cache.cacheMetaBlock(DataIndex.BLOCK_NAME, blockR); + } + + dataIndex = new DataIndex(cachedDataIndex); + cachedDataIndex.close(); + + } else { + // Logger.getLogger(Reader.class).debug("Read bcfile !METADATA from cache"); + version = null; + metaIndex = new MetaIndex(cachedMetaIndex); + dataIndex = new DataIndex(cachedDataIndex); + } + } + + /** + * Get the name of the default compression algorithm. + * + * @return the name of the default compression algorithm. + */ + public String getDefaultCompressionName() { + return dataIndex.getDefaultCompressionAlgorithm().getName(); + } + + /** + * Get version of BCFile file being read. + * + * @return version of BCFile file being read. + */ + public Version getBCFileVersion() { + return version; + } + + /** + * Get version of BCFile API. + * + * @return version of BCFile API. + */ + public Version getAPIVersion() { + return API_VERSION; + } + + /** + * Finishing reading the BCFile. Release all resources. 
+ */ ++ @Override + public void close() { + // nothing to be done now + } + + /** + * Get the number of data blocks. + * + * @return the number of data blocks. + */ + public int getBlockCount() { + return dataIndex.getBlockRegionList().size(); + } + + /** + * Stream access to a Meta Block. + * + * @param name + * meta block name + * @return BlockReader input stream for reading the meta block. - * @throws IOException + * @throws MetaBlockDoesNotExist + * The Meta Block with the given name does not exist. + */ + public BlockReader getMetaBlock(String name) throws IOException, MetaBlockDoesNotExist { + MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name); + if (imeBCIndex == null) { + throw new MetaBlockDoesNotExist("name=" + name); + } + + BlockRegion region = imeBCIndex.getRegion(); + return createReader(imeBCIndex.getCompressionAlgorithm(), region); + } + + /** + * Stream access to a Data Block. + * + * @param blockIndex + * 0-based data block index. + * @return BlockReader input stream for reading the data block. 
- * @throws IOException + */ + public BlockReader getDataBlock(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= getBlockCount()) { + throw new IndexOutOfBoundsException(String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount())); + } + + BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex); + return createReader(dataIndex.getDefaultCompressionAlgorithm(), region); + } + + public BlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException { + BlockRegion region = new BlockRegion(offset, compressedSize, rawSize); + return createReader(dataIndex.getDefaultCompressionAlgorithm(), region); + } + + private BlockReader createReader(Algorithm compressAlgo, BlockRegion region) throws IOException { + RBlockState rbs = new RBlockState(compressAlgo, in, region, conf); + return new BlockReader(rbs); + } + + /** + * Find the smallest Block index whose starting offset is greater than or equal to the specified offset. + * + * @param offset + * User-specific offset. + * @return the index to the data Block if such block exists; or -1 otherwise. + */ + public int getBlockIndexNear(long offset) { + ArrayList list = dataIndex.getBlockRegionList(); + int idx = Utils.lowerBound(list, new ScalarLong(offset), new ScalarComparator()); + + if (idx == list.size()) { + return -1; + } + + return idx; + } + } + + /** + * Index for all Meta blocks. 
+ */ + static class MetaIndex { + // use a tree map, for getting a meta block entry by name + final Map index; + + // for write + public MetaIndex() { + index = new TreeMap(); + } + + // for read, construct the map from the file + public MetaIndex(DataInput in) throws IOException { + int count = Utils.readVInt(in); + index = new TreeMap(); + + for (int nx = 0; nx < count; nx++) { + MetaIndexEntry indexEntry = new MetaIndexEntry(in); + index.put(indexEntry.getMetaName(), indexEntry); + } + } + + public void addEntry(MetaIndexEntry indexEntry) { + index.put(indexEntry.getMetaName(), indexEntry); + } + + public MetaIndexEntry getMetaByName(String name) { + return index.get(name); + } + + public void write(DataOutput out) throws IOException { + Utils.writeVInt(out, index.size()); + + for (MetaIndexEntry indexEntry : index.values()) { + indexEntry.write(out); + } + } + } + + /** + * An entry describes a meta block in the MetaIndex. + */ + static final class MetaIndexEntry { + private final String metaName; + private final Algorithm compressionAlgorithm; + private final static String defaultPrefix = "data:"; + + private final BlockRegion region; + + public MetaIndexEntry(DataInput in) throws IOException { + String fullMetaName = Utils.readString(in); + if (fullMetaName.startsWith(defaultPrefix)) { + metaName = fullMetaName.substring(defaultPrefix.length(), fullMetaName.length()); + } else { + throw new IOException("Corrupted Meta region Index"); + } + + compressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in)); + region = new BlockRegion(in); + } + + public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm, BlockRegion region) { + this.metaName = metaName; + this.compressionAlgorithm = compressionAlgorithm; + this.region = region; + } + + public String getMetaName() { + return metaName; + } + + public Algorithm getCompressionAlgorithm() { + return compressionAlgorithm; + } + + public BlockRegion getRegion() { + return region; + } 
+ + public void write(DataOutput out) throws IOException { + Utils.writeString(out, defaultPrefix + metaName); + Utils.writeString(out, compressionAlgorithm.getName()); + + region.write(out); + } + } + + /** + * Index of all compressed data blocks. + */ + static class DataIndex { + final static String BLOCK_NAME = "BCFile.index"; + + private final Algorithm defaultCompressionAlgorithm; + + // for data blocks, each entry specifies a block's offset, compressed size + // and raw size + private final ArrayList listRegions; + + private boolean trackBlocks; + + // for read, deserialized from a file + public DataIndex(DataInput in) throws IOException { + defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(Utils.readString(in)); + + int n = Utils.readVInt(in); + listRegions = new ArrayList(n); + + for (int i = 0; i < n; i++) { + BlockRegion region = new BlockRegion(in); + listRegions.add(region); + } + } + + // for write + public DataIndex(String defaultCompressionAlgorithmName, boolean trackBlocks) { + this.trackBlocks = trackBlocks; + this.defaultCompressionAlgorithm = Compression.getCompressionAlgorithmByName(defaultCompressionAlgorithmName); + listRegions = new ArrayList(); + } + + public Algorithm getDefaultCompressionAlgorithm() { + return defaultCompressionAlgorithm; + } + + public ArrayList getBlockRegionList() { + return listRegions; + } + + public void addBlockRegion(BlockRegion region) { + if (trackBlocks) + listRegions.add(region); + } + + public void write(DataOutput out) throws IOException { + Utils.writeString(out, defaultCompressionAlgorithm.getName()); + + Utils.writeVInt(out, listRegions.size()); + + for (BlockRegion region : listRegions) { + region.write(out); + } + } + } + + /** + * Magic number uniquely identifying a BCFile in the header/footer. + */ + static final class Magic { + private final static byte[] AB_MAGIC_BCFILE = { + // ... 
total of 16 bytes + (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91, (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf, (byte) 0x41, + (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1, (byte) 0x50}; + + public static void readAndVerify(DataInput in) throws IOException { + byte[] abMagic = new byte[size()]; + in.readFully(abMagic); + + // check against AB_MAGIC_BCFILE, if not matching, throw an + // Exception + if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) { + throw new IOException("Not a valid BCFile."); + } + } + + public static void write(DataOutput out) throws IOException { + out.write(AB_MAGIC_BCFILE); + } + + public static int size() { + return AB_MAGIC_BCFILE.length; + } + } + + /** + * Block region. + */ + static final class BlockRegion implements Scalar { + private final long offset; + private final long compressedSize; + private final long rawSize; + + public BlockRegion(DataInput in) throws IOException { + offset = Utils.readVLong(in); + compressedSize = Utils.readVLong(in); + rawSize = Utils.readVLong(in); + } + + public BlockRegion(long offset, long compressedSize, long rawSize) { + this.offset = offset; + this.compressedSize = compressedSize; + this.rawSize = rawSize; + } + + public void write(DataOutput out) throws IOException { + Utils.writeVLong(out, offset); + Utils.writeVLong(out, compressedSize); + Utils.writeVLong(out, rawSize); + } + + public long getOffset() { + return offset; + } + + public long getCompressedSize() { + return compressedSize; + } + + public long getRawSize() { + return rawSize; + } + + @Override + public long magnitude() { + return offset; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java index 2b57638,0000000..d7734a2 mode 
100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/ByteArray.java @@@ -1,91 -1,0 +1,89 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.accumulo.core.file.rfile.bcfile; + +import org.apache.hadoop.io.BytesWritable; + +/** + * Adaptor class to wrap byte-array backed objects (including java byte array) as RawComparable objects. + */ +public final class ByteArray implements RawComparable { + private final byte[] buffer; + private final int offset; + private final int len; + + /** + * Constructing a ByteArray from a {@link BytesWritable}. - * - * @param other + */ + public ByteArray(BytesWritable other) { + this(other.getBytes(), 0, other.getLength()); + } + + /** + * Wrap a whole byte array as a RawComparable. + * + * @param buffer + * the byte array buffer. + */ + public ByteArray(byte[] buffer) { + this(buffer, 0, buffer.length); + } + + /** + * Wrap a partial byte array as a RawComparable. + * + * @param buffer + * the byte array buffer. + * @param offset + * the starting offset + * @param len + * the length of the consecutive bytes to be wrapped. 
+ */ + public ByteArray(byte[] buffer, int offset, int len) { + if ((offset | len | (buffer.length - offset - len)) < 0) { + throw new IndexOutOfBoundsException(); + } + this.buffer = buffer; + this.offset = offset; + this.len = len; + } + + /** + * @return the underlying buffer. + */ + @Override + public byte[] buffer() { + return buffer; + } + + /** + * @return the offset in the buffer. + */ + @Override + public int offset() { + return offset; + } + + /** + * @return the size of the byte array. + */ + @Override + public int size() { + return len; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/92613388/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java index a075d87,0000000..345d406 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/Chunk.java @@@ -1,418 -1,0 +1,416 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.accumulo.core.file.rfile.bcfile; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Several related classes to support chunk-encoded sub-streams on top of a regular stream. + */ +final class Chunk { + + /** + * Prevent the instantiation of class. + */ + private Chunk() { + // nothing + } + + /** + * Decoding a chain of chunks encoded through ChunkEncoder or SingleChunkEncoder. + */ + static public class ChunkDecoder extends InputStream { + private DataInputStream in = null; + private boolean lastChunk; + private int remain = 0; + private boolean closed; + + public ChunkDecoder() { + lastChunk = true; + closed = true; + } + + public void reset(DataInputStream downStream) { + // no need to wind forward the old input. + in = downStream; + lastChunk = false; + remain = 0; + closed = false; + } + + /** + * Constructor + * + * @param in + * The source input stream which contains chunk-encoded data stream. + */ + public ChunkDecoder(DataInputStream in) { + this.in = in; + lastChunk = false; + closed = false; + } + + /** + * Have we reached the last chunk. + * + * @return true if we have reached the last chunk. - * @throws java.io.IOException + */ + public boolean isLastChunk() throws IOException { + checkEOF(); + return lastChunk; + } + + /** + * How many bytes remain in the current chunk? + * + * @return remaining bytes left in the current chunk. - * @throws java.io.IOException + */ + public int getRemain() throws IOException { + checkEOF(); + return remain; + } + + /** + * Reading the length of next chunk. + * + * @throws java.io.IOException + * when no more data is available. + */ + private void readLength() throws IOException { + remain = Utils.readVInt(in); + if (remain >= 0) { + lastChunk = true; + } else { + remain = -remain; + } + } + + /** + * Check whether we reach the end of the stream. 
+ * + * @return false if the chunk encoded stream has more data to read (in which case available() will be greater than 0); true otherwise. + * @throws java.io.IOException + * on I/O errors. + */ + private boolean checkEOF() throws IOException { + if (isClosed()) + return true; + while (true) { + if (remain > 0) + return false; + if (lastChunk) + return true; + readLength(); + } + } + + @Override + /* + * This method never blocks the caller. Returning 0 does not mean we reach the end of the stream. + */ + public int available() { + return remain; + } + + @Override + public int read() throws IOException { + if (checkEOF()) + return -1; + int ret = in.read(); + if (ret < 0) + throw new IOException("Corrupted chunk encoding stream"); + --remain; + return ret; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } + + if (!checkEOF()) { + int n = Math.min(remain, len); + int ret = in.read(b, off, n); + if (ret < 0) + throw new IOException("Corrupted chunk encoding stream"); + remain -= ret; + return ret; + } + return -1; + } + + @Override + public long skip(long n) throws IOException { + if (!checkEOF()) { + long ret = in.skip(Math.min(remain, n)); + remain -= ret; + return ret; + } + return 0; + } + + @Override + public boolean markSupported() { + return false; + } + + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws IOException { + if (closed == false) { + try { + while (!checkEOF()) { + skip(Integer.MAX_VALUE); + } + } finally { + closed = true; + } + } + } + } + + /** + * Chunk Encoder. Encoding the output data into a chain of chunks in the following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n, byte[len_n]. 
+ * Where len1, len2, ..., len_n are the lengths of the data chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks cannot have length 0. + * All lengths are in the range of 0 to Integer.MAX_VALUE and are encoded in Utils.VInt format. + */ + static public class ChunkEncoder extends OutputStream { + /** + * The data output stream it connects to. + */ + private DataOutputStream out; + + /** + * The internal buffer that is only used when we do not know the advertised size. + */ + private byte buf[]; + + /** + * The number of valid bytes in the buffer. This value is always in the range 0 through buf.length; elements buf[0] through + * buf[count-1] contain valid byte data. + */ + private int count; + + /** + * Constructor. + * + * @param out + * the underlying output stream. + * @param buf + * user-supplied buffer. The buffer would be used exclusively by the ChunkEncoder during its life cycle. + */ + public ChunkEncoder(DataOutputStream out, byte[] buf) { + this.out = out; + this.buf = buf; + this.count = 0; + } + + /** + * Write out a chunk. + * + * @param chunk + * The chunk buffer. + * @param offset + * Offset to chunk buffer for the beginning of chunk. + * @param len + * @param last + * Is this the last call to flushBuffer? + */ + private void writeChunk(byte[] chunk, int offset, int len, boolean last) throws IOException { + if (last) { // always write out the length for the last chunk. + Utils.writeVInt(out, len); + if (len > 0) { + out.write(chunk, offset, len); + } + } else { + if (len > 0) { + Utils.writeVInt(out, -len); + out.write(chunk, offset, len); + } + } + } + + /** + * Write out a chunk that is a concatenation of the internal buffer plus user supplied data. This will never be the last block. + * + * @param data + * User supplied data buffer. + * @param offset + * Offset to user data buffer. + * @param len + * User data buffer size. 
+ */ + private void writeBufData(byte[] data, int offset, int len) throws IOException { + if (count + len > 0) { + Utils.writeVInt(out, -(count + len)); + out.write(buf, 0, count); + count = 0; + out.write(data, offset, len); + } + } + + /** + * Flush the internal buffer. + * + * Is this the last call to flushBuffer? + * + * @throws java.io.IOException + */ + private void flushBuffer() throws IOException { + if (count > 0) { + writeChunk(buf, 0, count, false); + count = 0; + } + } + + @Override + public void write(int b) throws IOException { + if (count >= buf.length) { + flushBuffer(); + } + buf[count++] = (byte) b; + } + + @Override + public void write(byte b[]) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + if ((len + count) >= buf.length) { + /* + * If the input data do not fit in buffer, flush the output buffer and then write the data directly. In this way buffered streams will cascade + * harmlessly. + */ + writeBufData(b, off, len); + return; + } + + System.arraycopy(b, off, buf, count, len); + count += len; + } + + @Override + public void flush() throws IOException { + flushBuffer(); + out.flush(); + } + + @Override + public void close() throws IOException { + if (buf != null) { + try { + writeChunk(buf, 0, count, true); + } finally { + buf = null; + out = null; + } + } + } + } + + /** + * Encode the whole stream as a single chunk. Expecting to know the size of the chunk up-front. + */ + static public class SingleChunkEncoder extends OutputStream { + /** + * The data output stream it connects to. + */ + private final DataOutputStream out; + + /** + * The remaining bytes to be written. + */ + private int remain; + private boolean closed = false; + + /** + * Constructor. + * + * @param out + * the underlying output stream. + * @param size + * The total # of bytes to be written as a single chunk. + * @throws java.io.IOException + * if an I/O error occurs. 
+ */ + public SingleChunkEncoder(DataOutputStream out, int size) throws IOException { + this.out = out; + this.remain = size; + Utils.writeVInt(out, size); + } + + @Override + public void write(int b) throws IOException { + if (remain > 0) { + out.write(b); + --remain; + } else { + throw new IOException("Writing more bytes than advertised size."); + } + } + + @Override + public void write(byte b[]) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + if (remain >= len) { + out.write(b, off, len); + remain -= len; + } else { + throw new IOException("Writing more bytes than advertised size."); + } + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + if (closed == true) { + return; + } + + try { + if (remain > 0) { + throw new IOException("Writing less bytes than advertised size."); + } + } finally { + closed = true; + } + } + } +}