hive-commits mailing list archives

From ser...@apache.org
Subject svn commit: r1594713 [3/3] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/org/apache/hadoop/hive/hbase/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java...
Date Wed, 14 May 2014 20:53:32 GMT
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java?rev=1594713&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java Wed May 14 20:53:30 2014
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.hash.MurmurHash;
+
+
+/**
+ * A structure storing an arbitrary amount of data as a set of fixed-size byte buffers.
+ * Maintains read and write pointers for convenient single-threaded writing/reading.
+ */
+public final class WriteBuffers implements RandomAccessOutput {
+  private final ArrayList<byte[]> writeBuffers = new ArrayList<byte[]>(1);
+  /** Buffer size in writeBuffers */
+  private final int wbSize;
+  private final long maxSize;
+
+  private byte[] currentWriteBuffer;
+  private int currentWriteBufferIndex;
+  /** The offset in the last writeBuffer where the values are added */
+  private int currentWriteOffset = 0;
+
+  private byte[] currentReadBuffer = null;
+  private int currentReadBufferIndex = 0;
+  private int currentReadOffset = 0;
+
+  public WriteBuffers(int wbSize, long maxSize) {
+    this.wbSize = wbSize;
+    this.maxSize = maxSize;
+    currentWriteBufferIndex = -1;
+    nextBufferToWrite();
+  }
+
+  public long readVLong() {
+    ponderNextBufferToRead();
+    byte firstByte = currentReadBuffer[currentReadOffset++];
+    int length = (byte) WritableUtils.decodeVIntSize(firstByte) - 1;
+    if (length == 0) {
+      return firstByte;
+    }
+    long i = 0;
+    if (isAllInOneReadBuffer(length)) {
+      for (int idx = 0; idx < length; idx++) {
+        i = (i << 8) | (currentReadBuffer[currentReadOffset + idx] & 0xFF);
+      }
+      currentReadOffset += length;
+    } else {
+      for (int idx = 0; idx < length; idx++) {
+        i = (i << 8) | (readNextByte() & 0xFF);
+      }
+    }
+    return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i);
+  }
+
+  public void skipVLong() {
+    ponderNextBufferToRead();
+    byte firstByte = currentReadBuffer[currentReadOffset++];
+    int length = (byte) WritableUtils.decodeVIntSize(firstByte);
+    if (length > 1) {
+      currentReadOffset += (length - 1);
+    }
+    int diff = currentReadOffset - wbSize;
+    while (diff >= 0) {
+      ++currentReadBufferIndex;
+      currentReadBuffer = writeBuffers.get(currentReadBufferIndex);
+      currentReadOffset = diff;
+      diff = currentReadOffset - wbSize;
+    }
+  }
+
+  public void setReadPoint(long offset) {
+    currentReadBufferIndex = getBufferIndex(offset);
+    currentReadBuffer = writeBuffers.get(currentReadBufferIndex);
+    currentReadOffset = getOffset(offset);
+  }
+
+  public int hashCode(long offset, int length) {
+    setReadPoint(offset);
+    if (isAllInOneReadBuffer(length)) {
+      int result = murmurHash(currentReadBuffer, currentReadOffset, length);
+      currentReadOffset += length;
+      return result;
+    }
+
+    // Rare case of buffer boundary. Unfortunately we'd have to copy some bytes.
+    byte[] bytes = new byte[length];
+    int destOffset = 0;
+    while (destOffset < length) {
+      ponderNextBufferToRead();
+      int toRead = Math.min(length - destOffset, wbSize - currentReadOffset);
+      System.arraycopy(currentReadBuffer, currentReadOffset, bytes, destOffset, toRead);
+      currentReadOffset += toRead;
+      destOffset += toRead;
+    }
+    return murmurHash(bytes, 0, bytes.length);
+  }
+
+  private byte readNextByte() {
+    // This method is inefficient. It's only used when something crosses buffer boundaries.
+    ponderNextBufferToRead();
+    return currentReadBuffer[currentReadOffset++];
+  }
+
+  private void ponderNextBufferToRead() {
+    if (currentReadOffset >= wbSize) {
+      ++currentReadBufferIndex;
+      currentReadBuffer = writeBuffers.get(currentReadBufferIndex);
+      currentReadOffset = 0;
+    }
+  }
+
+  public int hashCode(byte[] key, int offset, int length) {
+    return murmurHash(key, offset, length);
+  }
+
+  private void setByte(long offset, byte value) {
+    // No checks, the caller must ensure the offsets are correct.
+    writeBuffers.get(getBufferIndex(offset))[getOffset(offset)] = value;
+  }
+
+  @Override
+  public void reserve(int byteCount) {
+    if (byteCount < 0) throw new AssertionError("byteCount must be non-negative");
+    int currentWriteOffset = this.currentWriteOffset + byteCount;
+    while (currentWriteOffset > wbSize) {
+      nextBufferToWrite();
+      currentWriteOffset -= wbSize;
+    }
+    this.currentWriteOffset = currentWriteOffset;
+  }
+
+  public void setWritePoint(long offset) {
+    currentWriteBufferIndex = getBufferIndex(offset);
+    currentWriteBuffer = writeBuffers.get(currentWriteBufferIndex);
+    currentWriteOffset = getOffset(offset);
+  }
+
+  @Override
+  public void write(int b) {
+    if (currentWriteOffset == wbSize) {
+      nextBufferToWrite();
+    }
+    currentWriteBuffer[currentWriteOffset++] = (byte)b;
+  }
+
+  @Override
+  public void write(byte[] b) {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) {
+    int srcOffset = 0;
+    while (srcOffset < len) {
+      int toWrite = Math.min(len - srcOffset, wbSize - currentWriteOffset);
+      System.arraycopy(b, srcOffset + off, currentWriteBuffer, currentWriteOffset, toWrite);
+      currentWriteOffset += toWrite;
+      srcOffset += toWrite;
+      if (currentWriteOffset == wbSize) {
+        nextBufferToWrite();
+      }
+    }
+  }
+
+  @Override
+  public int getLength() {
+    return (int)getWritePoint();
+  }
+
+  private int getOffset(long offset) {
+    return (int)(offset % wbSize);
+  }
+
+  private int getBufferIndex(long offset) {
+    return (int)(offset / wbSize);
+  }
+
+  private void nextBufferToWrite() {
+    if (currentWriteBufferIndex == (writeBuffers.size() - 1)) {
+      if ((1 + writeBuffers.size()) * wbSize > maxSize) {
+        // We could verify precisely at write time, but just do an approximate check at allocation time.
+        throw new RuntimeException("Too much memory used by write buffers");
+      }
+      writeBuffers.add(new byte[wbSize]);
+    }
+    ++currentWriteBufferIndex;
+    currentWriteBuffer = writeBuffers.get(currentWriteBufferIndex);
+    currentWriteOffset = 0;
+  }
+
+  /** Compares two parts of the buffer with each other. Does not modify readPoint. */
+  public boolean isEqual(long leftOffset, int leftLength, long rightOffset, int rightLength) {
+    if (rightLength != leftLength) {
+      return false;
+    }
+    int leftIndex = getBufferIndex(leftOffset), rightIndex = getBufferIndex(rightOffset),
+        leftFrom = getOffset(leftOffset), rightFrom = getOffset(rightOffset);
+    byte[] leftBuffer = writeBuffers.get(leftIndex), rightBuffer = writeBuffers.get(rightIndex);
+    if (leftFrom + leftLength <= wbSize && rightFrom + rightLength <= wbSize) {
+      for (int i = 0; i < leftLength; ++i) {
+        if (leftBuffer[leftFrom + i] != rightBuffer[rightFrom + i]) {
+          return false;
+        }
+      }
+      return true;
+    }
+    for (int i = 0; i < leftLength; ++i) {
+      if (leftFrom == wbSize) {
+        ++leftIndex;
+        leftBuffer = writeBuffers.get(leftIndex);
+        leftFrom = 0;
+      }
+      if (rightFrom == wbSize) {
+        ++rightIndex;
+        rightBuffer = writeBuffers.get(rightIndex);
+        rightFrom = 0;
+      }
+      if (leftBuffer[leftFrom++] != rightBuffer[rightFrom++]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Compares part of the buffer with a part of an external byte array.
+   * Does not modify readPoint.
+   */
+  public boolean isEqual(byte[] left, int leftLength, long rightOffset, int rightLength) {
+    if (rightLength != leftLength) {
+      return false;
+    }
+    int rightIndex = getBufferIndex(rightOffset), rightFrom = getOffset(rightOffset);
+    byte[] rightBuffer = writeBuffers.get(rightIndex);
+    if (rightFrom + rightLength <= wbSize) {
+      // TODO: allow using unsafe optionally.
+      for (int i = 0; i < leftLength; ++i) {
+        if (left[i] != rightBuffer[rightFrom + i]) {
+          return false;
+        }
+      }
+      return true;
+    }
+    for (int i = 0; i < rightLength; ++i) {
+      if (rightFrom == wbSize) {
+        ++rightIndex;
+        rightBuffer = writeBuffers.get(rightIndex);
+        rightFrom = 0;
+      }
+      if (left[i] != rightBuffer[rightFrom++]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public void clear() {
+    writeBuffers.clear();
+    currentWriteBuffer = currentReadBuffer = null;
+    currentWriteOffset = currentReadOffset = currentWriteBufferIndex = currentReadBufferIndex = 0;
+  }
+
+  public long getWritePoint() {
+    return (currentWriteBufferIndex * wbSize) + currentWriteOffset;
+  }
+
+  public long getReadPoint() {
+    return (currentReadBufferIndex * wbSize) + currentReadOffset;
+  }
+
+  public void writeVLong(long value) {
+    LazyBinaryUtils.writeVLong(this, value);
+  }
+
+  /** Reads some bytes from the buffer and writes them again at the current write point. */
+  public void writeBytes(long offset, int length) {
+    int readBufIndex = getBufferIndex(offset);
+    byte[] readBuffer = writeBuffers.get(readBufIndex);
+    int readBufOffset = getOffset(offset);
+    int srcOffset = 0;
+    while (srcOffset < length) {
+      if (readBufOffset == wbSize) {
+        ++readBufIndex;
+        readBuffer = writeBuffers.get(readBufIndex);
+        readBufOffset = 0;
+      }
+      if (currentWriteOffset == wbSize) {
+        nextBufferToWrite();
+      }
+      // How much we can read from current read buffer, out of what we need.
+      int toRead = Math.min(length - srcOffset, wbSize - readBufOffset);
+      // How much we can write to current write buffer, out of what we need.
+      int toWrite = Math.min(toRead, wbSize - currentWriteOffset);
+      System.arraycopy(readBuffer, readBufOffset, currentWriteBuffer, currentWriteOffset, toWrite);
+      currentWriteOffset += toWrite;
+      readBufOffset += toWrite;
+      srcOffset += toWrite;
+      if (toRead > toWrite) {
+        nextBufferToWrite();
+        toRead -= toWrite; // Remaining to copy from the current read buffer; less than wbSize by definition.
+        System.arraycopy(readBuffer, readBufOffset, currentWriteBuffer, currentWriteOffset, toRead);
+        currentWriteOffset += toRead;
+        readBufOffset += toRead;
+        srcOffset += toRead;
+      }
+    }
+  }
+
+  /**
+   * The class representing a segment of bytes in the buffer. Can either be a reference
+   * to a segment of the whole WriteBuffers (when bytes is not set), or to a segment of
+   * some byte array (when bytes is set).
+   */
+  public static class ByteSegmentRef {
+    public ByteSegmentRef(long offset, int length) {
+      if (length < 0) {
+        throw new AssertionError("Length is negative: " + length);
+      }
+      this.offset = offset;
+      this.length = length;
+    }
+    public byte[] getBytes() {
+      return bytes;
+    }
+    public long getOffset() {
+      return offset;
+    }
+    public int getLength() {
+      return length;
+    }
+    public ByteBuffer copy() {
+      byte[] copy = new byte[length];
+      System.arraycopy(bytes, (int)offset, copy, 0, length);
+      return ByteBuffer.wrap(copy);
+    }
+    private byte[] bytes = null;
+    private long offset;
+    private int length;
+  }
+
+  /**
+   * Changes the byte segment reference from a reference into the global buffers to one
+   * backed by a self-contained byte array. The byte array will either be one of the
+   * internal buffers, or a copy of the data if the original reference pointed to data
+   * spanning multiple internal buffers.
+   */
+  public void populateValue(WriteBuffers.ByteSegmentRef value) {
+    // At this point, we are going to make a copy if needed to avoid crossing buffer boundaries.
+    int index = getBufferIndex(value.getOffset());
+    byte[] buffer = writeBuffers.get(index);
+    int bufferOffset = getOffset(value.getOffset());
+    int length = value.getLength();
+    if (bufferOffset + length <= wbSize) {
+      value.bytes = buffer;
+      value.offset = bufferOffset;
+    } else {
+      value.bytes = new byte[length];
+      value.offset = 0;
+      int destOffset = 0;
+      while (destOffset < length) {
+        if (destOffset > 0) {
+          buffer = writeBuffers.get(++index);
+          bufferOffset = 0;
+        }
+        int toCopy = Math.min(length - destOffset, wbSize - bufferOffset);
+        System.arraycopy(buffer, bufferOffset, value.bytes, destOffset, toCopy);
+        destOffset += toCopy;
+      }
+    }
+  }
+
+  private boolean isAllInOneReadBuffer(int length) {
+    return currentReadOffset + length <= wbSize;
+  }
+
+  private boolean isAllInOneWriteBuffer(int length) {
+    return currentWriteOffset + length <= wbSize;
+  }
+
+  public void seal() {
+    if (currentWriteOffset < (wbSize * 0.8)) { // arbitrary
+      byte[] smallerBuffer = new byte[currentWriteOffset];
+      System.arraycopy(currentWriteBuffer, 0, smallerBuffer, 0, currentWriteOffset);
+      writeBuffers.set(currentWriteBufferIndex, smallerBuffer);
+    }
+    if (currentWriteBufferIndex + 1 < writeBuffers.size()) {
+      writeBuffers.subList(currentWriteBufferIndex + 1, writeBuffers.size()).clear();
+    }
+    currentWriteBuffer = currentReadBuffer = null; // Make sure we don't reference any old buffer.
+    currentWriteBufferIndex = currentReadBufferIndex = -1;
+    currentReadOffset = currentWriteOffset = -1;
+  }
+
+  public long readFiveByteULong(long offset) {
+    return readNByteLong(offset, 5);
+  }
+
+  private long readNByteLong(long offset, int bytes) {
+    setReadPoint(offset);
+    long v = 0;
+    if (isAllInOneReadBuffer(bytes)) {
+      for (int i = 0; i < bytes; ++i) {
+        v = (v << 8) + (currentReadBuffer[currentReadOffset + i] & 0xff);
+      }
+      currentReadOffset += bytes;
+    } else {
+      for (int i = 0; i < bytes; ++i) {
+        v = (v << 8) + (readNextByte() & 0xff);
+      }
+    }
+    return v;
+  }
+
+  public void writeFiveByteULong(long offset, long v) {
+    int prevIndex = currentWriteBufferIndex, prevOffset = currentWriteOffset;
+    setWritePoint(offset);
+    if (isAllInOneWriteBuffer(5)) {
+      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 32);
+      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 24);
+      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 16);
+      currentWriteBuffer[currentWriteOffset++] = (byte)(v >>> 8);
+      currentWriteBuffer[currentWriteOffset] = (byte)(v);
+    } else {
+      setByte(offset++, (byte)(v >>> 32));
+      setByte(offset++, (byte)(v >>> 24));
+      setByte(offset++, (byte)(v >>> 16));
+      setByte(offset++, (byte)(v >>> 8));
+      setByte(offset, (byte)(v));
+    }
+    currentWriteBufferIndex = prevIndex;
+    currentWriteBuffer = writeBuffers.get(currentWriteBufferIndex);
+    currentWriteOffset = prevOffset;
+  }
+
+  public int readInt(long offset) {
+    return (int)readNByteLong(offset, 4);
+  }
+
+  @Override
+  public void writeInt(long offset, int v) {
+    int prevIndex = currentWriteBufferIndex, prevOffset = currentWriteOffset;
+    setWritePoint(offset);
+    if (isAllInOneWriteBuffer(4)) {
+      currentWriteBuffer[currentWriteOffset++] = (byte)(v >> 24);
+      currentWriteBuffer[currentWriteOffset++] = (byte)(v >> 16);
+      currentWriteBuffer[currentWriteOffset++] = (byte)(v >> 8);
+      currentWriteBuffer[currentWriteOffset] = (byte)(v);
+    } else {
+      setByte(offset++, (byte)(v >>> 24));
+      setByte(offset++, (byte)(v >>> 16));
+      setByte(offset++, (byte)(v >>> 8));
+      setByte(offset, (byte)(v));
+    }
+    currentWriteBufferIndex = prevIndex;
+    currentWriteBuffer = writeBuffers.get(currentWriteBufferIndex);
+    currentWriteOffset = prevOffset;
+  }
+
+  // Lifted from org.apache.hadoop.util.hash.MurmurHash... but supports offset.
+  private static int murmurHash(byte[] data, int offset, int length) {
+    int m = 0x5bd1e995;
+    int r = 24;
+
+    int h = length;
+
+    int len_4 = length >> 2;
+
+    for (int i = 0; i < len_4; i++) {
+      int i_4 = offset + (i << 2);
+      int k = data[i_4 + 3];
+      k = k << 8;
+      k = k | (data[i_4 + 2] & 0xff);
+      k = k << 8;
+      k = k | (data[i_4 + 1] & 0xff);
+      k = k << 8;
+      k = k | (data[i_4 + 0] & 0xff);
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+      h *= m;
+      h ^= k;
+    }
+
+    // avoid calculating modulo
+    int len_m = len_4 << 2;
+    int left = length - len_m;
+
+    if (left != 0) {
+      length += offset;
+      if (left >= 3) {
+        h ^= (int) data[length - 3] << 16;
+      }
+      if (left >= 2) {
+        h ^= (int) data[length - 2] << 8;
+      }
+      if (left >= 1) {
+        h ^= (int) data[length - 1];
+      }
+
+      h *= m;
+    }
+
+    h ^= h >>> 13;
+    h *= m;
+    h ^= h >>> 15;
+
+    return h;
+  }
+}
\ No newline at end of file
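
A minimal usage sketch of the new WriteBuffers API, added here for orientation (hypothetical example, not part of the commit; the 16-byte chunk size is chosen only so that writes quickly span chunk boundaries):

import org.apache.hadoop.hive.serde2.WriteBuffers;

public class WriteBuffersExample {
  public static void main(String[] args) {
    // 16-byte internal buffers, 1 KB cap on total allocation.
    WriteBuffers wb = new WriteBuffers(16, 1024);

    long start = wb.getWritePoint();           // remember where this record begins
    wb.writeVLong(123456789L);                 // variable-length long (LazyBinaryUtils format)
    wb.write(new byte[] {1, 2, 3, 4, 5});      // raw bytes; may cross a buffer boundary

    wb.setReadPoint(start);                    // rewind the read pointer to the record start
    long v = wb.readVLong();                   // reads back 123456789

    // Reference the five raw bytes; populateValue makes them contiguous if needed.
    WriteBuffers.ByteSegmentRef ref = new WriteBuffers.ByteSegmentRef(wb.getReadPoint(), 5);
    wb.populateValue(ref);                     // ref.getBytes()/getOffset() now point at the data

    System.out.println(v + " / " + ref.getLength() + " bytes");
  }
}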

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java Wed May 14 20:53:30 2014
@@ -34,6 +34,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
@@ -109,8 +112,7 @@ import org.apache.hadoop.io.Writable;
  */
 public class BinarySortableSerDe extends AbstractSerDe {
 
-  public static final Log LOG = LogFactory.getLog(BinarySortableSerDe.class
-      .getName());
+  public static final Log LOG = LogFactory.getLog(BinarySortableSerDe.class.getName());
 
   List<String> columnNames;
   List<TypeInfo> columnTypes;
@@ -588,33 +590,39 @@ public class BinarySortableSerDe extends
   }
 
   BytesWritable serializeBytesWritable = new BytesWritable();
-  OutputByteBuffer outputByteBuffer = new OutputByteBuffer();
+  ByteStream.Output output = new ByteStream.Output();
 
   @Override
   public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
-    outputByteBuffer.reset();
+    output.reset();
     StructObjectInspector soi = (StructObjectInspector) objInspector;
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
 
     for (int i = 0; i < columnNames.size(); i++) {
-      serialize(outputByteBuffer, soi.getStructFieldData(obj, fields.get(i)),
+      serialize(output, soi.getStructFieldData(obj, fields.get(i)),
           fields.get(i).getFieldObjectInspector(), columnSortOrderIsDesc[i]);
     }
 
-    serializeBytesWritable.set(outputByteBuffer.getData(), 0, outputByteBuffer
-        .getLength());
+    serializeBytesWritable.set(output.getData(), 0, output.getLength());
     return serializeBytesWritable;
   }
 
-  static void serialize(OutputByteBuffer buffer, Object o, ObjectInspector oi,
+  private static void writeByte(RandomAccessOutput buffer, byte b, boolean invert) {
+    if (invert) {
+      b = (byte) (0xff ^ b);
+    }
+    buffer.write(b);
+  }
+
+  static void serialize(ByteStream.Output buffer, Object o, ObjectInspector oi,
       boolean invert) throws SerDeException {
     // Is this field a null?
     if (o == null) {
-      buffer.write((byte) 0, invert);
+      writeByte(buffer, (byte) 0, invert);
       return;
     }
     // This field is not a null.
-    buffer.write((byte) 1, invert);
+    writeByte(buffer, (byte) 1, invert);
 
     switch (oi.getCategory()) {
     case PRIMITIVE: {
@@ -625,20 +633,20 @@ public class BinarySortableSerDe extends
       }
       case BOOLEAN: {
         boolean v = ((BooleanObjectInspector) poi).get(o);
-        buffer.write((byte) (v ? 2 : 1), invert);
+        writeByte(buffer, (byte) (v ? 2 : 1), invert);
         return;
       }
       case BYTE: {
         ByteObjectInspector boi = (ByteObjectInspector) poi;
         byte v = boi.get(o);
-        buffer.write((byte) (v ^ 0x80), invert);
+        writeByte(buffer, (byte) (v ^ 0x80), invert);
         return;
       }
       case SHORT: {
         ShortObjectInspector spoi = (ShortObjectInspector) poi;
         short v = spoi.get(o);
-        buffer.write((byte) ((v >> 8) ^ 0x80), invert);
-        buffer.write((byte) v, invert);
+        writeByte(buffer, (byte) ((v >> 8) ^ 0x80), invert);
+        writeByte(buffer, (byte) v, invert);
         return;
       }
       case INT: {
@@ -650,14 +658,14 @@ public class BinarySortableSerDe extends
       case LONG: {
         LongObjectInspector loi = (LongObjectInspector) poi;
         long v = loi.get(o);
-        buffer.write((byte) ((v >> 56) ^ 0x80), invert);
-        buffer.write((byte) (v >> 48), invert);
-        buffer.write((byte) (v >> 40), invert);
-        buffer.write((byte) (v >> 32), invert);
-        buffer.write((byte) (v >> 24), invert);
-        buffer.write((byte) (v >> 16), invert);
-        buffer.write((byte) (v >> 8), invert);
-        buffer.write((byte) v, invert);
+        writeByte(buffer, (byte) ((v >> 56) ^ 0x80), invert);
+        writeByte(buffer, (byte) (v >> 48), invert);
+        writeByte(buffer, (byte) (v >> 40), invert);
+        writeByte(buffer, (byte) (v >> 32), invert);
+        writeByte(buffer, (byte) (v >> 24), invert);
+        writeByte(buffer, (byte) (v >> 16), invert);
+        writeByte(buffer, (byte) (v >> 8), invert);
+        writeByte(buffer, (byte) v, invert);
         return;
       }
       case FLOAT: {
@@ -670,10 +678,10 @@ public class BinarySortableSerDe extends
           // positive number, flip the first bit
           v = v ^ (1 << 31);
         }
-        buffer.write((byte) (v >> 24), invert);
-        buffer.write((byte) (v >> 16), invert);
-        buffer.write((byte) (v >> 8), invert);
-        buffer.write((byte) v, invert);
+        writeByte(buffer, (byte) (v >> 24), invert);
+        writeByte(buffer, (byte) (v >> 16), invert);
+        writeByte(buffer, (byte) (v >> 8), invert);
+        writeByte(buffer, (byte) v, invert);
         return;
       }
       case DOUBLE: {
@@ -686,14 +694,14 @@ public class BinarySortableSerDe extends
           // positive number, flip the first bit
           v = v ^ (1L << 63);
         }
-        buffer.write((byte) (v >> 56), invert);
-        buffer.write((byte) (v >> 48), invert);
-        buffer.write((byte) (v >> 40), invert);
-        buffer.write((byte) (v >> 32), invert);
-        buffer.write((byte) (v >> 24), invert);
-        buffer.write((byte) (v >> 16), invert);
-        buffer.write((byte) (v >> 8), invert);
-        buffer.write((byte) v, invert);
+        writeByte(buffer, (byte) (v >> 56), invert);
+        writeByte(buffer, (byte) (v >> 48), invert);
+        writeByte(buffer, (byte) (v >> 40), invert);
+        writeByte(buffer, (byte) (v >> 32), invert);
+        writeByte(buffer, (byte) (v >> 24), invert);
+        writeByte(buffer, (byte) (v >> 16), invert);
+        writeByte(buffer, (byte) (v >> 8), invert);
+        writeByte(buffer, (byte) v, invert);
         return;
       }
       case STRING: {
@@ -740,7 +748,7 @@ public class BinarySortableSerDe extends
         TimestampWritable t = toi.getPrimitiveWritableObject(o);
         byte[] data = t.getBinarySortable();
         for (int i = 0; i < data.length; i++) {
-          buffer.write(data[i], invert);
+          writeByte(buffer, data[i], invert);
         }
         return;
       }
@@ -763,23 +771,23 @@ public class BinarySortableSerDe extends
         // get the sign of the big decimal
         int sign = dec.compareTo(HiveDecimal.ZERO);
 
-    // we'll encode the absolute value (sign is separate)
-    dec = dec.abs();
+        // we'll encode the absolute value (sign is separate)
+        dec = dec.abs();
 
-    // get the scale factor to turn big decimal into a decimal < 1
-    int factor = dec.precision() - dec.scale();
-    factor = sign == 1 ? factor : -factor;
+        // get the scale factor to turn big decimal into a decimal < 1
+        int factor = dec.precision() - dec.scale();
+        factor = sign == 1 ? factor : -factor;
 
         // convert the absolute big decimal to string
         dec.scaleByPowerOfTen(Math.abs(dec.scale()));
         String digits = dec.unscaledValue().toString();
 
         // finally write out the pieces (sign, scale, digits)
-        buffer.write((byte) ( sign + 1), invert);
-        buffer.write((byte) ((factor >> 24) ^ 0x80), invert);
-        buffer.write((byte) ( factor >> 16), invert);
-        buffer.write((byte) ( factor >> 8), invert);
-        buffer.write((byte)   factor, invert);
+        writeByte(buffer, (byte) ( sign + 1), invert);
+        writeByte(buffer, (byte) ((factor >> 24) ^ 0x80), invert);
+        writeByte(buffer, (byte) ( factor >> 16), invert);
+        writeByte(buffer, (byte) ( factor >> 8), invert);
+        writeByte(buffer, (byte)   factor, invert);
         serializeBytes(buffer, digits.getBytes(decimalCharSet),
             digits.length(), sign == -1 ? !invert : invert);
         return;
@@ -798,11 +806,11 @@ public class BinarySortableSerDe extends
       // \1 followed by each element
       int size = loi.getListLength(o);
       for (int eid = 0; eid < size; eid++) {
-        buffer.write((byte) 1, invert);
+        writeByte(buffer, (byte) 1, invert);
         serialize(buffer, loi.getListElement(o, eid), eoi, invert);
       }
       // and \0 to terminate
-      buffer.write((byte) 0, invert);
+      writeByte(buffer, (byte) 0, invert);
       return;
     }
     case MAP: {
@@ -813,12 +821,12 @@ public class BinarySortableSerDe extends
       // \1 followed by each key and then each value
       Map<?, ?> map = moi.getMap(o);
       for (Map.Entry<?, ?> entry : map.entrySet()) {
-        buffer.write((byte) 1, invert);
+        writeByte(buffer, (byte) 1, invert);
         serialize(buffer, entry.getKey(), koi, invert);
         serialize(buffer, entry.getValue(), voi, invert);
       }
       // and \0 to terminate
-      buffer.write((byte) 0, invert);
+      writeByte(buffer, (byte) 0, invert);
       return;
     }
     case STRUCT: {
@@ -834,7 +842,7 @@ public class BinarySortableSerDe extends
     case UNION: {
       UnionObjectInspector uoi = (UnionObjectInspector) oi;
       byte tag = uoi.getTag(o);
-      buffer.write(tag, invert);
+      writeByte(buffer, tag, invert);
       serialize(buffer, uoi.getField(o), uoi.getObjectInspectors().get(tag),
           invert);
       return;
@@ -846,23 +854,24 @@ public class BinarySortableSerDe extends
 
   }
 
-  private static void serializeBytes(OutputByteBuffer buffer, byte[] data, int length, boolean invert){
+  private static void serializeBytes(
+      ByteStream.Output buffer, byte[] data, int length, boolean invert) {
     for (int i = 0; i < length; i++) {
       if (data[i] == 0 || data[i] == 1) {
-        buffer.write((byte) 1, invert);
-        buffer.write((byte) (data[i] + 1), invert);
+        writeByte(buffer, (byte) 1, invert);
+        writeByte(buffer, (byte) (data[i] + 1), invert);
       } else {
-        buffer.write(data[i], invert);
+        writeByte(buffer, data[i], invert);
       }
     }
-    buffer.write((byte) 0, invert);
+    writeByte(buffer, (byte) 0, invert);
   }
 
-  private static void serializeInt(OutputByteBuffer buffer, int v, boolean invert) {
-    buffer.write((byte) ((v >> 24) ^ 0x80), invert);
-    buffer.write((byte) (v >> 16), invert);
-    buffer.write((byte) (v >> 8), invert);
-    buffer.write((byte) v, invert);
+  private static void serializeInt(ByteStream.Output buffer, int v, boolean invert) {
+    writeByte(buffer, (byte) ((v >> 24) ^ 0x80), invert);
+    writeByte(buffer, (byte) (v >> 16), invert);
+    writeByte(buffer, (byte) (v >> 8), invert);
+    writeByte(buffer, (byte) v, invert);
   }
 
   @Override
@@ -870,4 +879,15 @@ public class BinarySortableSerDe extends
     // no support for statistics
     return null;
   }
+
+  public static void serializeStruct(Output byteStream, Object[] fieldData,
+      List<ObjectInspector> fieldOis, boolean[] sortableSortOrders) throws SerDeException {
+    for (int i = 0; i < fieldData.length; i++) {
+      serialize(byteStream, fieldData[i], fieldOis.get(i), sortableSortOrders[i]);
+    }
+  }
+
+  public boolean[] getSortOrders() {
+    return columnSortOrderIsDesc;
+  }
 }
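
For context on the new writeByte helper above: the 0xff XOR keeps the per-byte inversion that the old OutputByteBuffer.write(byte, invert) calls relied on, which is how BinarySortableSerDe flips byte ordering for descending (DESC) sort columns. A tiny standalone illustration (hypothetical, not part of the commit):

public class InvertExample {
  public static void main(String[] args) {
    byte a = 0x01, b = 0x02;            // a sorts before b as unsigned bytes
    int ia = (0xff ^ a) & 0xff;         // 0xfe
    int ib = (0xff ^ b) & 0xff;         // 0xfd
    System.out.println(ia > ib);        // true: inversion reverses the unsigned order
  }
}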

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java Wed May 14 20:53:30 2014
@@ -180,11 +180,11 @@ public class ColumnarSerDe extends Colum
         }
 
         field[i].set(serializeStream.getData(), count, serializeStream
-            .getCount()
+            .getLength()
             - count);
-        count = serializeStream.getCount();
+        count = serializeStream.getLength();
       }
-      serializedSize = serializeStream.getCount();
+      serializedSize = serializeStream.getLength();
       lastOperationSerialize = true;
       lastOperationDeserialize = false;
     } catch (IOException e) {

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java Wed May 14 20:53:30 2014
@@ -94,7 +94,7 @@ public class LazyBinaryColumnarSerDe ext
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
     List<Object> list = soi.getStructFieldsDataAsList(obj);
 
-    boolean warnedOnceNullMapKey = false;
+    LazyBinarySerDe.BooleanRef warnedOnceNullMapKey = new LazyBinarySerDe.BooleanRef(false);
     serializeStream.reset();
     serializedSize = 0;
     int streamOffset = 0;
@@ -114,11 +114,11 @@ public class LazyBinaryColumnarSerDe ext
         LazyBinarySerDe.serialize(serializeStream, f, foi, true, warnedOnceNullMapKey);
       }
       field[i].set(serializeStream.getData(), streamOffset, serializeStream
-          .getCount()
+          .getLength()
           - streamOffset);
-      streamOffset = serializeStream.getCount();
+      streamOffset = serializeStream.getLength();
     }
-    serializedSize = serializeStream.getCount();
+    serializedSize = serializeStream.getLength();
     lastOperationSerialize = true;
     lastOperationDeserialize = false;
     return serializeCache;

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/dynamic_type/DynamicSerDe.java Wed May 14 20:53:30 2014
@@ -219,7 +219,7 @@ public class DynamicSerDe extends Abstra
       e.printStackTrace();
       throw new SerDeException(e);
     }
-    ret.set(bos_.getData(), 0, bos_.getCount());
+    ret.set(bos_.getData(), 0, bos_.getLength());
     return ret;
   }
 

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/DateWritable.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/DateWritable.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/DateWritable.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/DateWritable.java Wed May 14 20:53:30 2014
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
 import org.apache.hadoop.io.WritableComparable;
@@ -143,7 +144,7 @@ public class DateWritable implements Wri
     set(vInt.value);
   }
 
-  public void writeToByteStream(Output byteStream) {
+  public void writeToByteStream(RandomAccessOutput byteStream) {
     LazyBinaryUtils.writeVInt(byteStream, getDays());
   }
 

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveDecimalWritable.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveDecimalWritable.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveDecimalWritable.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveDecimalWritable.java Wed May 14 20:53:30 2014
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.common.type.Decimal128;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
@@ -128,7 +129,7 @@ public class HiveDecimalWritable impleme
     byteStream.write(bytes, 0, bytes.length);
   }
 
-  public void writeToByteStream(Output byteStream) {
+  public void writeToByteStream(RandomAccessOutput byteStream) {
     LazyBinaryUtils.writeVInt(byteStream, scale);
     LazyBinaryUtils.writeVInt(byteStream, internalStorage.length);
     byteStream.write(internalStorage, 0, internalStorage.length);

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/io/TimestampWritable.java Wed May 14 20:53:30 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
 import org.apache.hadoop.io.WritableComparable;
@@ -151,7 +152,7 @@ public class TimestampWritable implement
     timestampEmpty = true;
   }
 
-  public void writeToByteStream(Output byteStream) {
+  public void writeToByteStream(RandomAccessOutput byteStream) {
     checkBytes();
     byteStream.write(currentBytes, offset, getTotalLength());
   }
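
The Output -> RandomAccessOutput signature changes in DateWritable, HiveDecimalWritable and TimestampWritable let the same writable serialize into either a contiguous ByteStream.Output or the new chunked WriteBuffers. A sketch of that flexibility (hypothetical example class; the TimestampWritable(Timestamp) constructor and the buffer sizes are assumptions, not part of this diff):

import java.sql.Timestamp;

import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.WriteBuffers;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;

public class RandomAccessOutputExample {
  public static void main(String[] args) {
    TimestampWritable t = new TimestampWritable(new Timestamp(0L));

    ByteStream.Output heap = new ByteStream.Output();        // single growable byte[]
    WriteBuffers chunked = new WriteBuffers(1024, 1 << 20);  // list of fixed-size buffers

    // Both sinks implement ByteStream.RandomAccessOutput after this change.
    t.writeToByteStream(heap);
    t.writeToByteStream(chunked);

    System.out.println(heap.getLength() + " / " + chunked.getLength() + " bytes");
  }
}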

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java Wed May 14 20:53:30 2014
@@ -430,8 +430,8 @@ public class LazySimpleSerDe extends Abs
     // TODO: The copy of data is unnecessary, but there is no work-around
     // since we cannot directly set the private byte[] field inside Text.
     serializeCache
-        .set(serializeStream.getData(), 0, serializeStream.getCount());
-    serializedSize = serializeStream.getCount();
+        .set(serializeStream.getData(), 0, serializeStream.getLength());
+    serializedSize = serializeStream.getLength();
     lastOperationSerialize = true;
     lastOperationDeserialize = false;
     return serializeCache;

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinarySerDe.java Wed May 14 20:53:30 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde.serd
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
@@ -190,7 +191,7 @@ public class LazyBinarySerDe extends Abs
    */
   BytesWritable serializeBytesWritable = new BytesWritable();
   ByteStream.Output serializeByteStream = new ByteStream.Output();
-  boolean nullMapKey = false;
+  BooleanRef nullMapKey = new BooleanRef(false);
 
   /**
    * Serialize an object to a byte buffer in a binary compact way.
@@ -207,13 +208,11 @@ public class LazyBinarySerDe extends Abs
 
     serializeByteStream.reset();
     // serialize the row as a struct
-    nullMapKey = serializeStruct(serializeByteStream, obj,
-        (StructObjectInspector) objInspector, nullMapKey);
+    serializeStruct(serializeByteStream, obj, (StructObjectInspector) objInspector, nullMapKey);
     // return the serialized bytes
-    serializeBytesWritable.set(serializeByteStream.getData(), 0,
-        serializeByteStream.getCount());
+    serializeBytesWritable.set(serializeByteStream.getData(), 0, serializeByteStream.getLength());
 
-    serializedSize = serializeByteStream.getCount();
+    serializedSize = serializeByteStream.getLength();
     lastOperationSerialize = true;
     lastOperationDeserialize = false;
     return serializeBytesWritable;
@@ -230,11 +229,11 @@ public class LazyBinarySerDe extends Abs
     }
   }
 
-  private static boolean serializeStruct(Output byteStream, Object obj,
-      StructObjectInspector soi, boolean warnedOnceNullMapKey) throws SerDeException {
+  private static void serializeStruct(RandomAccessOutput byteStream, Object obj,
+      StructObjectInspector soi, BooleanRef warnedOnceNullMapKey) throws SerDeException {
     // do nothing for null struct
     if (null == obj) {
-      return warnedOnceNullMapKey;
+      return;
     }
 
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
@@ -247,12 +246,12 @@ public class LazyBinarySerDe extends Abs
       fieldOis.add(field.getFieldObjectInspector());
     }
 
-    return serializeStruct(byteStream, fieldData, fieldOis, warnedOnceNullMapKey);
+    serializeStruct(byteStream, fieldData, fieldOis, warnedOnceNullMapKey);
   }
 
-  public static void serializeStruct(Output byteStream, Object[] fieldData,
+  public static void serializeStruct(RandomAccessOutput byteStream, Object[] fieldData,
       List<ObjectInspector> fieldOis) throws SerDeException {
-    serializeStruct(byteStream, fieldData, fieldOis, true);
+    serializeStruct(byteStream, fieldData, fieldOis, null);
   }
 
   /**
@@ -267,12 +266,10 @@ public class LazyBinarySerDe extends Abs
    *          the struct object inspector
    * @param warnedOnceNullMapKey a boolean indicating whether a warning
    *          has been issued once already when encountering null map keys
-   * @return a boolean indicating whether a warning for null map keys has been issued
-   *          once already
-   * @throws SerDeException 
    */
-  private static boolean serializeStruct(Output byteStream, Object[] fieldData,
-      List<ObjectInspector> fieldOis, boolean warnedOnceNullMapKey) throws SerDeException {
+  private static void serializeStruct(RandomAccessOutput byteStream, Object[] fieldData,
+      List<ObjectInspector> fieldOis, BooleanRef warnedOnceNullMapKey)
+          throws SerDeException {
 
     int lasti = 0;
     byte nullByte = 0;
@@ -289,17 +286,16 @@ public class LazyBinarySerDe extends Abs
       if (7 == i % 8 || i == size - 1) {
         byteStream.write(nullByte);
         for (int j = lasti; j <= i; j++) {
-          warnedOnceNullMapKey = serialize(
-              byteStream, fieldData[j], fieldOis.get(j), false, warnedOnceNullMapKey);
+          serialize(byteStream, fieldData[j], fieldOis.get(j), false, warnedOnceNullMapKey);
         }
         lasti = i + 1;
         nullByte = 0;
       }
     }
-    return warnedOnceNullMapKey;
   }
 
-  private static void serializeText(Output byteStream, Text t, boolean skipLengthPrefix) {
+  private static void serializeText(
+      RandomAccessOutput byteStream, Text t, boolean skipLengthPrefix) {
     /* write byte size of the string which is a vint */
     int length = t.getLength();
     if (!skipLengthPrefix) {
@@ -310,6 +306,14 @@ public class LazyBinarySerDe extends Abs
     byteStream.write(data, 0, length);
   }
 
+  public static class BooleanRef {
+    public BooleanRef(boolean v) {
+      value = v;
+    }
+
+    public boolean value;
+  }
+
   /**
    * A recursive function that serialize an object to a byte buffer based on its
    * object inspector.
@@ -324,15 +328,14 @@ public class LazyBinarySerDe extends Abs
    *          needed for list/map/struct
    * @param warnedOnceNullMapKey a boolean indicating whether a warning
    *          has been issued once already when encountering null map keys
-   * @return a boolean indicating whether a warning for null map keys has been issued
-   *          once already
    */
-  public static boolean serialize(Output byteStream, Object obj, ObjectInspector objInspector,
-      boolean skipLengthPrefix, boolean warnedOnceNullMapKey) throws SerDeException {
+  public static void serialize(RandomAccessOutput byteStream, Object obj,
+      ObjectInspector objInspector, boolean skipLengthPrefix, BooleanRef warnedOnceNullMapKey)
+          throws SerDeException {
 
     // do nothing for null object
     if (null == obj) {
-      return warnedOnceNullMapKey;
+      return;
     }
 
     switch (objInspector.getCategory()) {
@@ -340,37 +343,37 @@ public class LazyBinarySerDe extends Abs
       PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objInspector;
       switch (poi.getPrimitiveCategory()) {
       case VOID: {
-        return warnedOnceNullMapKey;
+        return;
       }
       case BOOLEAN: {
         boolean v = ((BooleanObjectInspector) poi).get(obj);
         byteStream.write((byte) (v ? 1 : 0));
-        return warnedOnceNullMapKey;
+        return;
       }
       case BYTE: {
         ByteObjectInspector boi = (ByteObjectInspector) poi;
         byte v = boi.get(obj);
         byteStream.write(v);
-        return warnedOnceNullMapKey;
+        return;
       }
       case SHORT: {
         ShortObjectInspector spoi = (ShortObjectInspector) poi;
         short v = spoi.get(obj);
         byteStream.write((byte) (v >> 8));
         byteStream.write((byte) (v));
-        return warnedOnceNullMapKey;
+        return;
       }
       case INT: {
         IntObjectInspector ioi = (IntObjectInspector) poi;
         int v = ioi.get(obj);
         LazyBinaryUtils.writeVInt(byteStream, v);
-        return warnedOnceNullMapKey;
+        return;
       }
       case LONG: {
         LongObjectInspector loi = (LongObjectInspector) poi;
         long v = loi.get(obj);
         LazyBinaryUtils.writeVLong(byteStream, v);
-        return warnedOnceNullMapKey;
+        return;
       }
       case FLOAT: {
         FloatObjectInspector foi = (FloatObjectInspector) poi;
@@ -379,30 +382,30 @@ public class LazyBinarySerDe extends Abs
         byteStream.write((byte) (v >> 16));
         byteStream.write((byte) (v >> 8));
         byteStream.write((byte) (v));
-        return warnedOnceNullMapKey;
+        return;
       }
       case DOUBLE: {
         DoubleObjectInspector doi = (DoubleObjectInspector) poi;
         LazyBinaryUtils.writeDouble(byteStream, doi.get(obj));
-        return warnedOnceNullMapKey;
+        return;
       }
       case STRING: {
         StringObjectInspector soi = (StringObjectInspector) poi;
         Text t = soi.getPrimitiveWritableObject(obj);
         serializeText(byteStream, t, skipLengthPrefix);
-        return warnedOnceNullMapKey;
+        return;
       }
       case CHAR: {
         HiveCharObjectInspector hcoi = (HiveCharObjectInspector) poi;
         Text t = hcoi.getPrimitiveWritableObject(obj).getTextValue();
         serializeText(byteStream, t, skipLengthPrefix);
-        return warnedOnceNullMapKey;
+        return;
       }
       case VARCHAR: {
         HiveVarcharObjectInspector hcoi = (HiveVarcharObjectInspector) poi;
         Text t = hcoi.getPrimitiveWritableObject(obj).getTextValue();
         serializeText(byteStream, t, skipLengthPrefix);
-        return warnedOnceNullMapKey;
+        return;
       }
       case BINARY: {
         BinaryObjectInspector baoi = (BinaryObjectInspector) poi;
@@ -417,29 +420,29 @@ public class LazyBinarySerDe extends Abs
           }
         }
         byteStream.write(bw.getBytes(),0,length);
-        return warnedOnceNullMapKey;
+        return;
       }
 
       case DATE: {
         DateWritable d = ((DateObjectInspector) poi).getPrimitiveWritableObject(obj);
         d.writeToByteStream(byteStream);
-        return warnedOnceNullMapKey;
+        return;
       }
       case TIMESTAMP: {
         TimestampObjectInspector toi = (TimestampObjectInspector) poi;
         TimestampWritable t = toi.getPrimitiveWritableObject(obj);
         t.writeToByteStream(byteStream);
-        return warnedOnceNullMapKey;
+        return;
       }
 
       case DECIMAL: {
         HiveDecimalObjectInspector bdoi = (HiveDecimalObjectInspector) poi;
         HiveDecimalWritable t = bdoi.getPrimitiveWritableObject(obj);
         if (t == null) {
-          return warnedOnceNullMapKey;
+          return;
         }
         t.writeToByteStream(byteStream);
-        return warnedOnceNullMapKey;
+        return;
       }
 
       default: {
@@ -457,12 +460,9 @@ public class LazyBinarySerDe extends Abs
       if (!skipLengthPrefix) {
         // 1/ reserve spaces for the byte size of the list
         // which is a integer and takes four bytes
-        byteSizeStart = byteStream.getCount();
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        listStart = byteStream.getCount();
+        byteSizeStart = byteStream.getLength();
+        byteStream.reserve(4);
+        listStart = byteStream.getLength();
       }
       // 2/ write the size of the list as a VInt
       int size = loi.getListLength(obj);
@@ -485,21 +485,16 @@ public class LazyBinarySerDe extends Abs
 
       // 4/ write element by element from the list
       for (int eid = 0; eid < size; eid++) {
-        warnedOnceNullMapKey = serialize(byteStream, loi.getListElement(obj, eid), eoi,
-            false, warnedOnceNullMapKey);
+        serialize(byteStream, loi.getListElement(obj, eid), eoi, false, warnedOnceNullMapKey);
       }
 
       if (!skipLengthPrefix) {
         // 5/ update the list byte size
-        int listEnd = byteStream.getCount();
+        int listEnd = byteStream.getLength();
         int listSize = listEnd - listStart;
-        byte[] bytes = byteStream.getData();
-        bytes[byteSizeStart] = (byte) (listSize >> 24);
-        bytes[byteSizeStart + 1] = (byte) (listSize >> 16);
-        bytes[byteSizeStart + 2] = (byte) (listSize >> 8);
-        bytes[byteSizeStart + 3] = (byte) (listSize);
+        writeSizeAtOffset(byteStream, byteSizeStart, listSize);
       }
-      return warnedOnceNullMapKey;
+      return;
     }
     case MAP: {
       MapObjectInspector moi = (MapObjectInspector) objInspector;
@@ -512,12 +507,9 @@ public class LazyBinarySerDe extends Abs
       if (!skipLengthPrefix) {
         // 1/ reserve spaces for the byte size of the map
         // which is a integer and takes four bytes
-        byteSizeStart = byteStream.getCount();
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        mapStart = byteStream.getCount();
+        byteSizeStart = byteStream.getLength();
+        byteStream.reserve(4);
+        mapStart = byteStream.getLength();
       }
 
       // 2/ write the size of the map which is a VInt
@@ -531,9 +523,11 @@ public class LazyBinarySerDe extends Abs
         // set the bit to 1 if a key is not null
         if (null != entry.getKey()) {
           nullByte |= 1 << (b % 8);
-        } else if (!warnedOnceNullMapKey) {
-          warnedOnceNullMapKey = true;
-          LOG.warn("Null map key encountered! Ignoring similar problems.");
+        } else if (warnedOnceNullMapKey != null) {
+          if (!warnedOnceNullMapKey.value) {
+            LOG.warn("Null map key encountered! Ignoring similar problems.");
+          }
+          warnedOnceNullMapKey.value = true;
         }
         b++;
         // set the bit to 1 if a value is not null
@@ -551,21 +545,17 @@ public class LazyBinarySerDe extends Abs
 
       // 4/ write key-value pairs one by one
       for (Map.Entry<?, ?> entry : map.entrySet()) {
-        warnedOnceNullMapKey = serialize(byteStream, entry.getKey(), koi, false, warnedOnceNullMapKey);
-        warnedOnceNullMapKey = serialize(byteStream, entry.getValue(), voi, false, warnedOnceNullMapKey);
+        serialize(byteStream, entry.getKey(), koi, false, warnedOnceNullMapKey);
+        serialize(byteStream, entry.getValue(), voi, false, warnedOnceNullMapKey);
       }
 
       if (!skipLengthPrefix) {
         // 5/ update the byte size of the map
-        int mapEnd = byteStream.getCount();
+        int mapEnd = byteStream.getLength();
         int mapSize = mapEnd - mapStart;
-        byte[] bytes = byteStream.getData();
-        bytes[byteSizeStart] = (byte) (mapSize >> 24);
-        bytes[byteSizeStart + 1] = (byte) (mapSize >> 16);
-        bytes[byteSizeStart + 2] = (byte) (mapSize >> 8);
-        bytes[byteSizeStart + 3] = (byte) (mapSize);
+        writeSizeAtOffset(byteStream, byteSizeStart, mapSize);
       }
-      return warnedOnceNullMapKey;
+      return;
     }
     case STRUCT: {
       int byteSizeStart = 0;
@@ -573,28 +563,20 @@ public class LazyBinarySerDe extends Abs
       if (!skipLengthPrefix) {
         // 1/ reserve spaces for the byte size of the struct
         // which is a integer and takes four bytes
-        byteSizeStart = byteStream.getCount();
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        byteStream.write((byte) 0);
-        structStart = byteStream.getCount();
+        byteSizeStart = byteStream.getLength();
+        byteStream.reserve(4);
+        structStart = byteStream.getLength();
       }
       // 2/ serialize the struct
-      warnedOnceNullMapKey = serializeStruct(byteStream, obj, (StructObjectInspector) objInspector,
-          warnedOnceNullMapKey);
+      serializeStruct(byteStream, obj, (StructObjectInspector) objInspector, warnedOnceNullMapKey);
 
       if (!skipLengthPrefix) {
         // 3/ update the byte size of the struct
-        int structEnd = byteStream.getCount();
+        int structEnd = byteStream.getLength();
         int structSize = structEnd - structStart;
-        byte[] bytes = byteStream.getData();
-        bytes[byteSizeStart] = (byte) (structSize >> 24);
-        bytes[byteSizeStart + 1] = (byte) (structSize >> 16);
-        bytes[byteSizeStart + 2] = (byte) (structSize >> 8);
-        bytes[byteSizeStart + 3] = (byte) (structSize);
+        writeSizeAtOffset(byteStream, byteSizeStart, structSize);
       }
-      return warnedOnceNullMapKey;
+      return;
     }
     default: {
       throw new RuntimeException("Unrecognized type: "
@@ -603,10 +585,14 @@ public class LazyBinarySerDe extends Abs
     }
   }
 
+  private static void writeSizeAtOffset(
+      RandomAccessOutput byteStream, int byteSizeStart, int size) {
+    byteStream.writeInt(byteSizeStart, size);
+  }
+
   /**
    * Returns the statistics after (de)serialization)
    */
-
   @Override
   public SerDeStats getSerDeStats() {
     // must be different
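
The BooleanRef introduced above replaces threading a boolean return value through every recursive serialize() call: callers that want the one-time null-map-key warning pass a shared mutable flag, and callers that do not care pass null. A small sketch of that pattern (hypothetical example class, not part of the commit):

import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;

public class BooleanRefExample {
  // Mirrors the warn-once behavior in LazyBinarySerDe.serialize().
  static void maybeWarn(LazyBinarySerDe.BooleanRef warnedOnce) {
    if (warnedOnce != null && !warnedOnce.value) {
      System.err.println("Null map key encountered! Ignoring similar problems.");
      warnedOnce.value = true;
    }
  }

  public static void main(String[] args) {
    LazyBinarySerDe.BooleanRef warned = new LazyBinarySerDe.BooleanRef(false);
    maybeWarn(warned); // warns
    maybeWarn(warned); // silent: the shared flag is already set
  }
}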

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryStruct.java Wed May 14 20:53:30 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BinaryComparable;
 
 /**
  * LazyBinaryStruct is serialized as follows: start A B A B A B end bytes[] ->
@@ -198,6 +199,49 @@ public class LazyBinaryStruct extends La
     return uncheckedGetField(fieldID);
   }
 
+  public static final class SingleFieldGetter {
+    private final LazyBinaryStructObjectInspector soi;
+    private final int fieldIndex;
+    private final RecordInfo recordInfo = new LazyBinaryUtils.RecordInfo();
+    private byte[] fieldBytes;
+    private int fieldStart, fieldLength;
+    public SingleFieldGetter(LazyBinaryStructObjectInspector soi, int fieldIndex) {
+      this.soi = soi;
+      this.fieldIndex = fieldIndex;
+    }
+
+    public void init(BinaryComparable src) {
+      List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+      fieldBytes = src.getBytes();
+      int length = src.getLength();
+      byte nullByte = fieldBytes[0];
+      int lastFieldByteEnd = 1;
+      for (int i = 0; i <= fieldIndex; i++) {
+        if ((nullByte & (1 << (i % 8))) != 0) {
+          LazyBinaryUtils.checkObjectByteInfo(fieldRefs.get(i)
+              .getFieldObjectInspector(), fieldBytes, lastFieldByteEnd, recordInfo);
+          fieldStart = lastFieldByteEnd + recordInfo.elementOffset;
+          fieldLength = recordInfo.elementSize;
+          lastFieldByteEnd = fieldStart + fieldLength;
+        } else {
+          fieldStart = fieldLength = -1;
+        }
+
+        if (7 == (i % 8)) {
+          nullByte = (lastFieldByteEnd < length) ? fieldBytes[lastFieldByteEnd] : 0;
+          ++lastFieldByteEnd;
+        }
+      }
+    }
+
+    public short getShort() {
+      assert (2 == fieldLength);
+      return LazyBinaryUtils.byteArrayToShort(fieldBytes, fieldStart);
+    }
+  }
+
+
+
   /**
    * Get the field out of the row without checking parsed. This is called by
    * both getField and getFieldsAsList.
@@ -231,12 +275,15 @@ public class LazyBinaryStruct extends La
       parse();
     }
     if (cachedList == null) {
-      cachedList = new ArrayList<Object>();
+      cachedList = new ArrayList<Object>(fields.length);
+      for (int i = 0; i < fields.length; i++) {
+        cachedList.add(uncheckedGetField(i));
+      }
     } else {
-      cachedList.clear();
-    }
-    for (int i = 0; i < fields.length; i++) {
-      cachedList.add(uncheckedGetField(i));
+      assert fields.length == cachedList.size();
+      for (int i = 0; i < fields.length; i++) {
+        cachedList.set(i, uncheckedGetField(i));
+      }
     }
     return cachedList;
   }

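The LazyBinaryStruct changes add SingleFieldGetter, which walks the struct's interleaved null bytes (one per eight fields, a set bit meaning the field is present) and per-field offsets so a single field can be pulled out of a serialized row without materializing the whole struct; only a short accessor is exposed here. They also make getFieldsAsList() reuse its cached ArrayList via set() instead of clear()/add(). Below is a toy version of the field scan under the simplifying assumption that every field is a fixed 2-byte big-endian short, which lets it skip checkObjectByteInfo (the real code uses that call to compute each field's offset and size per type); SingleShortFieldDemo and its sample layout are illustrative only.

// Toy sketch of the SingleFieldGetter scan, assuming every field is a
// fixed 2-byte big-endian short. Layout: one null byte per 8 fields,
// interleaved before each group of 8 fields; a set bit means non-null.
final class SingleShortFieldDemo {

  static int findFieldStart(byte[] row, int rowLength, int fieldIndex) {
    byte nullByte = row[0];
    int lastFieldByteEnd = 1;
    int fieldStart = -1;
    for (int i = 0; i <= fieldIndex; i++) {
      if ((nullByte & (1 << (i % 8))) != 0) {        // field i is non-null
        fieldStart = lastFieldByteEnd;               // fixed 2-byte field
        lastFieldByteEnd += 2;
      } else {
        fieldStart = -1;                             // field i is null
      }
      if (7 == (i % 8)) {                            // next null byte follows
        nullByte = (lastFieldByteEnd < rowLength) ? row[lastFieldByteEnd] : 0;
        ++lastFieldByteEnd;
      }
    }
    return fieldStart;                               // -1 if the field is null
  }

  public static void main(String[] args) {
    // Two non-null short fields, 0x0102 and 0x0304; null byte 0b00000011.
    byte[] row = {0x03, 0x01, 0x02, 0x03, 0x04};
    int start = findFieldStart(row, row.length, 1);
    short v = (short) (((row[start] & 0xFF) << 8) | (row[start + 1] & 0xFF));
    System.out.println(v);   // prints 772 (= 0x0304)
  }
}
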
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/LazyBinaryUtils.java Wed May 14 20:53:30 2014
@@ -24,6 +24,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.lazybinary.objectinspector.LazyBinaryObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -334,7 +336,7 @@ public final class LazyBinaryUtils {
    * @param i
    *          the int
    */
-  public static void writeVInt(Output byteStream, int i) {
+  public static void writeVInt(RandomAccessOutput byteStream, int i) {
     writeVLong(byteStream, i);
   }
 
@@ -409,13 +411,13 @@ public final class LazyBinaryUtils {
     }
   };
 
-  public static void writeVLong(Output byteStream, long l) {
+  public static void writeVLong(RandomAccessOutput byteStream, long l) {
     byte[] vLongBytes = vLongBytesThreadLocal.get();
     int len = LazyBinaryUtils.writeVLongToByteArray(vLongBytes, l);
     byteStream.write(vLongBytes, 0, len);
   }
   
-  public static void writeDouble(Output byteStream, double d) {
+  public static void writeDouble(RandomAccessOutput byteStream, double d) {
     long v = Double.doubleToLongBits(d);
     byteStream.write((byte) (v >> 56));
     byteStream.write((byte) (v >> 48));

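In LazyBinaryUtils, writeVInt, writeVLong and writeDouble now accept the RandomAccessOutput interface instead of the concrete ByteStream.Output, so the same encoders can feed either a flat growable array or a segmented destination such as WriteBuffers. The sketch below illustrates that idea with a hypothetical Sink interface and two toy implementations; only the writeDouble body mirrors the big-endian byte order shown in the hunk above, everything else is illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RandomAccessOutput: the encoders only need write().
interface Sink {
  void write(byte b);
}

// Flat, growable array, in the spirit of ByteStream.Output.
final class FlatSink implements Sink {
  private byte[] data = new byte[16];
  private int length = 0;
  public void write(byte b) {
    if (length == data.length) data = Arrays.copyOf(data, data.length * 2);
    data[length++] = b;
  }
  int size() { return length; }
}

// Fixed-size segments, in the spirit of WriteBuffers.
final class SegmentedSink implements Sink {
  private final int segmentSize = 4;
  private final List<byte[]> segments = new ArrayList<byte[]>();
  private int offsetInSegment = segmentSize;        // force allocation on first write
  public void write(byte b) {
    if (offsetInSegment == segmentSize) {
      segments.add(new byte[segmentSize]);
      offsetInSegment = 0;
    }
    segments.get(segments.size() - 1)[offsetInSegment++] = b;
  }
  int size() { return (segments.size() - 1) * segmentSize + offsetInSegment; }
}

final class WriteDoubleDemo {
  // Same big-endian double encoding as writeDouble above, but against Sink.
  static void writeDouble(Sink out, double d) {
    long v = Double.doubleToLongBits(d);
    for (int shift = 56; shift >= 0; shift -= 8) out.write((byte) (v >> shift));
  }
  public static void main(String[] args) {
    FlatSink flat = new FlatSink();
    SegmentedSink seg = new SegmentedSink();
    writeDouble(flat, 1.5);
    writeDouble(seg, 1.5);
    System.out.println(flat.size() + " " + seg.size());   // 8 8
  }
}
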
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryStructObjectInspector.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryStructObjectInspector.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryStructObjectInspector.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/objectinspector/LazyBinaryStructObjectInspector.java Wed May 14 20:53:30 2014
@@ -72,4 +72,8 @@ public class LazyBinaryStructObjectInspe
     LazyBinaryStruct struct = (LazyBinaryStruct) data;
     return struct.getFieldsAsList();
   }
+
+  public StructField getStructFieldRef(int index) {
+    return fields.get(index);
+  }
 }

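LazyBinaryStructObjectInspector gains an index-based getStructFieldRef(int) next to the standard name-based lookup, which suits positional readers such as the new SingleFieldGetter on per-row hot paths. A small illustration of the two lookup styles (FieldRefDemo is hypothetical, not the inspector's implementation):

import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of name-based vs. index-based field lookup.
final class FieldRefDemo {
  static final class FieldRef {
    final String name;
    final int position;
    FieldRef(String name, int position) { this.name = name; this.position = position; }
  }

  private final List<FieldRef> fields =
      Arrays.asList(new FieldRef("key", 0), new FieldRef("value", 1));

  // Name-based lookup: linear scan, fine for setup-time resolution.
  FieldRef getFieldRef(String name) {
    for (FieldRef f : fields) {
      if (f.name.equalsIgnoreCase(name)) return f;
    }
    throw new IllegalArgumentException("no field named " + name);
  }

  // Index-based lookup: constant-time, suits per-row access.
  FieldRef getFieldRef(int index) {
    return fields.get(index);
  }

  public static void main(String[] args) {
    FieldRefDemo demo = new FieldRefDemo();
    System.out.println(demo.getFieldRef("value") == demo.getFieldRef(1));  // true
  }
}
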
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java Wed May 14 20:53:30 2014
@@ -363,22 +363,22 @@ public class PrimitiveObjectInspectorCon
       case BYTE:
         out.reset();
         LazyInteger.writeUTF8NoException(out, ((ByteObjectInspector) inputOI).get(input));
-        t.set(out.getData(), 0, out.getCount());
+        t.set(out.getData(), 0, out.getLength());
         return t;
       case SHORT:
         out.reset();
         LazyInteger.writeUTF8NoException(out, ((ShortObjectInspector) inputOI).get(input));
-        t.set(out.getData(), 0, out.getCount());
+        t.set(out.getData(), 0, out.getLength());
         return t;
       case INT:
         out.reset();
         LazyInteger.writeUTF8NoException(out, ((IntObjectInspector) inputOI).get(input));
-        t.set(out.getData(), 0, out.getCount());
+        t.set(out.getData(), 0, out.getLength());
         return t;
       case LONG:
         out.reset();
         LazyLong.writeUTF8NoException(out, ((LongObjectInspector) inputOI).get(input));
-        t.set(out.getData(), 0, out.getCount());
+        t.set(out.getData(), 0, out.getLength());
         return t;
       case FLOAT:
         t.set(String.valueOf(((FloatObjectInspector) inputOI).get(input)));

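The remaining hunks, here and in the two test files below, are a mechanical rename on ByteStream.Output from getCount() to getLength(). The pattern at each call site is the same: getData() exposes a backing array that can be larger than the logical content, so the copy into the Writable must be bounded by the logical length. A pure-JDK illustration of why getData() and getLength() have to travel together (GrowableBuffer is illustrative, not Hive's Output):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: a growable buffer whose backing array outlives reset(),
// like ByteStream.Output. Consumers must slice by getLength(), not by the
// backing array's size.
final class GrowableBuffer {
  private byte[] data = new byte[32];
  private int length = 0;

  void reset() { length = 0; }                       // keeps old bytes in 'data'

  void write(byte[] src) {
    if (length + src.length > data.length) {
      data = Arrays.copyOf(data, (length + src.length) * 2);
    }
    System.arraycopy(src, 0, data, length, src.length);
    length += src.length;
  }

  byte[] getData() { return data; }
  int getLength() { return length; }

  public static void main(String[] args) {
    GrowableBuffer out = new GrowableBuffer();
    out.write("12345".getBytes(StandardCharsets.UTF_8));
    out.reset();                                      // logical content cleared...
    out.write("67".getBytes(StandardCharsets.UTF_8)); // ...but stale bytes remain
    String right = new String(out.getData(), 0, out.getLength(), StandardCharsets.UTF_8);
    String wrong = new String(out.getData(), StandardCharsets.UTF_8).trim();
    System.out.println(right);   // "67"
    System.out.println(wrong);   // "67345": the stale tail leaks in
  }
}
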
Modified: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java (original)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/lazy/TestLazyPrimitive.java Wed May 14 20:53:30 2014
@@ -438,7 +438,7 @@ public class TestLazyPrimitive extends T
         out.reset();
         LazyInteger.writeUTF8(out, v);
         Text t = new Text();
-        t.set(out.getData(), 0, out.getCount());
+        t.set(out.getData(), 0, out.getLength());
         assertEquals(String.valueOf(v), t.toString());
       }
 
@@ -458,7 +458,7 @@ public class TestLazyPrimitive extends T
         out.reset();
         LazyLong.writeUTF8(out, v);
         Text t = new Text();
-        t.set(out.getData(), 0, out.getCount());
+        t.set(out.getData(), 0, out.getLength());
         assertEquals(String.valueOf(v), t.toString());
       }
 

Modified: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/thrift_test/CreateSequenceFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/thrift_test/CreateSequenceFile.java?rev=1594713&r1=1594712&r2=1594713&view=diff
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/thrift_test/CreateSequenceFile.java (original)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/thrift_test/CreateSequenceFile.java Wed May 14 20:53:30 2014
@@ -75,7 +75,7 @@ public final class CreateSequenceFile {
     public BytesWritable serialize(TBase base) throws TException {
       bos.reset();
       base.write(outProtocol);
-      bw.set(bos.getData(), 0, bos.getCount());
+      bw.set(bos.getData(), 0, bos.getLength());
       return bw;
     }
   }


