hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r787913 [3/4] - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/io/file/ src/java/org/apache/hadoop/io/file/tfile/ src/test/ src/test/core/org/apache/hadoop/io/file/ src/test/core/org/apache/hadoop/io/file/tfile/
Date Wed, 24 Jun 2009 05:48:26 GMT
Added: hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/Utils.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/Utils.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/file/tfile/Utils.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Supporting Utility classes used by TFile, and shared by users of TFile.
+ */
+public final class Utils {
+
+  /**
+   * Prevent the instantiation of Utils.
+   */
+  private Utils() {
+    // static helpers only; the private constructor blocks instantiation
+  }
+
+  /**
+   * Encoding an integer into a variable-length encoding format. Synonymous to
+   * <code>Utils#writeVLong(out, n)</code>.
+   * 
+   * @param out
+   *          output stream
+   * @param n
+   *          The integer to be encoded
+   * @throws IOException
+   * @see Utils#writeVLong(DataOutput, long)
+   */
+  public static void writeVInt(DataOutput out, int n) throws IOException {
+    writeVLong(out, n); // n is widened to long, so ints and longs share one wire format
+  }
+
+  /**
+   * Encoding a Long integer into a variable-length encoding format.
+   * <ul>
+   * <li>if n in [-32, 128): encode in one byte with the actual value.
+   * Otherwise,
+   * <li>if n in [-20*2^8, 20*2^8): encode in two bytes: byte[0] = n/256 - 52;
+   * byte[1]=n&0xff. Otherwise,
+   * <li>if n in [-16*2^16, 16*2^16): encode in three bytes: byte[0]=n/2^16 -
+   * 88; byte[1]=(n>>8)&0xff; byte[2]=n&0xff. Otherwise,
+   * <li>if n in [-8*2^24, 8*2^24): encode in four bytes: byte[0]=n/2^24 - 112;
+   * byte[1] = (n>>16)&0xff; byte[2] = (n>>8)&0xff; byte[3]=n&0xff. Otherwise:
+   * <li>if n in [-2^31, 2^31): encode in five bytes: byte[0]=-125; byte[1] =
+   * (n>>24)&0xff; byte[2]=(n>>16)&0xff; byte[3]=(n>>8)&0xff; byte[4]=n&0xff;
+   * <li>if n in [-2^39, 2^39): encode in six bytes: byte[0]=-124; byte[1] =
+   * (n>>32)&0xff; byte[2]=(n>>24)&0xff; byte[3]=(n>>16)&0xff;
+   * byte[4]=(n>>8)&0xff; byte[5]=n&0xff
+   * <li>if n in [-2^47, 2^47): encode in seven bytes: byte[0]=-123; byte[1] =
+   * (n>>40)&0xff; byte[2]=(n>>32)&0xff; byte[3]=(n>>24)&0xff;
+   * byte[4]=(n>>16)&0xff; byte[5]=(n>>8)&0xff; byte[6]=n&0xff;
+   * <li>if n in [-2^55, 2^55): encode in eight bytes: byte[0]=-122; byte[1] =
+   * (n>>48)&0xff; byte[2] = (n>>40)&0xff; byte[3]=(n>>32)&0xff;
+   * byte[4]=(n>>24)&0xff; byte[5]=(n>>16)&0xff; byte[6]=(n>>8)&0xff;
+   * byte[7]=n&0xff;
+   * <li>if n in [-2^63, 2^63): encode in nine bytes: byte[0]=-121; byte[1] =
+   * (n>>56)&0xff; byte[2] = (n>>48)&0xff; byte[3] = (n>>40)&0xff;
+   * byte[4]=(n>>32)&0xff; byte[5]=(n>>24)&0xff; byte[6]=(n>>16)&0xff;
+   * byte[7]=(n>>8)&0xff; byte[8]=n&0xff;
+   * </ul>
+   * 
+   * @param out
+   *          output stream
+   * @param n
+   *          the integer number
+   * @throws IOException
+   */
+  @SuppressWarnings("fallthrough")
+  public static void writeVLong(DataOutput out, long n) throws IOException {
+    if ((n < 128) && (n >= -32)) {
+      out.writeByte((int) n); // one-byte form: the byte is the value itself
+      return;
+    }
+
+    long un = (n < 0) ? ~n : n; // non-negative value with the same count of significant bits
+    // number of bytes needed to hold n including its sign bit (always 1..8)
+    int len = (Long.SIZE - Long.numberOfLeadingZeros(un)) / 8 + 1;
+    int firstByte = (int) (n >> ((len - 1) * 8)); // top byte of the len-byte form, sign-extended
+    switch (len) {
+      case 1:
+        // only n in [-128, -33] gets here: fall through to case 2 with firstByte == -1.
+        firstByte >>= 8;
+      case 2:
+        if ((firstByte < 20) && (firstByte >= -20)) {
+          out.writeByte(firstByte - 52); // header byte lands in [-72, -33]
+          out.writeByte((int) n); // writeByte keeps only the low 8 bits
+          return;
+        }
+        // too wide for the two-byte form: fall through to case 3 with firstByte == 0/-1.
+        firstByte >>= 8;
+      case 3:
+        if ((firstByte < 16) && (firstByte >= -16)) {
+          out.writeByte(firstByte - 88); // header byte lands in [-104, -73]
+          out.writeShort((int) n); // writeShort keeps only the low 16 bits
+          return;
+        }
+        // too wide for the three-byte form: fall through to case 4 with firstByte == 0/-1.
+        firstByte >>= 8;
+      case 4:
+        if ((firstByte < 8) && (firstByte >= -8)) {
+          out.writeByte(firstByte - 112); // header byte lands in [-120, -105]
+          out.writeShort(((int) n) >>> 8); // middle two payload bytes
+          out.writeByte((int) n); // lowest payload byte
+          return;
+        }
+        out.writeByte(len - 129); // reached only with len == 4: header -125, 4 payload bytes follow
+        out.writeInt((int) n);
+        return;
+      case 5:
+        out.writeByte(len - 129); // header -124, 5 payload bytes follow
+        out.writeInt((int) (n >>> 8));
+        out.writeByte((int) n);
+        return;
+      case 6:
+        out.writeByte(len - 129); // header -123, 6 payload bytes follow
+        out.writeInt((int) (n >>> 16));
+        out.writeShort((int) n);
+        return;
+      case 7:
+        out.writeByte(len - 129); // header -122, 7 payload bytes follow
+        out.writeInt((int) (n >>> 24));
+        out.writeShort((int) (n >>> 8));
+        out.writeByte((int) n);
+        return;
+      case 8:
+        out.writeByte(len - 129); // header -121, 8 payload bytes follow
+        out.writeLong(n);
+        return;
+      default:
+        throw new RuntimeException("Internel error"); // not reachable: len is always in [1, 8]
+    }
+  }
+
+  /**
+   * Decoding the variable-length integer via <code>Utils#readVLong(in)</code>;
+   * a RuntimeException is thrown if the decoded value does not fit in an int.
+   * 
+   * @param in
+   *          input stream
+   * @return the decoded integer
+   * @throws IOException
+   * 
+   * @see Utils#readVLong(DataInput)
+   */
+  public static int readVInt(DataInput in) throws IOException {
+    long ret = readVLong(in); // ints and longs share the same variable-length encoding
+    if ((ret > Integer.MAX_VALUE) || (ret < Integer.MIN_VALUE)) {
+      throw new RuntimeException( // refuse to truncate silently; note this is unchecked, not IOException
+          "Number too large to be represented as Integer");
+    }
+    return (int) ret; // lossless: the range was checked above
+  }
+
+  /**
+   * Decoding the variable-length integer. Suppose the value of the first byte
+   * is FB, and the following bytes are NB[*].
+   * <ul>
+   * <li>if (FB >= -32), return (long)FB;
+   * <li>if (FB in [-72, -33]), return (FB+52)<<8 + NB[0]&0xff;
+   * <li>if (FB in [-104, -73]), return (FB+88)<<16 + (NB[0]&0xff)<<8 +
+   * NB[1]&0xff;
+   * <li>if (FB in [-120, -105]), return (FB+112)<<24 + (NB[0]&0xff)<<16 +
+   * (NB[1]&0xff)<<8 + NB[2]&0xff;
+   * <li>if (FB in [-128, -121]), read the next FB+129 bytes and interpret them
+   * as a signed big-endian integer.
+   * </ul>
+   * @param in
+   *          input stream
+   * @return the decoded long integer.
+   * @throws IOException
+   */
+
+  public static long readVLong(DataInput in) throws IOException {
+    int firstByte = in.readByte(); // signed header byte, -128..127
+    if (firstByte >= -32) {
+      return firstByte; // one-byte form: the header is the value
+    }
+
+    switch ((firstByte + 128) / 8) { // buckets of 8 header values: [-128,-121] -> 0 ... [-40,-33] -> 11
+      case 11:
+      case 10:
+      case 9:
+      case 8:
+      case 7: // header in [-72, -33]: two-byte form
+        return ((firstByte + 52) << 8) | in.readUnsignedByte();
+      case 6:
+      case 5:
+      case 4:
+      case 3: // header in [-104, -73]: three-byte form
+        return ((firstByte + 88) << 16) | in.readUnsignedShort();
+      case 2:
+      case 1: // header in [-120, -105]: four-byte form
+        return ((firstByte + 112) << 24) | (in.readUnsignedShort() << 8)
+            | in.readUnsignedByte();
+      case 0: // header in [-128, -121]: the header only carries the payload length
+        int len = firstByte + 129;
+        switch (len) {
+          case 4:
+            return in.readInt(); // signed int, sign-extended to long on return
+          case 5:
+            return ((long) in.readInt()) << 8 | in.readUnsignedByte();
+          case 6:
+            return ((long) in.readInt()) << 16 | in.readUnsignedShort();
+          case 7:
+            return ((long) in.readInt()) << 24 | (in.readUnsignedShort() << 8)
+                | in.readUnsignedByte();
+          case 8:
+            return in.readLong();
+          default: // len 1..3: headers -128..-126 are never emitted by writeVLong
+            throw new IOException("Corrupted VLong encoding");
+        }
+      default: // not reachable: headers below -32 always map to buckets 0..11
+        throw new RuntimeException("Internal error");
+    }
+  }
+
+  /**
+   * Write a String as a VInt n, followed by n Bytes as in Text format.
+   * 
+   * @param out
+   * @param s
+   * @throws IOException
+   */
+  public static void writeString(DataOutput out, String s) throws IOException {
+    if (s != null) {
+      Text text = new Text(s); // Text supplies the byte encoding of s
+      byte[] buffer = text.getBytes();
+      int len = text.getLength(); // write getLength() bytes, not buffer.length
+      writeVInt(out, len);
+      out.write(buffer, 0, len);
+    } else {
+      writeVInt(out, -1); // length -1 is the marker for a null String (see readString)
+    }
+  }
+
+  /**
+   * Read a String as a VInt n, followed by n Bytes in Text format.
+   * 
+   * @param in
+   *          The input stream.
+   * @return The string
+   * @throws IOException
+   */
+  public static String readString(DataInput in) throws IOException {
+    int length = readVInt(in);
+    if (length == -1) return null; // -1 is the null marker written by writeString
+    byte[] buffer = new byte[length]; // NOTE(review): any other negative length throws NegativeArraySizeException here
+    in.readFully(buffer); // fills the whole buffer or throws
+    return Text.decode(buffer);
+  }
+
+  /**
+   * A generic Version class. We suggest applications built on top of TFile use
+   * this class to maintain version information in their meta blocks.
+   * 
+   * A version number consists of a major version and a minor version. The
+   * suggested usage of major and minor version number is to increment major
+   * version number when the new storage format is not backward compatible, and
+   * increment the minor version otherwise.
+   */
+  public static final class Version implements Comparable<Version> {
+    private final short major; // bumped for format changes that break compatibility
+    private final short minor; // bumped for compatible changes
+
+    /**
+     * Construct the Version object by reading from the input stream.
+     *
+     * @param in
+     *          input stream; two shorts are read, major first, then minor.
+     * @throws IOException if reading either short fails.
+     */
+    public Version(DataInput in) throws IOException {
+      major = in.readShort();
+      minor = in.readShort();
+    }
+
+    /**
+     * Construct the Version object from explicit version numbers.
+     *
+     * @param major
+     *          major version.
+     * @param minor
+     *          minor version.
+     */
+    public Version(short major, short minor) {
+      this.major = major;
+      this.minor = minor;
+    }
+
+    /**
+     * Write the object to a DataOutput. The serialized format of the Version is
+     * major version followed by minor version, both as big-endian short
+     * integers.
+     *
+     * @param out
+     *          The DataOutput object.
+     * @throws IOException if writing either short fails.
+     */
+    public void write(DataOutput out) throws IOException {
+      out.writeShort(major);
+      out.writeShort(minor);
+    }
+
+    /**
+     * Get the major version.
+     *
+     * @return Major version (the stored short, widened to int).
+     */
+    public int getMajor() {
+      return major;
+    }
+
+    /**
+     * Get the minor version.
+     *
+     * @return The minor version (the stored short, widened to int).
+     */
+    public int getMinor() {
+      return minor;
+    }
+
+    /**
+     * Get the size of the serialized Version object.
+     *
+     * @return serialized size of the version object in bytes (two shorts).
+     */
+    public static int size() {
+      return (Short.SIZE + Short.SIZE) / Byte.SIZE;
+    }
+
+    /**
+     * Return a string representation of the version, e.g. "v1.0".
+     */
+    public String toString() {
+      return new StringBuilder("v").append(major).append(".").append(minor)
+          .toString();
+    }
+
+    /**
+     * Test compatibility.
+     *
+     * @param other
+     *          The Version object to test compatibility with.
+     * @return true if both versions have the same major version number; false
+     *         otherwise.
+     */
+    public boolean compatibleWith(Version other) {
+      return major == other.major;
+    }
+
+    /**
+     * Compare this version with another version: by major first, then by minor.
+     */
+    @Override
+    public int compareTo(Version that) {
+      if (major != that.major) {
+        return major - that.major; // short operands: the int difference cannot overflow
+      }
+      return minor - that.minor;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) return true;
+      if (!(other instanceof Version)) return false;
+      return compareTo((Version) other) == 0; // equal iff major and minor both match
+    }
+
+    @Override
+    public int hashCode() {
+      return (major << 16) + minor; // shift first: '+' binds tighter than '<<' without the parentheses
+    }
+  }
+
+  /**
+   * Lower bound binary search. Find the index to the first element in the list
+   * that compares greater than or equal to key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @param cmp
+   *          Comparator for the key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int lowerBound(List<? extends T> list, T key,
+      Comparator<? super T> cmp) {
+    int low = 0; // search window is [low, high)
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1; // unsigned shift keeps the midpoint valid even if the sum overflows
+      T midVal = list.get(mid);
+      int ret = cmp.compare(midVal, key);
+      if (ret < 0) // midVal < key: the answer lies strictly to the right of mid
+        low = mid + 1;
+      else high = mid; // midVal >= key: mid itself may be the answer
+    }
+    return low; // equals list.size() when every element compares less than key
+  }
+
+  /**
+   * Upper bound binary search. Find the index to the first element in the list
+   * that compares greater than the input key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @param cmp
+   *          Comparator for the key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int upperBound(List<? extends T> list, T key,
+      Comparator<? super T> cmp) {
+    int low = 0; // search window is [low, high)
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1; // unsigned shift keeps the midpoint valid even if the sum overflows
+      T midVal = list.get(mid);
+      int ret = cmp.compare(midVal, key);
+      if (ret <= 0) // midVal <= key: the answer lies strictly to the right of mid
+        low = mid + 1;
+      else high = mid; // midVal > key: mid itself may be the answer
+    }
+    return low; // equals list.size() when no element compares greater than key
+  }
+
+  /**
+   * Lower bound binary search. Find the index to the first element in the list
+   * that compares greater than or equal to key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int lowerBound(List<? extends Comparable<? super T>> list,
+      T key) {
+    int low = 0; // search window is [low, high)
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1; // unsigned shift keeps the midpoint valid even if the sum overflows
+      Comparable<? super T> midVal = list.get(mid);
+      int ret = midVal.compareTo(key); // uses the elements' own ordering
+      if (ret < 0) // midVal < key: the answer lies strictly to the right of mid
+        low = mid + 1;
+      else high = mid; // midVal >= key: mid itself may be the answer
+    }
+    return low; // equals list.size() when every element compares less than key
+  }
+
+  /**
+   * Upper bound binary search. Find the index to the first element in the list
+   * that compares greater than the input key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int upperBound(List<? extends Comparable<? super T>> list,
+      T key) {
+    int low = 0; // search window is [low, high)
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1; // unsigned shift keeps the midpoint valid even if the sum overflows
+      Comparable<? super T> midVal = list.get(mid);
+      int ret = midVal.compareTo(key); // uses the elements' own ordering
+      if (ret <= 0) // midVal <= key: the answer lies strictly to the right of mid
+        low = mid + 1;
+      else high = mid; // midVal > key: mid itself may be the answer
+    }
+    return low; // equals list.size() when no element compares greater than key
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KVGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KVGenerator.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KVGenerator.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KVGenerator.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.util.Random;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
+
+/**
+ * Generate random <key, value> pairs.
+ */
+class KVGenerator {
+  private final Random random;
+  private final byte[][] dict; // random "words" that keys and values are assembled from
+  private final boolean sorted; // when true, fillKey may bump the prefix to keep keys ordered
+  private final DiscreteRNG keyLenRNG, valLenRNG;
+  private BytesWritable lastKey; // copy of the most recently generated key
+  private static final int MIN_KEY_LEN = 4; // every key starts with a prefix of this many bytes
+  private final byte prefix[] = new byte[MIN_KEY_LEN]; // counter copied into the front of each key
+
+  public KVGenerator(Random random, boolean sorted, DiscreteRNG keyLenRNG,
+      DiscreteRNG valLenRNG, DiscreteRNG wordLenRNG, int dictSize) {
+    this.random = random;
+    dict = new byte[dictSize][];
+    this.sorted = sorted;
+    this.keyLenRNG = keyLenRNG;
+    this.valLenRNG = valLenRNG;
+    for (int i = 0; i < dictSize; ++i) { // build the dictionary: dictSize words of random bytes
+      int wordLen = wordLenRNG.nextInt();
+      dict[i] = new byte[wordLen];
+      random.nextBytes(dict[i]);
+    }
+    lastKey = new BytesWritable();
+    fillKey(lastKey); // prime lastKey; in this call the argument and lastKey are the same object
+  }
+  
+  private void fillKey(BytesWritable o) {
+    int len = keyLenRNG.nextInt();
+    if (len < MIN_KEY_LEN) len = MIN_KEY_LEN; // always leave room for the prefix
+    o.setSize(len);
+    int n = MIN_KEY_LEN; // bytes [0, MIN_KEY_LEN) are written last, from prefix
+    while (n < len) { // NOTE(review): a zero-length dictionary word makes no progress here
+      byte[] word = dict[random.nextInt(dict.length)];
+      int l = Math.min(word.length, len - n); // truncate the last word to fit
+      System.arraycopy(word, 0, o.get(), n, l);
+      n += l;
+    }
+    if (sorted // compare only the parts after the prefix
+        && WritableComparator.compareBytes(lastKey.get(), MIN_KEY_LEN, lastKey
+            .getSize()
+            - MIN_KEY_LEN, o.get(), MIN_KEY_LEN, o.getSize() - MIN_KEY_LEN) > 0) {
+      incrementPrefix(); // previous suffix compares greater: advance the prefix for the new key
+    }
+
+    System.arraycopy(prefix, 0, o.get(), 0, MIN_KEY_LEN);
+    lastKey.set(o); // remember this key for the next comparison and for dupKey requests
+  }
+
+  private void fillValue(BytesWritable o) {
+    int len = valLenRNG.nextInt();
+    o.setSize(len);
+    int n = 0;
+    while (n < len) { // NOTE(review): a zero-length dictionary word makes no progress here
+      byte[] word = dict[random.nextInt(dict.length)];
+      int l = Math.min(word.length, len - n); // truncate the last word to fit
+      System.arraycopy(word, 0, o.get(), n, l);
+      n += l;
+    }
+  }
+  
+  private void incrementPrefix() {
+    for (int i = MIN_KEY_LEN - 1; i >= 0; --i) { // add one, starting at the last byte
+      ++prefix[i];
+      if (prefix[i] != 0) return; // no wrap to zero, so no carry into the next byte
+    }
+    
+    throw new RuntimeException("Prefix overflown"); // every byte wrapped around to zero
+  }
+  
+  public void next(BytesWritable key, BytesWritable value, boolean dupKey) {
+    if (dupKey) {
+      key.set(lastKey); // repeat the previous key
+    }
+    else {
+      fillKey(key);
+    }
+    fillValue(value); // a fresh value is generated in both cases
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KeySampler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KeySampler.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KeySampler.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/KeySampler.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.file.tfile.RandomDistribution.DiscreteRNG;
+
+class KeySampler {
+  Random random;
+  int min, max; // 4-byte key prefixes of the first and last key, packed into ints
+  DiscreteRNG keyLenRNG;
+  private static final int MIN_KEY_LEN = 4; // sampled keys are at least this long
+
+  public KeySampler(Random random, RawComparable first, RawComparable last,
+      DiscreteRNG keyLenRNG) throws IOException {
+    this.random = random;
+    min = keyPrefixToInt(first);
+    max = keyPrefixToInt(last);
+    this.keyLenRNG = keyLenRNG;
+  }
+
+  private int keyPrefixToInt(RawComparable key) throws IOException {
+    byte[] b = key.buffer();
+    int o = key.offset();
+    return (b[o] & 0xff) << 24 | (b[o + 1] & 0xff) << 16 // big-endian pack; negative if the top bit is set
+        | (b[o + 2] & 0xff) << 8 | (b[o + 3] & 0xff);
+  }
+  
+  public void next(BytesWritable key) {
+    key.setSize(Math.max(MIN_KEY_LEN, keyLenRNG.nextInt()));
+    random.nextBytes(key.get()); // randomize the whole array returned by get()
+    int n = random.nextInt(max - min) + min; // NOTE(review): nextInt throws IllegalArgumentException when max <= min
+    byte[] b = key.get();
+    b[0] = (byte) (n >> 24); // overwrite the first four bytes with n, big-endian
+    b[1] = (byte) (n >> 16);
+    b[2] = (byte) (n >> 8);
+    b[3] = (byte) n;
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/NanoTimer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/NanoTimer.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/NanoTimer.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/NanoTimer.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+/**
+ * A nano-second timer.
+ */
+public class NanoTimer {
+  private long last = -1; // System.nanoTime() at the latest start(); -1 until first started
+  private boolean started = false;
+  private long cumulate = 0; // total nano-seconds over completed start()/stop() intervals
+
+  /**
+   * Constructor
+   *
+   * @param start
+   *          Start the timer upon construction.
+   */
+  public NanoTimer(boolean start) {
+    if (start) this.start();
+  }
+
+  /**
+   * Start the timer.
+   *
+   * Note: No effect if timer is already started.
+   */
+  public void start() {
+    if (!this.started) {
+      this.last = System.nanoTime();
+      this.started = true;
+    }
+  }
+
+  /**
+   * Stop the timer and add the interval since the latest start() to the total.
+   *
+   * Note: No effect if timer is already stopped.
+   */
+  public void stop() {
+    if (this.started) {
+      this.started = false;
+      this.cumulate += System.nanoTime() - this.last;
+    }
+  }
+
+  /**
+   * Read the time accumulated over completed start()/stop() intervals.
+   *
+   * @return accumulated nano-seconds (an interval still running is not
+   *         included); -1 if never started since construction or reset().
+   */
+  public long read() {
+    if (!readable()) return -1;
+
+    return this.cumulate;
+  }
+
+  /**
+   * Reset the timer to its initial, never-started state.
+   */
+  public void reset() {
+    this.last = -1;
+    this.started = false;
+    this.cumulate = 0;
+  }
+
+  /**
+   * Checking whether the timer is started
+   *
+   * @return true if timer is started.
+   */
+  public boolean isStarted() {
+    return this.started;
+  }
+
+  /**
+   * Format the elapsed time to a human understandable string.
+   *
+   * Note: If timer is never started, "ERR" will be returned.
+   */
+  public String toString() {
+    if (!readable()) {
+      return "ERR";
+    }
+
+    return NanoTimer.nanoTimeToString(this.cumulate);
+  }
+
+  /**
+   * A utility method to format a time duration in nano seconds into a human
+   * understandable string.
+   *
+   * @param t
+   *          Time duration in nano seconds.
+   * @return String representation; "ERR" for a negative duration.
+   */
+  public static String nanoTimeToString(long t) {
+    if (t < 0) return "ERR";
+
+    if (t == 0) return "0";
+
+    if (t < 1000) {
+      return t + "ns";
+    }
+
+    double us = (double) t / 1000; // each step below moves to the next unit, 1000x larger
+    if (us < 1000) {
+      return String.format("%.2fus", us);
+    }
+
+    double ms = us / 1000;
+    if (ms < 1000) {
+      return String.format("%.2fms", ms);
+    }
+
+    double ss = ms / 1000;
+    if (ss < 1000) { // plain seconds are used up to 1000s before switching to m/h/d
+      return String.format("%.2fs", ss);
+    }
+
+    long mm = (long) ss / 60; // the cast applies to ss first, so this is integer division
+    ss -= mm * 60; // ss now holds the leftover seconds
+    long hh = mm / 60;
+    mm -= hh * 60;
+    long dd = hh / 24;
+    hh -= dd * 24;
+
+    if (dd > 0) {
+      return String.format("%dd%dh", dd, hh); // only the two largest units are shown
+    }
+
+    if (hh > 0) {
+      return String.format("%dh%dm", hh, mm);
+    }
+
+    if (mm > 0) {
+      return String.format("%dm%.1fs", mm, ss);
+    }
+
+    return String.format("%.2fs", ss); // fallback; ss >= 1000 above means one of dd/hh/mm is non-zero
+
+    /**
+     * StringBuilder sb = new StringBuilder(); String sep = "";
+     * 
+     * if (dd > 0) { String unit = (dd > 1) ? "days" : "day";
+     * sb.append(String.format("%s%d%s", sep, dd, unit)); sep = " "; }
+     * 
+     * if (hh > 0) { String unit = (hh > 1) ? "hrs" : "hr";
+     * sb.append(String.format("%s%d%s", sep, hh, unit)); sep = " "; }
+     * 
+     * if (mm > 0) { String unit = (mm > 1) ? "mins" : "min";
+     * sb.append(String.format("%s%d%s", sep, mm, unit)); sep = " "; }
+     * 
+     * if (ss > 0) { String unit = (ss > 1) ? "secs" : "sec";
+     * sb.append(String.format("%s%.3f%s", sep, ss, unit)); sep = " "; }
+     * 
+     * return sb.toString();
+     */
+  }
+
+  private boolean readable() {
+    return this.last != -1; // last is -1 only before the first start() or after reset()
+  }
+
+  /**
+   * Simple tester: prints the formatted value of 7^1 through 7^20 nano-seconds.
+   *
+   * @param args
+   */
+  public static void main(String[] args) {
+    long i = 7;
+
+    for (int x = 0; x < 20; ++x, i *= 7) {
+      System.out.println(NanoTimer.nanoTimeToString(i));
+    }
+  }
+}
+

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/RandomDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/RandomDistribution.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/RandomDistribution.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/RandomDistribution.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.io.file.tfile;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Random;
+
+/**
+ * A class that generates random numbers that follow some distribution.
+ */
+public class RandomDistribution {
+  /**
+   * Interface for discrete (integer) random distributions.
+   */
+  public static interface DiscreteRNG {
+    /**
+     * Get the next random number
+     * 
+     * @return the next random number.
+     */
+    public int nextInt();
+  }
+
+  /**
+   * Converts the result of a binary search over a cumulative-probability
+   * table into the index of the bucket the searched value belongs to.
+   * 
+   * @param searchResult
+   *          value returned by <code>binarySearch</code>.
+   * @param tableSize
+   *          number of entries in the cumulative table (at least 1).
+   * @return a bucket index in <code>[0, tableSize - 1]</code>.
+   */
+  private static int toBucket(int searchResult, int tableSize) {
+    int idx;
+    if (searchResult >= 0) {
+      // Exact hit on boundary j, including j == 0: the value belongs to the
+      // next bucket. Testing "> 0" here would send a hit on index 0 down the
+      // not-found branch and yield -1.
+      idx = searchResult + 1;
+    } else {
+      // Not found: binarySearch returned -(insertionPoint) - 1.
+      idx = -(searchResult + 1);
+    }
+    return Math.min(idx, tableSize - 1);
+  }
+
+  /**
+   * P(i)=1/(max-min)
+   */
+  public static final class Flat implements DiscreteRNG {
+    private final Random random;
+    private final int min;
+    private final int max;
+
+    /**
+     * Generate random integers from min (inclusive) to max (exclusive)
+     * following even distribution.
+     * 
+     * @param random
+     *          The basic random number generator.
+     * @param min
+     *          Minimum integer
+     * @param max
+     *          maximum integer (exclusive).
+     * @throws IllegalArgumentException
+     *           if min is not smaller than max.
+     */
+    public Flat(Random random, int min, int max) {
+      if (min >= max) {
+        throw new IllegalArgumentException("Invalid range");
+      }
+      this.random = random;
+      this.min = min;
+      this.max = max;
+    }
+    
+    /**
+     * @see DiscreteRNG#nextInt()
+     */
+    @Override
+    public int nextInt() {
+      return random.nextInt(max - min) + min;
+    }
+  }
+
+  /**
+   * Zipf distribution. The ratio of the probabilities of integer i and j is
+   * defined as follows:
+   * 
+   * P(i)/P(j)=((j-min+1)/(i-min+1))^sigma.
+   */
+  public static final class Zipf implements DiscreteRNG {
+    private static final double DEFAULT_EPSILON = 0.001;
+    private final Random random;
+    // Sampled integers, ascending; v holds the matching cumulative
+    // probabilities (the last entry is pinned to 1.0).
+    private final ArrayList<Integer> k;
+    private final ArrayList<Double> v;
+
+    /**
+     * Constructor
+     * 
+     * @param r
+     *          The random number generator.
+     * @param min
+     *          minimum integer (inclusive)
+     * @param max
+     *          maximum integer (exclusive)
+     * @param sigma
+     *          parameter sigma. (sigma > 1.0)
+     */
+    public Zipf(Random r, int min, int max, double sigma) {
+      this(r, min, max, sigma, DEFAULT_EPSILON);
+    }
+
+    /**
+     * Constructor.
+     * 
+     * @param r
+     *          The random number generator.
+     * @param min
+     *          minimum integer (inclusive)
+     * @param max
+     *          maximum integer (exclusive)
+     * @param sigma
+     *          parameter sigma. (sigma > 1.0)
+     * @param epsilon
+     *          Allowable error percentage (0 < epsilon < 0.5).
+     * @throws IllegalArgumentException
+     *           if any argument is outside the ranges given above.
+     */
+    public Zipf(Random r, int min, int max, double sigma, double epsilon) {
+      if ((max <= min) || (sigma <= 1) || (epsilon <= 0)
+          || (epsilon >= 0.5)) {
+        throw new IllegalArgumentException("Invalid arguments");
+      }
+      random = r;
+      k = new ArrayList<Integer>();
+      v = new ArrayList<Double>();
+
+      double sum = 0;
+      int last = -1;
+      for (int i = min; i < max; ++i) {
+        sum += Math.exp(-sigma * Math.log(i - min + 1));
+        // Record a sample point only when i has moved far enough past the
+        // previous one; points in between are interpolated by nextInt().
+        if ((last == -1) || i * (1 - epsilon) > last) {
+          k.add(i);
+          v.add(sum);
+          last = i;
+        }
+      }
+
+      // Make sure the largest value (max - 1) is always a sample point.
+      if (last != max - 1) {
+        k.add(max - 1);
+        v.add(sum);
+      }
+
+      v.set(v.size() - 1, 1.0);
+
+      // Normalize the remaining partial sums into cumulative probabilities.
+      for (int i = v.size() - 2; i >= 0; --i) {
+        v.set(i, v.get(i) / sum);
+      }
+    }
+
+    /**
+     * @see DiscreteRNG#nextInt()
+     */
+    @Override
+    public int nextInt() {
+      double d = random.nextDouble();
+      int idx = toBucket(Collections.binarySearch(v, d), v.size());
+
+      if (idx == 0) {
+        return k.get(0);
+      }
+
+      int ceiling = k.get(idx);
+      int lower = k.get(idx - 1);
+
+      // Pick uniformly among the integers in (lower, ceiling].
+      return ceiling - random.nextInt(ceiling - lower);
+    }
+  }
+
+  /**
+   * Binomial distribution.
+   * 
+   * P(k)=select(n, k)*p^k*(1-p)^(n-k) (k = 0, 1, ..., n)
+   * 
+   * P(k)=select(max-min-1, k-min)*p^(k-min)*(1-p)^(max-k-1)
+   * (k = min, ..., max-1)
+   */
+  public static final class Binomial implements DiscreteRNG {
+    private final Random random;
+    private final int min;
+    private final int n;
+    // Cumulative probabilities for 0..n, or null when only one value (min)
+    // can be produced.
+    private final double[] v;
+
+    /** Binomial coefficient "n choose k", computed in floating point. */
+    private static double select(int n, int k) {
+      double ret = 1.0;
+      for (int i = k + 1; i <= n; ++i) {
+        ret *= (double) i / (i - k);
+      }
+      return ret;
+    }
+    
+    /**
+     * p raised to the k-th power. Math.pow is used rather than
+     * exp(k * log(p)) because the latter evaluates to NaN for p == 0 and
+     * k == 0 (0 * -Infinity).
+     */
+    private static double power(double p, int k) {
+      return Math.pow(p, k);
+    }
+
+    /**
+     * Generate random integers from min (inclusive) to max (exclusive)
+     * following Binomial distribution.
+     * 
+     * @param random
+     *          The basic random number generator.
+     * @param min
+     *          Minimum integer
+     * @param max
+     *          maximum integer (exclusive).
+     * @param p
+     *          success probability (0 <= p <= 1).
+     * @throws IllegalArgumentException
+     *           if min is not smaller than max, or p is not a probability.
+     */
+    public Binomial(Random random, int min, int max, double p) {
+      if (min >= max) {
+        throw new IllegalArgumentException("Invalid range");
+      }
+      // Written so that NaN is rejected as well.
+      if (!(p >= 0.0 && p <= 1.0)) {
+        throw new IllegalArgumentException("Invalid probability");
+      }
+      this.random = random;
+      this.min = min;
+      this.n = max - min - 1;
+      if (n > 0) {
+        v = new double[n + 1];
+        double sum = 0.0;
+        for (int i = 0; i <= n; ++i) {
+          sum += select(n, i) * power(p, i) * power(1 - p, n - i);
+          v[i] = sum;
+        }
+        for (int i = 0; i <= n; ++i) {
+          v[i] /= sum;
+        }
+      }
+      else {
+        v = null;
+      }
+    }
+
+    /**
+     * @see DiscreteRNG#nextInt()
+     */
+    @Override
+    public int nextInt() {
+      if (v == null) {
+        return min;
+      }
+      double d = random.nextDouble();
+      return toBucket(Arrays.binarySearch(v, d), v.length) + min;
+    }
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+
+/**
+ * Exercises basic TFile features: writing and reading plain, large, empty
+ * and streamed (known/unknown length) records, seeking, bounded scanners and
+ * meta blocks.
+ * 
+ */
+public class TestTFile extends TestCase {
+  private static final String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+  private FileSystem fs;
+  private Configuration conf;
+  private final int minBlockSize = 512;
+  private final int largeVal = 3 * 1024 * 1024;
+  // keys are zero-padded to ten digits so that byte order equals numeric
+  // order.
+  private static final String localFormatter = "%010d";
+
+  @Override
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    fs = FileSystem.get(conf);
+  }
+
+  @Override
+  public void tearDown() throws IOException {
+    // do nothing
+  }
+
+  // read the key of the scanner's current entry
+  public byte[] readKey(Scanner scanner) throws IOException {
+    int keylen = scanner.entry().getKeyLength();
+    byte[] read = new byte[keylen];
+    scanner.entry().getKey(read);
+    return read;
+  }
+
+  // read the value of the scanner's current entry
+  public byte[] readValue(Scanner scanner) throws IOException {
+    int valueLen = scanner.entry().getValueLength();
+    byte[] read = new byte[valueLen];
+    scanner.entry().getValue(read);
+    return read;
+  }
+
+  // read the first len bytes of the current value through its stream
+  public byte[] readLongValue(Scanner scanner, int len) throws IOException {
+    DataInputStream din = scanner.entry().getValueStream();
+    byte[] b = new byte[len];
+    din.readFully(b);
+    din.close();
+    return b;
+  }
+
+  // write n records starting at key "start"; every record is written twice.
+  // returns the next unused key number.
+  private int writeSomeRecords(Writer writer, int start, int n)
+      throws IOException {
+    String value = "value";
+    for (int i = start; i < (start + n); i++) {
+      String key = String.format(localFormatter, i);
+      writer.append(key.getBytes(), (value + key).getBytes());
+      writer.append(key.getBytes(), (value + key).getBytes());
+    }
+    return (start + n);
+  }
+
+  // read back what writeSomeRecords wrote (each record appears twice) and
+  // check keys and values. returns the next expected key number.
+  private int readAndCheckbytes(Scanner scanner, int start, int n)
+      throws IOException {
+    String value = "value";
+    for (int i = start; i < (start + n); i++) {
+      byte[] key = readKey(scanner);
+      byte[] val = readValue(scanner);
+      String keyStr = String.format(localFormatter, i);
+      String valStr = value + keyStr;
+      assertTrue("btyes for keys do not match " + keyStr + " "
+          + new String(key), Arrays.equals(keyStr.getBytes(), key));
+      assertTrue("bytes for vals do not match " + valStr + " "
+          + new String(val), Arrays.equals(
+          valStr.getBytes(), val));
+      assertTrue(scanner.advance());
+      key = readKey(scanner);
+      val = readValue(scanner);
+      assertTrue("btyes for keys do not match", Arrays.equals(
+          keyStr.getBytes(), key));
+      assertTrue("bytes for vals do not match", Arrays.equals(
+          valStr.getBytes(), val));
+      assertTrue(scanner.advance());
+    }
+    return (start + n);
+  }
+
+  // write n records with a largeVal-byte (all zero) value; every record is
+  // written twice. returns the next unused key number.
+  private int writeLargeRecords(Writer writer, int start, int n)
+      throws IOException {
+    byte[] value = new byte[largeVal];
+    for (int i = start; i < (start + n); i++) {
+      String key = String.format(localFormatter, i);
+      writer.append(key.getBytes(), value);
+      writer.append(key.getBytes(), value);
+    }
+    return (start + n);
+  }
+
+  // read the keys of the large records (values are not read); each key is
+  // expected twice since the records were duplicated.
+  private int readLargeRecords(Scanner scanner, int start, int n)
+      throws IOException {
+    for (int i = start; i < (start + n); i++) {
+      byte[] key = readKey(scanner);
+      String keyStr = String.format(localFormatter, i);
+      assertTrue("btyes for keys do not match", Arrays.equals(
+          keyStr.getBytes(), key));
+      scanner.advance();
+      key = readKey(scanner);
+      assertTrue("btyes for keys do not match", Arrays.equals(
+          keyStr.getBytes(), key));
+      scanner.advance();
+    }
+    return (start + n);
+  }
+
+  // write n records with empty keys and values
+  private void writeEmptyRecords(Writer writer, int n) throws IOException {
+    byte[] key = new byte[0];
+    byte[] value = new byte[0];
+    for (int i = 0; i < n; i++) {
+      writer.append(key, value);
+    }
+  }
+
+  // read n records and check that keys and values are empty
+  private void readEmptyRecords(Scanner scanner, int n) throws IOException {
+    byte[] key = new byte[0];
+    byte[] value = new byte[0];
+    byte[] readKey = null;
+    byte[] readValue = null;
+    for (int i = 0; i < n; i++) {
+      readKey = readKey(scanner);
+      readValue = readValue(scanner);
+      assertTrue("failed to match keys", Arrays.equals(readKey, key));
+      assertTrue("failed to match values", Arrays.equals(readValue, value));
+      assertTrue("failed to advance cursor", scanner.advance());
+    }
+  }
+
+  // write n records through the prepareAppendKey/prepareAppendValue streams,
+  // declaring the exact key and value lengths up front.
+  private int writePrepWithKnownLength(Writer writer, int start, int n)
+      throws IOException {
+    // all keys (and hence all values) have the same length; measure it once
+    String key = String.format(localFormatter, start);
+    int keyLen = key.getBytes().length;
+    String value = "value" + key;
+    int valueLen = value.getBytes().length;
+    for (int i = start; i < (start + n); i++) {
+      DataOutputStream out = writer.prepareAppendKey(keyLen);
+      String localKey = String.format(localFormatter, i);
+      out.write(localKey.getBytes());
+      out.close();
+      out = writer.prepareAppendValue(valueLen);
+      String localValue = "value" + localKey;
+      out.write(localValue.getBytes());
+      out.close();
+    }
+    return (start + n);
+  }
+
+  // read back and check the records written by writePrepWithKnownLength
+  private int readPrepWithKnownLength(Scanner scanner, int start, int n)
+      throws IOException {
+    for (int i = start; i < (start + n); i++) {
+      String key = String.format(localFormatter, i);
+      byte[] read = readKey(scanner);
+      assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
+      String value = "value" + key;
+      read = readValue(scanner);
+      assertTrue("values not equal", Arrays.equals(value.getBytes(), read));
+      scanner.advance();
+    }
+    return (start + n);
+  }
+
+  // write n records through the streaming API, passing -1 as the length of
+  // both key and value.
+  private int writePrepWithUnkownLength(Writer writer, int start, int n)
+      throws IOException {
+    for (int i = start; i < (start + n); i++) {
+      DataOutputStream out = writer.prepareAppendKey(-1);
+      String localKey = String.format(localFormatter, i);
+      out.write(localKey.getBytes());
+      out.close();
+      String value = "value" + localKey;
+      out = writer.prepareAppendValue(-1);
+      out.write(value.getBytes());
+      out.close();
+    }
+    return (start + n);
+  }
+
+  // intended to read back the records written by writePrepWithUnkownLength.
+  private int readPrepWithUnknownLength(Scanner scanner, int start, int n)
+      throws IOException {
+    // NOTE(review): the loop condition is "i < start", so the body never
+    // executes; these records are never verified and the scanner is not
+    // advanced past them. The intended bound is presumably (start + n), but
+    // enabling the loop changes what the test demands of the reader, so the
+    // bound is left untouched here until that can be confirmed.
+    for (int i = start; i < start; i++) {
+      String key = String.format(localFormatter, i);
+      byte[] read = readKey(scanner);
+      assertTrue("keys not equal", Arrays.equals(key.getBytes(), read));
+      try {
+        read = readValue(scanner);
+        fail("reading a value of unknown length should have failed");
+      }
+      catch (IOException ie) {
+        // should have thrown exception
+      }
+      String value = "value" + key;
+      read = readLongValue(scanner, value.getBytes().length);
+      assertTrue("values nto equal", Arrays.equals(read, value.getBytes()));
+      scanner.advance();
+    }
+    return (start + n);
+  }
+
+  // the key that the write helpers generate for the given row number
+  private byte[] getSomeKey(int rowId) {
+    return String.format(localFormatter, rowId).getBytes();
+  }
+
+  // write the full mix of records, then close the writer
+  private void writeRecords(Writer writer) throws IOException {
+    writeEmptyRecords(writer, 10);
+    int ret = writeSomeRecords(writer, 0, 100);
+    ret = writeLargeRecords(writer, ret, 1);
+    ret = writePrepWithKnownLength(writer, ret, 40);
+    ret = writePrepWithUnkownLength(writer, ret, 50);
+    writer.close();
+  }
+
+  // read back, in order, everything writeRecords wrote
+  private void readAllRecords(Scanner scanner) throws IOException {
+    readEmptyRecords(scanner, 10);
+    int ret = readAndCheckbytes(scanner, 0, 100);
+    ret = readLargeRecords(scanner, ret, 1);
+    ret = readPrepWithKnownLength(scanner, ret, 40);
+    ret = readPrepWithUnknownLength(scanner, ret, 50);
+  }
+
+  // create (replacing any existing file) an output stream for the given path
+  private FSDataOutputStream createFSOutput(Path name) throws IOException {
+    if (fs.exists(name)) fs.delete(name, true);
+    FSDataOutputStream fout = fs.create(name);
+    return fout;
+  }
+
+  /**
+   * Write a sorted ("memcmp") TFile with the given compression codec, read
+   * everything back and exercise seekTo, lowerBound, upperBound and a
+   * range-limited scanner.
+   */
+  void basicWithSomeCodec(String codec) throws IOException {
+    Path ncTFile = new Path(ROOT, "basic.tfile");
+    FSDataOutputStream fout = createFSOutput(ncTFile);
+    Writer writer = new Writer(fout, minBlockSize, codec, "memcmp", conf);
+    writeRecords(writer);
+    fout.close();
+    // hand the reader the very stream that is closed below, instead of
+    // opening the file a second time and never closing that second stream.
+    FSDataInputStream fin = fs.open(ncTFile);
+    Reader reader =
+        new Reader(fin, fs.getFileStatus(ncTFile).getLen(), conf);
+
+    Scanner scanner = reader.createScanner();
+    readAllRecords(scanner);
+    scanner.seekTo(getSomeKey(50));
+    assertTrue("location lookup failed", scanner.seekTo(getSomeKey(50)));
+    // read the key and see if it matches
+    byte[] readKey = readKey(scanner);
+    assertTrue("seeked key does not match", Arrays.equals(getSomeKey(50),
+        readKey));
+
+    // seeking to the same (empty) key twice must yield the same value
+    scanner.seekTo(new byte[0]);
+    byte[] val1 = readValue(scanner);
+    scanner.seekTo(new byte[0]);
+    byte[] val2 = readValue(scanner);
+    assertTrue(Arrays.equals(val1, val2));
+    
+    // check for lowerBound
+    scanner.lowerBound(getSomeKey(50));
+    assertTrue("locaton lookup failed", scanner.currentLocation
+        .compareTo(reader.end()) < 0);
+    readKey = readKey(scanner);
+    assertTrue("seeked key does not match", Arrays.equals(readKey,
+        getSomeKey(50)));
+
+    // check for upper bound
+    scanner.upperBound(getSomeKey(50));
+    assertTrue("location lookup failed", scanner.currentLocation
+        .compareTo(reader.end()) < 0);
+    readKey = readKey(scanner);
+    assertTrue("seeked key does not match", Arrays.equals(readKey,
+        getSomeKey(51)));
+
+    scanner.close();
+    // test for a range of scanner
+    scanner = reader.createScanner(getSomeKey(10), getSomeKey(60));
+    readAndCheckbytes(scanner, 10, 50);
+    assertFalse(scanner.advance());
+    scanner.close();
+    reader.close();
+    fin.close();
+    fs.delete(ncTFile, true);
+  }
+
+  // write a TFile without comparator (unsorted) using the given codec and
+  // read everything back.
+  void unsortedWithSomeCodec(String codec) throws IOException {
+    Path uTfile = new Path(ROOT, "unsorted.tfile");
+    FSDataOutputStream fout = createFSOutput(uTfile);
+    Writer writer = new Writer(fout, minBlockSize, codec, null, conf);
+    writeRecords(writer);
+    // writeRecords has already closed the writer; this second close is kept
+    // from the original (presumably harmless -- TODO confirm).
+    writer.close();
+    fout.close();
+    // as in basicWithSomeCodec: one stream, used by the reader and closed
+    // below.
+    FSDataInputStream fin = fs.open(uTfile);
+    Reader reader =
+        new Reader(fin, fs.getFileStatus(uTfile).getLen(), conf);
+
+    Scanner scanner = reader.createScanner();
+    readAllRecords(scanner);
+    scanner.close();
+    reader.close();
+    fin.close();
+    fs.delete(uTfile, true);
+  }
+
+  // test sorted t files, without and with gz compression.
+  public void testTFileFeatures() throws IOException {
+    basicWithSomeCodec("none");
+    basicWithSomeCodec("gz");
+  }
+
+  // test unsorted t files.
+  public void testUnsortedTFileFeatures() throws IOException {
+    unsortedWithSomeCodec("none");
+    unsortedWithSomeCodec("gz");
+  }
+
+  // write n meta blocks named "TfileMeta<i>" holding "something to test<i>"
+  private void writeNumMetablocks(Writer writer, String compression, int n)
+      throws IOException {
+    for (int i = 0; i < n; i++) {
+      DataOutputStream dout =
+          writer.prepareMetaBlock("TfileMeta" + i, compression);
+      byte[] b = ("something to test" + i).getBytes();
+      dout.write(b);
+      dout.close();
+    }
+  }
+
+  // write ten meta blocks, check that a duplicate name is rejected, then
+  // add one empty meta block.
+  private void someTestingWithMetaBlock(Writer writer, String compression)
+      throws IOException {
+    DataOutputStream dout = null;
+    writeNumMetablocks(writer, compression, 10);
+    try {
+      dout = writer.prepareMetaBlock("TfileMeta1", compression);
+      fail("creating a meta block with an existing name should have failed");
+    }
+    catch (MetaBlockAlreadyExists me) {
+      // expected: the name is already taken
+    }
+    dout = writer.prepareMetaBlock("TFileMeta100", compression);
+    dout.close();
+  }
+
+  // read back and check the meta blocks written by writeNumMetablocks
+  private void readNumMetablocks(Reader reader, int n) throws IOException {
+    // every payload has the same length because n stays below 10 here
+    int len = ("something to test" + 0).getBytes().length;
+    for (int i = 0; i < n; i++) {
+      DataInputStream din = reader.getMetaBlock("TfileMeta" + i);
+      byte b[] = new byte[len];
+      din.readFully(b);
+      assertTrue("faield to match metadata", Arrays.equals(
+          ("something to test" + i).getBytes(), b));
+      din.close();
+    }
+  }
+
+  // check the ten meta blocks, that an unknown name is rejected, and that
+  // the empty meta block reads as immediately exhausted.
+  private void someReadingWithMetaBlock(Reader reader) throws IOException {
+    DataInputStream din = null;
+    readNumMetablocks(reader, 10);
+    try {
+      din = reader.getMetaBlock("NO ONE");
+      fail("looking up a non-existent meta block should have failed");
+    }
+    catch (MetaBlockDoesNotExist me) {
+      // should catch
+    }
+    din = reader.getMetaBlock("TFileMeta100");
+    int read = din.read();
+    assertTrue("check for status", (read == -1));
+    din.close();
+  }
+
+  // test meta blocks for tfiles
+  public void testMetaBlocks() throws IOException {
+    Path mFile = new Path(ROOT, "meta.tfile");
+    FSDataOutputStream fout = createFSOutput(mFile);
+    Writer writer = new Writer(fout, minBlockSize, "none", null, conf);
+    someTestingWithMetaBlock(writer, "none");
+    writer.close();
+    fout.close();
+    FSDataInputStream fin = fs.open(mFile);
+    Reader reader = new Reader(fin, fs.getFileStatus(mFile).getLen(), conf);
+    someReadingWithMetaBlock(reader);
+    // release the reader and its stream before removing the file, as the
+    // other tests in this class do.
+    reader.close();
+    fin.close();
+    fs.delete(mFile, true);
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,790 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.file.tfile.TFile.Reader;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Location;
+import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
+
+/**
+ * 
+ * Byte arrays test case class using GZ compression codec, base class of none
+ * and LZO compression classes.
+ * 
+ */
+public class TestTFileByteArrays extends TestCase {
+  private static String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+  private final static int BLOCK_SIZE = 512;
+  private final static int BUF_SIZE = 64;
+  private final static int K = 1024;
+  protected boolean skip = false;
+
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+
+  private FileSystem fs;
+  private Configuration conf;
+  private Path path;
+  private FSDataOutputStream out;
+  private Writer writer;
+
+  private String compression = Compression.Algorithm.GZ.getName();
+  private String comparator = "memcmp";
+  private String outputFile = "TFileTestByteArrays";
+  /*
+   * Pre-sampled numbers of records in one block, based on the generated key
+   * and value strings.
+   */
+  // private int records1stBlock = 4314;
+  // private int records2ndBlock = 4108;
+  private int records1stBlock = 4480;
+  private int records2ndBlock = 4263;
+
+  /**
+   * Overrides the defaults this test case runs with. All five settings are
+   * simply stored in the corresponding fields.
+   * 
+   * @param compression
+   *          name of the compression algorithm handed to the writer.
+   * @param comparator
+   *          name of the comparator handed to the writer.
+   * @param outputFile
+   *          name of the test file under ROOT.
+   * @param numRecords1stBlock
+   *          pre-sampled number of records in the first block.
+   * @param numRecords2ndBlock
+   *          pre-sampled number of records in the second block.
+   */
+  public void init(String compression, String comparator, String outputFile,
+      int numRecords1stBlock, int numRecords2ndBlock) {
+    // block-size expectations
+    this.records2ndBlock = numRecords2ndBlock;
+    this.records1stBlock = numRecords1stBlock;
+    // writer configuration
+    this.outputFile = outputFile;
+    this.comparator = comparator;
+    this.compression = compression;
+  }
+
+  /**
+   * Creates the output file under ROOT and opens a TFile writer on it using
+   * the configured block size, compression and comparator.
+   */
+  @Override
+  public void setUp() throws IOException {
+    this.conf = new Configuration();
+    this.path = new Path(ROOT, this.outputFile);
+    this.fs = this.path.getFileSystem(this.conf);
+    this.out = this.fs.create(this.path);
+    this.writer =
+        new Writer(this.out, BLOCK_SIZE, this.compression, this.comparator,
+            this.conf);
+  }
+
+  /**
+   * Removes the test file, unless the test case is being skipped.
+   */
+  @Override
+  public void tearDown() throws IOException {
+    if (skip) {
+      return;
+    }
+    fs.delete(path, true);
+  }
+
+  /**
+   * A file without any record must report itself as sorted and give a
+   * scanner that is at its end right away.
+   */
+  public void testNoDataEntry() throws IOException {
+    if (skip) {
+      return;
+    }
+    closeOutput();
+
+    final long fileLength = fs.getFileStatus(path).getLen();
+    Reader reader = new Reader(fs.open(path), fileLength, conf);
+    Assert.assertTrue(reader.isSorted());
+
+    Scanner emptyScanner = reader.createScanner();
+    Assert.assertTrue(emptyScanner.atEnd());
+
+    emptyScanner.close();
+    reader.close();
+  }
+
+  /**
+   * Writes a single record, reads it back and runs every access-pattern
+   * check against it.
+   */
+  public void testOneDataEntry() throws IOException {
+    if (skip) {
+      return;
+    }
+    final int count = 1;
+    final int onlyRecord = 0;
+
+    writeRecords(count);
+    readRecords(count);
+
+    checkBlockIndex(count, onlyRecord, 0);
+    readValueBeforeKey(count, onlyRecord);
+    readKeyWithoutValue(count, onlyRecord);
+    readValueWithoutKey(count, onlyRecord);
+    readKeyManyTimes(count, onlyRecord);
+  }
+
+  /**
+   * Writes two records and reads them back.
+   */
+  public void testTwoDataEntries() throws IOException {
+    if (skip) {
+      return;
+    }
+    final int count = 2;
+    writeRecords(count);
+    readRecords(count);
+  }
+
+  /**
+   * Writes as many records as were pre-sampled for the first block and
+   * checks that the last of them still lands in block 0.
+   * 
+   * @throws IOException
+   */
+  public void testOneBlock() throws IOException {
+    if (skip) {
+      return;
+    }
+    // just under one block
+    final int count = records1stBlock;
+    writeRecords(count);
+    readRecords(count);
+    // last key should be in the first block (block 0)
+    final int lastRecord = count - 1;
+    checkBlockIndex(count, lastRecord, 0);
+  }
+
+  /**
+   * One full first block plus one more record: the extra record must be
+   * the first entry of block 1.
+   * 
+   * @throws IOException
+   */
+  public void testOneBlockPlusOneEntry() throws IOException {
+    if (skip) {
+      return;
+    }
+    final int count = records1stBlock + 1;
+    writeRecords(count);
+    readRecords(count);
+    // last record of block 0, then the spilled-over record in block 1
+    checkBlockIndex(count, records1stBlock - 1, 0);
+    checkBlockIndex(count, records1stBlock, 1);
+  }
+
+  /**
+   * Five records more than the first block holds: the last record must be
+   * found in block 1.
+   */
+  public void testTwoBlocks() throws IOException {
+    if (skip) {
+      return;
+    }
+    final int count = records1stBlock + 5;
+    writeRecords(count);
+    readRecords(count);
+    checkBlockIndex(count, count - 1, 1);
+  }
+
+  /**
+   * Writes enough records to spill into a third block, then checks block
+   * indexes and all read patterns at the interesting positions: first and
+   * last record of the file, first, last and an inner record of the middle
+   * block.
+   */
+  public void testThreeBlocks() throws IOException {
+    if (skip) {
+      return;
+    }
+    final int total = 2 * records1stBlock + 5;
+    final int lastInFile = total - 1;
+    final int firstInMid = records1stBlock;
+    final int lastInMid = records1stBlock + records2ndBlock - 1;
+    final int insideMid = records1stBlock + 10;
+
+    writeRecords(total);
+    readRecords(total);
+
+    checkBlockIndex(total, lastInFile, 2);
+    // 1st key in file
+    readValueBeforeKey(total, 0);
+    readKeyWithoutValue(total, 0);
+    readValueWithoutKey(total, 0);
+    readKeyManyTimes(total, 0);
+    // last key in file
+    readValueBeforeKey(total, lastInFile);
+    readKeyWithoutValue(total, lastInFile);
+    readValueWithoutKey(total, lastInFile);
+    readKeyManyTimes(total, lastInFile);
+
+    // 1st key in mid block, verify block indexes then read
+    checkBlockIndex(total, firstInMid - 1, 0);
+    checkBlockIndex(total, firstInMid, 1);
+    readValueBeforeKey(total, firstInMid);
+    readKeyWithoutValue(total, firstInMid);
+    readValueWithoutKey(total, firstInMid);
+    readKeyManyTimes(total, firstInMid);
+
+    // last key in mid block, verify block indexes then read
+    checkBlockIndex(total, lastInMid, 1);
+    checkBlockIndex(total, lastInMid + 1, 2);
+    readValueBeforeKey(total, lastInMid);
+    readKeyWithoutValue(total, lastInMid);
+    readValueWithoutKey(total, lastInMid);
+    readKeyManyTimes(total, lastInMid);
+
+    // mid in mid block
+    readValueBeforeKey(total, insideMid);
+    readKeyWithoutValue(total, insideMid);
+    readValueWithoutKey(total, insideMid);
+    readKeyManyTimes(total, insideMid);
+  }
+
+  /**
+   * Seeks the scanner to the given key.
+   * 
+   * @return the scanner's current location when seekTo reports success,
+   *         its end location otherwise.
+   */
+  Location locate(Scanner scanner, byte[] key) throws IOException {
+    final boolean found = scanner.seekTo(key);
+    return found ? scanner.currentLocation : scanner.endLocation;
+  }
+  
+  /**
+   * Looks up several existing keys and one key that was never written; the
+   * latter must resolve to the scanner's end location. The locations of the
+   * existing keys are obtained but not asserted on.
+   */
+  public void testLocate() throws IOException {
+    if (skip) {
+      return;
+    }
+    final int total = 3 * records1stBlock;
+    writeRecords(total);
+    Reader reader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+
+    byte[] thirdKey = composeSortedKey(KEY, total, 2).getBytes();
+    Location loc2 = locate(scanner, thirdKey);
+
+    byte[] lastKeyOfBlock0 =
+        composeSortedKey(KEY, total, records1stBlock - 1).getBytes();
+    Location locLastIn1stBlock = locate(scanner, lastKeyOfBlock0);
+
+    byte[] firstKeyOfBlock1 =
+        composeSortedKey(KEY, total, records1stBlock).getBytes();
+    Location locFirstIn2ndBlock = locate(scanner, firstKeyOfBlock1);
+
+    Location locX = locate(scanner, "keyX".getBytes());
+    Assert.assertEquals(scanner.endLocation, locX);
+
+    scanner.close();
+    reader.close();
+  }
+
+  /**
+   * Opening a reader on the file while the writer created in setUp has not
+   * been closed is expected to fail with an IOException.
+   */
+  public void testFailureWriterNotClosed() throws IOException {
+    if (skip) {
+      return;
+    }
+    Reader reader = null;
+    try {
+      final long fileLength = fs.getFileStatus(path).getLen();
+      reader = new Reader(fs.open(path), fileLength, conf);
+      Assert.fail("Cannot read before closing the writer.");
+    } catch (IOException e) {
+      // noop, expecting exceptions
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * Creating a second meta block under a name that is already in use must
+   * be rejected.
+   */
+  public void testFailureWriteMetaBlocksWithSameName() throws IOException {
+    if (skip)
+      return;
+    writer.append("keyX".getBytes(), "valueX".getBytes());
+
+    // create a new metablock
+    DataOutputStream outMeta =
+        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+    outMeta.write(123);
+    outMeta.write("foo".getBytes());
+    outMeta.close();
+    // add the same metablock
+    try {
+      writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+      Assert.fail("Cannot create metablocks with the same name.");
+    }
+    catch (MetaBlockAlreadyExists e) {
+      // expected. Only the duplicate-name failure is accepted, as in
+      // TestTFile; any other exception now fails the test instead of being
+      // swallowed by a blanket catch.
+    }
+    closeOutput();
+  }
+
+  /**
+   * Writes one record and one meta block named "testX", then checks that the
+   * existing meta block can be fetched and that asking for the unknown name
+   * "testY" throws an exception.
+   */
+  public void testFailureGetNonExistentMetaBlock() throws IOException {
+    if (skip)
+      return;
+    writer.append("keyX".getBytes(), "valueX".getBytes());
+
+    // create a new metablock
+    DataOutputStream outMeta =
+        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+    outMeta.write(123);
+    outMeta.write("foo".getBytes());
+    outMeta.close();
+    closeOutput();
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      DataInputStream mb = reader.getMetaBlock("testX");
+      Assert.assertNotNull(mb);
+      mb.close();
+      try {
+        // Any normal return from getMetaBlock fails the test, whether the
+        // result is null or not; only an exception is accepted.
+        DataInputStream mbBad = reader.getMetaBlock("testY");
+        Assert.assertNull(mbBad);
+        Assert.fail("Error on handling non-existent metablocks.");
+      }
+      catch (Exception e) {
+        // noop, expecting exceptions
+      }
+    }
+    finally {
+      // close the reader even when one of the assertions above fails
+      reader.close();
+    }
+  }
+
+  public void testFailureWriteRecordAfterMetaBlock() throws IOException {
+    if (skip)
+      return;
+    // write a key/value first
+    writer.append("keyX".getBytes(), "valueX".getBytes());
+    // create a new metablock
+    DataOutputStream outMeta =
+        writer.prepareMetaBlock("testX", Compression.Algorithm.GZ.getName());
+    outMeta.write(123);
+    outMeta.write("dummy".getBytes());
+    outMeta.close();
+    // add more key/value
+    try {
+      writer.append("keyY".getBytes(), "valueY".getBytes());
+      Assert.fail("Cannot add key/value after start adding meta blocks.");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  /**
+   * Reads the value of the first entry once, checks it, and then expects a
+   * second {@code getValue} call on the same entry to throw an exception.
+   */
+  public void testFailureReadValueManyTimes() throws IOException {
+    if (skip)
+      return;
+    writeRecords(5);
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      Scanner scanner = reader.createScanner();
+      try {
+        byte[] vbuf = new byte[BUF_SIZE];
+        int vlen = scanner.entry().getValueLength();
+        scanner.entry().getValue(vbuf);
+        Assert.assertEquals(new String(vbuf, 0, vlen), VALUE + 0);
+        try {
+          scanner.entry().getValue(vbuf);
+          Assert.fail("Cannot get the value multiple times.");
+        }
+        catch (Exception e) {
+          // noop, expecting exceptions
+        }
+      }
+      finally {
+        // release the scanner even when an assertion above fails
+        scanner.close();
+      }
+    }
+    finally {
+      reader.close();
+    }
+  }
+
+  public void testFailureBadCompressionCodec() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    out = fs.create(path);
+    try {
+      writer = new Writer(out, BLOCK_SIZE, "BAD", comparator, conf);
+      Assert.fail("Error on handling invalid compression codecs.");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+      // e.printStackTrace();
+    }
+  }
+
+  public void testFailureOpenEmptyFile() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    // create an absolutely empty file
+    path = new Path(fs.getWorkingDirectory(), outputFile);
+    out = fs.create(path);
+    out.close();
+    try {
+      Reader reader =
+          new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+      Assert.fail("Error on handling empty files.");
+    }
+    catch (EOFException e) {
+      // noop, expecting exceptions
+    }
+  }
+
+  public void testFailureOpenRandomFile() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    // create an random file
+    path = new Path(fs.getWorkingDirectory(), outputFile);
+    out = fs.create(path);
+    Random rand = new Random();
+    byte[] buf = new byte[K];
+    // fill with > 1MB data
+    for (int nx = 0; nx < K + 2; nx++) {
+      rand.nextBytes(buf);
+      out.write(buf);
+    }
+    out.close();
+    try {
+      Reader reader =
+          new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+      Assert.fail("Error on handling random files.");
+    }
+    catch (IOException e) {
+      // noop, expecting exceptions
+    }
+  }
+
+  public void testFailureKeyLongerThan64K() throws IOException {
+    if (skip)
+      return;
+    byte[] buf = new byte[64 * K + 1];
+    Random rand = new Random();
+    rand.nextBytes(buf);
+    try {
+      writer.append(buf, "valueX".getBytes());
+    }
+    catch (IndexOutOfBoundsException e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  public void testFailureOutOfOrderKeys() throws IOException {
+    if (skip)
+      return;
+    try {
+      writer.append("keyM".getBytes(), "valueM".getBytes());
+      writer.append("keyA".getBytes(), "valueA".getBytes());
+      Assert.fail("Error on handling out of order keys.");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+      // e.printStackTrace();
+    }
+
+    closeOutput();
+  }
+
+  public void testFailureNegativeOffset() throws IOException {
+    if (skip)
+      return;
+    try {
+      writer.append("keyX".getBytes(), -1, 4, "valueX".getBytes(), 0, 6);
+      Assert.fail("Error on handling negative offset.");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  /**
+   * Expects that {@code lowerBound} with a key offset of -1 throws an
+   * exception. The output is closed first without any record having been
+   * appended by this test.
+   */
+  public void testFailureNegativeOffset_2() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      Scanner scanner = reader.createScanner();
+      try {
+        scanner.lowerBound("keyX".getBytes(), -1, 4);
+        Assert.fail("Error on handling negative offset.");
+      }
+      catch (Exception e) {
+        // noop, expecting exceptions
+      }
+      finally {
+        // close the scanner before its reader, as the other tests do
+        scanner.close();
+      }
+    }
+    finally {
+      reader.close();
+    }
+    closeOutput();
+  }
+
+  public void testFailureNegativeLength() throws IOException {
+    if (skip)
+      return;
+    try {
+      writer.append("keyX".getBytes(), 0, -1, "valueX".getBytes(), 0, 6);
+      Assert.fail("Error on handling negative length.");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  public void testFailureNegativeLength_2() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner scanner = reader.createScanner();
+    try {
+      scanner.lowerBound("keyX".getBytes(), 0, -1);
+      Assert.fail("Error on handling negative length.");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    finally {
+      scanner.close();
+      reader.close();
+    }
+    closeOutput();
+  }
+
+  /**
+   * Expects that {@code seekTo} throws an exception both for a negative key
+   * offset and for a negative key length.
+   */
+  public void testFailureNegativeLength_3() throws IOException {
+    if (skip)
+      return;
+    writeRecords(3);
+
+    Reader reader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      Scanner scanner = reader.createScanner();
+      try {
+        // test negative array offset
+        try {
+          scanner.seekTo("keyY".getBytes(), -1, 4);
+          Assert.fail("Failed to handle negative offset.");
+        } catch (Exception e) {
+          // noop, expecting exceptions
+        }
+
+        // test negative array length
+        try {
+          scanner.seekTo("keyY".getBytes(), 0, -2);
+          Assert.fail("Failed to handle negative key length.");
+        } catch (Exception e) {
+          // noop, expecting exceptions
+        }
+      } finally {
+        // close the scanner before its reader, as the other tests do
+        scanner.close();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Writes ten blocks' worth of records without closing the output and,
+   * unless the configured compression is NONE, asserts that the output
+   * position is smaller than the raw data size.
+   */
+  public void testFailureCompressionNotWorking() throws IOException {
+    if (skip) {
+      return;
+    }
+    long rawDataSize = writeRecords(10 * records1stBlock, false);
+    boolean uncompressed =
+        compression.equalsIgnoreCase(Compression.Algorithm.NONE.getName());
+    if (!uncompressed) {
+      Assert.assertTrue(out.getPos() < rawDataSize);
+    }
+    closeOutput();
+  }
+
+  public void testFailureFileWriteNotAt0Position() throws IOException {
+    if (skip)
+      return;
+    closeOutput();
+    out = fs.create(path);
+    out.write(123);
+
+    try {
+      writer = new Writer(out, BLOCK_SIZE, compression, comparator, conf);
+      Assert.fail("Failed to catch file write not at position 0.");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+    closeOutput();
+  }
+
+  private long writeRecords(int count) throws IOException {
+    return writeRecords(count, true);
+  }
+
+  private long writeRecords(int count, boolean close) throws IOException {
+    long rawDataSize = writeRecords(writer, count);
+    if (close) {
+      closeOutput();
+    }
+    return rawDataSize;
+  }
+
+  /**
+   * Append {@code count} generated key/value pairs to the writer.
+   * 
+   * @param writer
+   *          writer receiving the records
+   * @param count
+   *          number of records to append
+   * @return the sum, over all records, of the key and value lengths plus the
+   *         VInt sizes of those lengths
+   */
+  static long writeRecords(Writer writer, int count) throws IOException {
+    long total = 0;
+    int index = 0;
+    while (index < count) {
+      byte[] keyBytes = composeSortedKey(KEY, count, index).getBytes();
+      byte[] valueBytes = (VALUE + index).getBytes();
+      writer.append(keyBytes, valueBytes);
+
+      int keyCost = WritableUtils.getVIntSize(keyBytes.length) + keyBytes.length;
+      int valueCost =
+          WritableUtils.getVIntSize(valueBytes.length) + valueBytes.length;
+      total += keyCost + valueCost;
+      index++;
+    }
+    return total;
+  }
+
+  /**
+   * Build a key from a prefix followed by the value zero-padded to a width
+   * of ten characters.
+   * 
+   * @param prefix
+   *          text placed in front of the number
+   * @param total
+   *          not used by this implementation
+   * @param value
+   *          number to append after the prefix
+   * @return the prefix followed by the padded number
+   */
+  static String composeSortedKey(String prefix, int total, int value) {
+    String paddedNumber = String.format("%010d", value);
+    return prefix + paddedNumber;
+  }
+
+  /**
+   * Count how many times the value can be divided by 10 (integer division)
+   * while the quotient stays positive.
+   * 
+   * For a positive value this is its number of decimal digits minus one;
+   * zero and negative values yield 0.
+   * 
+   * NOTE(review): the name suggests a plain digit count -- confirm whether
+   * the off-by-one is intended.
+   * 
+   * @param value
+   *          number to examine
+   * @return the count described above
+   */
+  private static int numberDigits(int value) {
+    int count = 0;
+    for (int rest = value / 10; rest > 0; rest /= 10) {
+      count++;
+    }
+    return count;
+  }
+
+  private void readRecords(int count) throws IOException {
+    readRecords(fs, path, count, conf);
+  }
+
+  /**
+   * Scan the file from the start and assert that it holds exactly
+   * {@code count} records whose keys and values match the generated ones.
+   * 
+   * @param fs
+   *          file system holding the file
+   * @param path
+   *          file to read
+   * @param count
+   *          number of records expected
+   * @param conf
+   *          configuration handed to the Reader
+   */
+  static void readRecords(FileSystem fs, Path path, int count,
+      Configuration conf) throws IOException {
+    Reader tfileReader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    Scanner cursor = tfileReader.createScanner();
+
+    try {
+      int index = 0;
+      while (index < count) {
+        Assert.assertFalse(cursor.atEnd());
+
+        // key first
+        byte[] keyBuffer = new byte[BUF_SIZE];
+        int keyLength = cursor.entry().getKeyLength();
+        cursor.entry().getKey(keyBuffer);
+        String keyRead = new String(keyBuffer, 0, keyLength);
+        Assert.assertEquals(keyRead, composeSortedKey(KEY, count, index));
+
+        // then the value
+        byte[] valueBuffer = new byte[BUF_SIZE];
+        int valueLength = cursor.entry().getValueLength();
+        cursor.entry().getValue(valueBuffer);
+        String valueRead = new String(valueBuffer, 0, valueLength);
+        Assert.assertEquals(valueRead, VALUE + index);
+
+        index++;
+        cursor.advance();
+      }
+
+      // nothing may follow the last expected record
+      Assert.assertTrue(cursor.atEnd());
+      Assert.assertFalse(cursor.advance());
+    } finally {
+      cursor.close();
+      tfileReader.close();
+    }
+  }
+
+  /**
+   * Seek to the key of the given record and assert that the scanner's
+   * current location reports the expected block index.
+   * 
+   * @param count
+   *          record count passed to {@code composeSortedKey}
+   * @param recordIndex
+   *          index of the record to seek to
+   * @param blockIndexExpected
+   *          block index the record is expected to be in
+   */
+  private void checkBlockIndex(int count, int recordIndex,
+      int blockIndexExpected) throws IOException {
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      Scanner scanner = reader.createScanner();
+      try {
+        scanner.seekTo(composeSortedKey(KEY, count, recordIndex).getBytes());
+        Assert.assertEquals(blockIndexExpected, scanner.currentLocation
+            .getBlockIndex());
+      }
+      finally {
+        // release the scanner even when the assertion above fails
+        scanner.close();
+      }
+    }
+    finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Open a scanner starting at the key of the given record and verify that
+   * the value can be read before the key of the same entry.
+   * 
+   * @param count
+   *          record count passed to {@code composeSortedKey}
+   * @param recordIndex
+   *          index of the record to start at
+   */
+  private void readValueBeforeKey(int count, int recordIndex)
+      throws IOException {
+    Reader tfileReader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    byte[] beginKey = composeSortedKey(KEY, count, recordIndex).getBytes();
+    Scanner cursor = tfileReader.createScanner(beginKey, null);
+
+    try {
+      // value first ...
+      byte[] valueBuffer = new byte[BUF_SIZE];
+      int valueLength = cursor.entry().getValueLength();
+      cursor.entry().getValue(valueBuffer);
+      String valueRead = new String(valueBuffer, 0, valueLength);
+      Assert.assertEquals(valueRead, VALUE + recordIndex);
+
+      // ... then the key
+      byte[] keyBuffer = new byte[BUF_SIZE];
+      int keyLength = cursor.entry().getKeyLength();
+      cursor.entry().getKey(keyBuffer);
+      String keyRead = new String(keyBuffer, 0, keyLength);
+      Assert.assertEquals(keyRead, composeSortedKey(KEY, count, recordIndex));
+    } finally {
+      cursor.close();
+      tfileReader.close();
+    }
+  }
+
+  /**
+   * Open a scanner starting at the key of the given record and verify that
+   * keys can be read without reading the values: first the key of that
+   * record and then, if the scanner can advance, the key of the next one.
+   * 
+   * @param count
+   *          record count passed to {@code composeSortedKey}
+   * @param recordIndex
+   *          index of the record to start at
+   */
+  private void readKeyWithoutValue(int count, int recordIndex)
+      throws IOException {
+    Reader tfileReader =
+        new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    byte[] beginKey = composeSortedKey(KEY, count, recordIndex).getBytes();
+    Scanner cursor = tfileReader.createScanner(beginKey, null);
+
+    try {
+      // key of the record the scanner starts at
+      byte[] firstKeyBuffer = new byte[BUF_SIZE];
+      int firstKeyLength = cursor.entry().getKeyLength();
+      cursor.entry().getKey(firstKeyBuffer);
+      String firstKey = new String(firstKeyBuffer, 0, firstKeyLength);
+      Assert.assertEquals(firstKey, composeSortedKey(KEY, count, recordIndex));
+
+      boolean hasNext = cursor.advance() && !cursor.atEnd();
+      if (hasNext) {
+        // key of the following record
+        byte[] nextKeyBuffer = new byte[BUF_SIZE];
+        int nextKeyLength = cursor.entry().getKeyLength();
+        cursor.entry().getKey(nextKeyBuffer);
+        String nextKey = new String(nextKeyBuffer, 0, nextKeyLength);
+        Assert.assertEquals(nextKey, composeSortedKey(KEY, count,
+            recordIndex + 1));
+      }
+    } finally {
+      cursor.close();
+      tfileReader.close();
+    }
+  }
+
+  /**
+   * Open a scanner starting at the key of the given record and verify that
+   * values can be read without reading the keys: first the value of that
+   * record and then, if the scanner can advance, the value of the next one.
+   * 
+   * @param count
+   *          record count passed to {@code composeSortedKey}
+   * @param recordIndex
+   *          index of the record to start at
+   */
+  private void readValueWithoutKey(int count, int recordIndex)
+      throws IOException {
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      Scanner scanner =
+          reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+              .getBytes(), null);
+      try {
+        byte[] vbuf1 = new byte[BUF_SIZE];
+        int vlen1 = scanner.entry().getValueLength();
+        scanner.entry().getValue(vbuf1);
+        Assert.assertEquals(new String(vbuf1, 0, vlen1), VALUE + recordIndex);
+
+        if (scanner.advance() && !scanner.atEnd()) {
+          byte[] vbuf2 = new byte[BUF_SIZE];
+          int vlen2 = scanner.entry().getValueLength();
+          scanner.entry().getValue(vbuf2);
+          Assert.assertEquals(new String(vbuf2, 0, vlen2), VALUE
+              + (recordIndex + 1));
+        }
+      }
+      finally {
+        // release the scanner even when an assertion above fails
+        scanner.close();
+      }
+    }
+    finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Open a scanner starting at the key of the given record and verify that
+   * the key of the same entry can be read three times in a row with the
+   * same result.
+   * 
+   * @param count
+   *          record count passed to {@code composeSortedKey}
+   * @param recordIndex
+   *          index of the record to start at
+   */
+  private void readKeyManyTimes(int count, int recordIndex) throws IOException {
+    Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
+    try {
+      Scanner scanner =
+          reader.createScanner(composeSortedKey(KEY, count, recordIndex)
+              .getBytes(), null);
+      try {
+        // read the indexed key repeatedly into the same buffer
+        byte[] kbuf1 = new byte[BUF_SIZE];
+        for (int attempt = 0; attempt < 3; attempt++) {
+          int klen1 = scanner.entry().getKeyLength();
+          scanner.entry().getKey(kbuf1);
+          Assert.assertEquals(new String(kbuf1, 0, klen1), composeSortedKey(KEY,
+              count, recordIndex));
+        }
+      }
+      finally {
+        // release the scanner even when an assertion above fails
+        scanner.close();
+      }
+    }
+    finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Close the writer and the output stream, if present, and reset both
+   * fields to null so that the method can safely be called again.
+   * 
+   * The output stream is closed even when closing the writer throws.
+   */
+  private void closeOutput() throws IOException {
+    try {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+    finally {
+      // drop the writer either way so a later call does not retry the close
+      writer = null;
+      if (out != null) {
+        try {
+          out.close();
+        }
+        finally {
+          out = null;
+        }
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileComparators.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileComparators.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileComparators.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileComparators.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.file.tfile.TFile.Writer;
+
+/**
+ * 
+ * Checks that constructing a TFile Writer fails for invalid comparator
+ * specifications: an unknown comparator name, a "jclass:" entry naming a
+ * class that does not exist, and a "jclass:" entry naming a class that is not
+ * a RawComparator.
+ * 
+ */
+public class TestTFileComparators extends TestCase {
+  private static final String ROOT =
+      System.getProperty("test.build.data", "/tmp/tfile-test");
+
+  private final static int BLOCK_SIZE = 512;
+  private FileSystem fs;
+  private Configuration conf;
+  private Path path;
+  private FSDataOutputStream out;
+  private Writer writer;
+
+  private String compression = Compression.Algorithm.GZ.getName();
+  private String outputFile = "TFileTestComparators";
+
+  /**
+   * Creates the output file that each test hands to the Writer constructor.
+   */
+  @Override
+  public void setUp() throws IOException {
+    conf = new Configuration();
+    path = new Path(ROOT, outputFile);
+    fs = path.getFileSystem(conf);
+    out = fs.create(path);
+  }
+
+  /**
+   * Closes whatever the test left open and deletes the output file.
+   */
+  @Override
+  public void tearDown() throws IOException {
+    try {
+      // the stream opened in setUp is otherwise never closed
+      closeOutput();
+    }
+    finally {
+      fs.delete(path, true);
+    }
+  }
+
+  // bad comparator format
+  public void testFailureBadComparatorNames() throws IOException {
+    try {
+      writer = new Writer(out, BLOCK_SIZE, compression, "badcmp", conf);
+      Assert.fail("Failed to catch unsupported comparator names");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+  }
+
+  // jclass that doesn't exist
+  public void testFailureBadJClassNames() throws IOException {
+    try {
+      writer =
+          new Writer(out, BLOCK_SIZE, compression,
+              "jclass: some.non.existence.clazz", conf);
+      Assert.fail("Failed to catch unsupported comparator names");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+  }
+
+  // class exists but not a RawComparator
+  public void testFailureBadJClasses() throws IOException {
+    try {
+      writer =
+          new Writer(out, BLOCK_SIZE, compression,
+              "jclass:org.apache.hadoop.io.file.tfile.Chunk", conf);
+      Assert.fail("Failed to catch unsupported comparator names");
+    }
+    catch (Exception e) {
+      // noop, expecting exceptions
+    }
+  }
+
+  /**
+   * Close the writer and the output stream, if present, and reset both
+   * fields to null. The output stream is closed even when closing the writer
+   * throws.
+   */
+  private void closeOutput() throws IOException {
+    try {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+    finally {
+      writer = null;
+      if (out != null) {
+        try {
+          out.close();
+        }
+        finally {
+          out = null;
+        }
+      }
+    }
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * 
+ * Byte arrays test case class using GZ compression codec, base class of none
+ * and LZO compression classes.
+ * 
+ */
+
+public class TestTFileJClassComparatorByteArrays extends TestTFileByteArrays {
+  /**
+   * Test non-compression codec, using the same test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    init(Compression.Algorithm.GZ.getName(),
+        "jclass: org.apache.hadoop.io.file.tfile.MyComparator",
+        "TFileTestJClassComparator", 4480, 4263);
+    super.setUp();
+  }
+}
+
+/**
+ * Raw comparator over byte arrays that delegates to
+ * {@code WritableComparator.compareBytes}.
+ */
+class MyComparator implements RawComparator<byte[]> {
+
+  /** Compare the ranges {@code [s1, s1 + l1)} of b1 and {@code [s2, s2 + l2)} of b2. */
+  @Override
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+  }
+
+  /** Compare two whole arrays by delegating to the range-based overload. */
+  @Override
+  public int compare(byte[] o1, byte[] o2) {
+    final int leftLength = o1.length;
+    final int rightLength = o2.length;
+    return compare(o1, 0, leftLength, o2, 0, rightLength);
+  }
+  
+}
+

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsByteArrays.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+
+public class TestTFileLzoCodecsByteArrays extends TestTFileByteArrays {
+  /**
+   * Test LZO compression codec, using the same test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    skip = !(Algorithm.LZO.isSupported());
+    if (skip) {
+      System.out.println("Skipped");
+    }
+
+    // TODO: sample the generated key/value records, and put the numbers below
+    init(Compression.Algorithm.LZO.getName(), "memcmp", "TFileTestCodecsLzo",
+        2605, 2558);
+    if (!skip)
+      super.setUp();
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileLzoCodecsStreams.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.file.tfile.Compression.Algorithm;
+
+public class TestTFileLzoCodecsStreams extends TestTFileStreams {
+  /**
+   * Test LZO compression codec, using the same test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    skip = !(Algorithm.LZO.isSupported());
+    if (skip) {
+      System.out.println("Skipped");
+    }
+    init(Compression.Algorithm.LZO.getName(), "memcmp", "TFileTestCodecsLzo");
+    if (!skip) 
+      super.setUp();
+  }
+}

Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java?rev=787913&view=auto
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java (added)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileNoneCodecsByteArrays.java Wed Jun 24 05:48:25 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.io.file.tfile;
+
+import java.io.IOException;
+
+public class TestTFileNoneCodecsByteArrays extends TestTFileByteArrays {
+  /**
+   * Test non-compression codec, using the same test cases as in the ByteArrays.
+   */
+  @Override
+  public void setUp() throws IOException {
+    init(Compression.Algorithm.NONE.getName(), "memcmp", "TFileTestCodecsNone",
+        24, 24);
+    super.setUp();
+  }
+}



Mime
View raw message