From: hashutosh@apache.org
To: commits@hive.apache.org
Subject: svn commit: r1480159 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/io/orc/ test/org/apache/hadoop/hive/ql/io/orc/ test/resources/
Date: Wed, 08 May 2013 04:51:51 -0000

Author: hashutosh
Date: Wed May 8 04:51:50 2013
New Revision: 1480159

URL: http://svn.apache.org/r1480159
Log:
HIVE-4421 : Improve memory usage by ORC dictionaries (Owen O'Malley via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java
    hive/trunk/ql/src/test/resources/orc-file-dump.out
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Wed May 8 04:51:50 2013 @@ -103,7 +103,7 @@ final class DynamicByteArray { * @param value the array to copy from * @param valueOffset the first location to copy from value * @param valueLength the number of bytes to copy from value - * @return + * @return the offset of the start of the value */ public int add(byte[] value, int valueOffset, int valueLength) { int i = length / chunkSize; @@ -266,5 +266,12 @@ final class DynamicByteArray { currentLength = Math.min(length, chunkSize - currentOffset); } } + + /** + * Get the size of the buffers. + */ + public long getSizeInBytes() { + return initializedChunks * chunkSize; + } }
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicIntArray.java Wed May 8 04:51:50 2013 @@ -135,5 +135,8 @@ final class DynamicIntArray { return sb.toString(); } + public int getSizeInBytes() { + return 4 * initializedChunks * chunkSize; + } }
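Both getSizeInBytes() additions report allocated chunks rather than bytes in use: once a chunk is initialized it occupies memory whether or not it is full, and that conservative number is what a memory manager needs. Below is a minimal sketch of the accounting; the class name GrowableBytes is invented as a stand-in for DynamicByteArray, and the chunk size is illustrative rather than the real default.

    class GrowableBytes {
      private final int chunkSize;
      private byte[][] data = new byte[16][];   // directory of chunks
      private int initializedChunks = 0;        // chunks actually allocated
      private int length = 0;                   // bytes actually stored

      GrowableBytes(int chunkSize) { this.chunkSize = chunkSize; }

      private void grow(int chunkIndex) {
        if (chunkIndex >= data.length) {        // grow the chunk directory
          byte[][] bigger = new byte[Math.max(2 * data.length, chunkIndex + 1)][];
          System.arraycopy(data, 0, bigger, 0, data.length);
          data = bigger;
        }
        while (initializedChunks <= chunkIndex) {
          data[initializedChunks++] = new byte[chunkSize];
        }
      }

      void add(byte value) {
        int i = length / chunkSize;
        grow(i);
        data[i][length % chunkSize] = value;
        length += 1;
      }

      // Report allocation, not content: the tail chunk is committed memory
      // whether or not it is full.
      long getSizeInBytes() {
        return (long) initializedChunks * chunkSize;
      }
    }

With chunkSize = 32 * 1024, a single add() already makes getSizeInBytes() report 32768 even though only one byte is logically stored.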
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java Wed May 8 04:51:50 2013 @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -33,19 +35,25 @@ import java.util.Map; * dynamic partitions, it is easy to end up with many writers in the same task. * By managing the size of each allocation, we try to cut down the overall * footprint and keep the task from running out of memory. + * + * This class is thread safe and uses synchronization around the shared state + * to prevent race conditions. */ class MemoryManager { + + private static final Log LOG = LogFactory.getLog(MemoryManager.class); + /** - * How much does the pool need to change between notifications? + * How often should we check the memory sizes? Measured in rows added + * to all of the writers. */ - private static final double NOTIFICATION_FACTOR = 1.1; + private static final int ROWS_BETWEEN_CHECKS = 5000; private final long totalMemoryPool; - private long notificationTrigger; private final Map<Path, WriterInfo> writerList = new HashMap<Path, WriterInfo>(); private long totalAllocation = 0; private double currentScale = 1; - private double lastNotificationScale = 1; + private int rowsAddedSinceCheck = 0; private static class WriterInfo { long allocation; @@ -57,7 +65,13 @@ class MemoryManager { } public interface Callback { - void checkMemory(double newScale) throws IOException; + /** + * The writer needs to check its memory usage. + * @param newScale the current scale factor for memory allocations + * @return true if the writer was over the limit + * @throws IOException + */ + boolean checkMemory(double newScale) throws IOException; } /** @@ -70,22 +84,26 @@ class MemoryManager { double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal); totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). getHeapMemoryUsage().getMax() * maxLoad); - notificationTrigger = Math.round(totalMemoryPool * NOTIFICATION_FACTOR); } /** - * Add a new writer's memory allocation to the pool + * Add a new writer's memory allocation to the pool. We use the path + * as a unique key to ensure that we don't get duplicates. * @param path the file that is being written * @param requestedAllocation the requested buffer size */ synchronized void addWriter(Path path, long requestedAllocation, Callback callback) throws IOException { WriterInfo oldVal = writerList.get(path); + // this should always be null, but we handle the case where the memory + // manager was never told that an earlier writer to the same path had + // finished and the task starts writing to it again. if (oldVal == null) { oldVal = new WriterInfo(requestedAllocation, callback); writerList.put(path, oldVal); totalAllocation += requestedAllocation; } else { + // handle a new writer that is writing to the same path totalAllocation += requestedAllocation - oldVal.allocation; oldVal.allocation = requestedAllocation; oldVal.callback = callback; @@ -125,6 +143,31 @@ class MemoryManager { } /** + * Give the memory manager an opportunity to do a memory check. + * @throws IOException + */ + synchronized void addedRow() throws IOException { + if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { + notifyWriters(); + } + } + + /** + * Notify all of the writers that they should check their memory usage. + * @throws IOException + */ + private void notifyWriters() throws IOException { + LOG.debug("Notifying writers after " + rowsAddedSinceCheck); + for(WriterInfo writer: writerList.values()) { + boolean flushed = writer.callback.checkMemory(currentScale); + if (LOG.isDebugEnabled() && flushed) { + LOG.debug("flushed " + writer.toString()); + } + } + rowsAddedSinceCheck = 0; + } + + /** * Update the currentScale based on the current allocation and pool size. * @param isAllocate is this an allocation?
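Taken together, the MemoryManager changes in this patch replace the old high-water-mark notification trigger with a simple cadence: count rows across all writers and poll every writer once per ROWS_BETWEEN_CHECKS. The scale factor itself is unchanged; for example, a 1 GB pool with five writers each requesting 256 MB gives totalAllocation of 1.25 GB and a scale of 0.8, so each writer flushes once it reaches 0.8 of its configured stripe size. A condensed, self-contained sketch of the scheme follows; the class name PoolSketch and the String keys are invented for illustration, while the real class keys writers by Path and sizes its pool from HiveConf.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class PoolSketch {
      interface Callback {
        /** @return true if the writer flushed because it was over its limit */
        boolean checkMemory(double newScale) throws IOException;
      }

      private static final int ROWS_BETWEEN_CHECKS = 5000;
      private final long totalMemoryPool;
      private final Map<String, Callback> writers = new HashMap<String, Callback>();
      private long totalAllocation = 0;
      private int rowsAddedSinceCheck = 0;

      PoolSketch(long totalMemoryPool) {
        this.totalMemoryPool = totalMemoryPool;
      }

      synchronized void addWriter(String path, long allocation, Callback callback) {
        writers.put(path, callback);
        totalAllocation += allocation;
      }

      /** Shrinks below 1.0 once writers have requested more than the pool. */
      synchronized double currentScale() {
        return totalAllocation <= totalMemoryPool
            ? 1.0 : (double) totalMemoryPool / totalAllocation;
      }

      /** Called for every row; the real work happens every N-th call. */
      synchronized void addedRow() throws IOException {
        if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
          rowsAddedSinceCheck = 0;
          for (Callback callback : writers.values()) {
            callback.checkMemory(currentScale()); // writer flushes if over limit
          }
        }
      }
    }

Checking on a row cadence amortizes the cost of the size estimate and, unlike the old trigger, keeps working even when no new writers are being added.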
@@ -135,21 +178,5 @@ class MemoryManager { } else { currentScale = (double) totalMemoryPool / totalAllocation; } - if (!isAllocate) { - // ensure that we notify if we drop 10% from the high water mark - notificationTrigger = - Math.min(notificationTrigger, - Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale)); - } else { - // we've allocated a new writer, so check to see if we need to notify - if (totalAllocation > notificationTrigger) { - for(WriterInfo writer: writerList.values()) { - writer.callback.checkMemory(currentScale); - } - // set the next notification trigger - notificationTrigger = - Math.round(totalMemoryPool * NOTIFICATION_FACTOR / currentScale); - } - } } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java Wed May 8 04:51:50 2013 @@ -23,15 +23,40 @@ import java.nio.ByteBuffer; class OutStream extends PositionedOutputStream { interface OutputReceiver { + /** + * Output the given buffer to the final destination + * @param buffer the buffer to output + * @throws IOException + */ void output(ByteBuffer buffer) throws IOException; } static final int HEADER_SIZE = 3; private final String name; private final OutputReceiver receiver; + + /** + * Stores the uncompressed bytes that have been serialized, but not + * compressed yet. When this fills, we compress the entire buffer. + */ + private ByteBuffer current = null; + + /** + * Stores the compressed bytes until we have a full buffer and then outputs + * them to the receiver. If no compression is being done, this (and overflow) + * will always be null and the current buffer will be sent directly to the + * receiver. + */ private ByteBuffer compressed = null; + + /** + * Since the compressed buffer may start with contents from previous + * compression blocks, we allocate an overflow buffer so that the + * output of the codec can be split between the two buffers. After the + * compressed buffer is sent to the receiver, the overflow buffer becomes + * the new compressed buffer. + */ private ByteBuffer overflow = null; - private ByteBuffer current; private final int bufferSize; private final CompressionCodec codec; private long compressedBytes = 0; @@ -85,9 +110,11 @@ class OutStream extends PositionedOutput } } + /** + * Allocate a new output buffer if we are compressing. + */ private ByteBuffer getNewOutputBuffer() throws IOException { - return ByteBuffer.allocate(bufferSize + - (codec == null ? 0 : HEADER_SIZE)); + return ByteBuffer.allocate(bufferSize + HEADER_SIZE); } private void flip() throws IOException { @@ -128,7 +155,8 @@ class OutStream extends PositionedOutput private void spill() throws java.io.IOException { // if there isn't anything in the current buffer, don't spill - if (current == null || current.position() == (codec == null ? 0 : HEADER_SIZE)) { + if (current == null || + current.position() == (codec == null ? 
0 : HEADER_SIZE)) { return; } flip(); @@ -223,8 +251,18 @@ class OutStream extends PositionedOutput } @Override - public long getSize() { - return uncompressedBytes + compressedBytes; + public long getBufferSize() { + long result = 0; + if (current != null) { + result += current.capacity(); + } + if (compressed != null) { + result += compressed.capacity(); + } + if (overflow != null) { + result += overflow.capacity(); + } + return result; } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionedOutputStream.java Wed May 8 04:51:50 2013 @@ -21,6 +21,18 @@ import java.io.IOException; import java.io.OutputStream; abstract class PositionedOutputStream extends OutputStream { + + /** + * Record the current position to the recorder. + * @param recorder the object that receives the position + * @throws IOException + */ abstract void getPosition(PositionRecorder recorder) throws IOException; - abstract long getSize(); + + /** + * Get the memory size currently allocated as buffer associated with this + * stream. + * @return the number of bytes used by buffers. + */ + abstract long getBufferSize(); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RedBlackTree.java Wed May 8 04:51:50 2013 @@ -25,13 +25,11 @@ package org.apache.hadoop.hive.ql.io.orc */ abstract class RedBlackTree { public static final int NULL = -1; - private static final int DEFAULT_INITIAL_CAPACITY = 16 * 1024; // Various values controlling the offset of the data within the array. private static final int LEFT_OFFSET = 0; private static final int RIGHT_OFFSET = 1; - private static final int COUNT_OFFSET = 2; - private static final int ELEMENT_SIZE = 3; + private static final int ELEMENT_SIZE = 2; protected int size = 0; private final DynamicIntArray data; @@ -40,13 +38,6 @@ abstract class RedBlackTree { private boolean wasAdd = false; /** - * Create a set with a default initial capacity. - */ - public RedBlackTree() { - data = new DynamicIntArray(DEFAULT_INITIAL_CAPACITY * ELEMENT_SIZE); - } - - /** * Create a set with the given initial capacity. 
*/ public RedBlackTree(int initialCapacity) { @@ -63,7 +54,6 @@ abstract class RedBlackTree { size += 1; setLeft(position, left, isRed); setRight(position, right); - setCount(position, 1); return position; } @@ -109,18 +99,6 @@ abstract class RedBlackTree { return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET); } - protected int getCount(int position) { - return data.get(position * ELEMENT_SIZE + COUNT_OFFSET); - } - - private void setCount(int position, int value) { - data.set(position * ELEMENT_SIZE + COUNT_OFFSET, value); - } - - private void incrementCount(int position, int value) { - data.increment(position * ELEMENT_SIZE + COUNT_OFFSET, value); - } - /** * Set the left field of the given position. * Note that we are storing the node color in the low bit of the left pointer. @@ -200,7 +178,6 @@ abstract class RedBlackTree { } else { lastAdd = node; wasAdd = false; - incrementCount(node, 1); return false; } @@ -322,5 +299,11 @@ abstract class RedBlackTree { data.clear(); } + /** + * Get the buffer size in bytes. + */ + public long getSizeInBytes() { + return data.getSizeInBytes(); + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java Wed May 8 04:51:50 2013 @@ -24,19 +24,16 @@ import java.io.OutputStream; /** * A red-black tree that stores strings. The strings are stored as UTF-8 bytes - * and an offset/length for each entry. + * and an offset for each entry. */ class StringRedBlackTree extends RedBlackTree { private final DynamicByteArray byteArray = new DynamicByteArray(); - private final DynamicIntArray keySizes = new DynamicIntArray(); + private final DynamicIntArray keyOffsets; private final Text newKey = new Text(); - public StringRedBlackTree() { - // PASS - } - public StringRedBlackTree(int initialCapacity) { super(initialCapacity); + keyOffsets = new DynamicIntArray(initialCapacity); } public int add(String value) { @@ -44,16 +41,22 @@ class StringRedBlackTree extends RedBlac // if the key is new, add it to our byteArray and store the offset & length if (add()) { int len = newKey.getLength(); - keySizes.add(byteArray.add(newKey.getBytes(), 0, len)); - keySizes.add(len); + keyOffsets.add(byteArray.add(newKey.getBytes(), 0, len)); } return lastAdd; } @Override protected int compareValue(int position) { + int start = keyOffsets.get(position); + int end; + if (position + 1 == keyOffsets.size()) { + end = byteArray.size(); + } else { + end = keyOffsets.get(position+1); + } return byteArray.compare(newKey.getBytes(), 0, newKey.getLength(), - keySizes.get(2 * position), keySizes.get(2 * position + 1)); + start, end - start); } /** @@ -84,12 +87,6 @@ class StringRedBlackTree extends RedBlac * @return the string's length in bytes */ int getLength(); - - /** - * Get the count for this key. 
- * @return the number of times this key was added - */ - int getCount(); } /** @@ -106,6 +103,8 @@ class StringRedBlackTree extends RedBlac private class VisitorContextImpl implements VisitorContext { private int originalPosition; + private int start; + private int end; private final Text text = new Text(); public int getOriginalPosition() { @@ -113,20 +112,26 @@ class StringRedBlackTree extends RedBlac } public Text getText() { - byteArray.setText(text, keySizes.get(originalPosition * 2), getLength()); + byteArray.setText(text, start, end - start); return text; } public void writeBytes(OutputStream out) throws IOException { - byteArray.write(out, keySizes.get(originalPosition * 2), getLength()); + byteArray.write(out, start, end - start); } public int getLength() { - return keySizes.get(originalPosition * 2 + 1); + return end - start; } - public int getCount() { - return StringRedBlackTree.this.getCount(originalPosition); + void setPosition(int position) { + originalPosition = position; + start = keyOffsets.get(originalPosition); + if (position + 1 == keyOffsets.size()) { + end = byteArray.size(); + } else { + end = keyOffsets.get(originalPosition + 1); + } } } @@ -134,7 +139,7 @@ class StringRedBlackTree extends RedBlac ) throws IOException { if (node != NULL) { recurse(getLeft(node), visitor, context); - context.originalPosition = node; + context.setPosition(node); visitor.visit(context); recurse(getRight(node), visitor, context); } @@ -155,7 +160,7 @@ class StringRedBlackTree extends RedBlac public void clear() { super.clear(); byteArray.clear(); - keySizes.clear(); + keyOffsets.clear(); } /** @@ -170,7 +175,8 @@ class StringRedBlackTree extends RedBlac * Calculate the approximate size in memory. * @return the number of bytes used in storing the tree. */ - public long getByteSize() { - return byteArray.size() + 5 * 4 * size(); + public long getSizeInBytes() { + return byteArray.getSizeInBytes() + keyOffsets.getSizeInBytes() + + super.getSizeInBytes(); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Wed May 8 04:51:50 2013 @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -62,9 +64,15 @@ import com.google.protobuf.CodedOutputSt * type of column. TreeWriters may have children TreeWriters that handle the * sub-types. Each of the TreeWriters writes the column's data as a set of * streams. + * + * This class is synchronized so that multi-threaded access is ok. In + * particular, because the MemoryManager is shared between writers, this class + * assumes that checkMemory may be called from a separate thread. 
*/ class WriterImpl implements Writer, MemoryManager.Callback { + private static final Log LOG = LogFactory.getLog(WriterImpl.class); + private static final int HDFS_BUFFER_SIZE = 256 * 1024; private static final int MIN_ROW_INDEX_STRIDE = 1000; @@ -154,10 +162,18 @@ class WriterImpl implements Writer, Memo } @Override - public void checkMemory(double newScale) throws IOException { - if (estimateStripeSize() > Math.round(stripeSize * newScale)) { - flushStripe(); + public synchronized boolean checkMemory(double newScale) throws IOException { + long limit = (long) Math.round(stripeSize * newScale); + long size = estimateStripeSize(); + if (LOG.isDebugEnabled()) { + LOG.debug("ORC writer " + path + " size = " + size + " limit = " + + limit); + } + if (size > limit) { + flushStripe(); + return true; } + return false; } /** @@ -186,6 +202,18 @@ class WriterImpl implements Writer, Memo } /** + * Get the number of bytes in buffers that are allocated to this stream. + * @return number of bytes in buffers + */ + public long getBufferSize() { + long result = 0; + for(ByteBuffer buf: output) { + result += buf.capacity(); + } + return outStream.getBufferSize() + result; + } + + /** * Flush the stream to the codec. * @throws IOException */ @@ -214,12 +242,9 @@ class WriterImpl implements Writer, Memo } } - /** - * Get the size of compressed and uncompressed data in the stream's buffers. - * @return the number of bytes in the buffers. - */ - long getSize() { - return outStream.getSize(); + @Override + public String toString() { + return outStream.toString(); } } @@ -681,11 +706,12 @@ class WriterImpl implements Writer, Memo } private static class StringTreeWriter extends TreeWriter { + private static final int INITIAL_DICTIONARY_SIZE = 4096; private final PositionedOutputStream stringOutput; private final RunLengthIntegerWriter lengthOutput; private final RunLengthIntegerWriter rowOutput; - private final RunLengthIntegerWriter countOutput; - private final StringRedBlackTree dictionary = new StringRedBlackTree(); + private final StringRedBlackTree dictionary = + new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); private final DynamicIntArray rows = new DynamicIntArray(); private final List savedRowIndex = new ArrayList(); @@ -703,12 +729,6 @@ class WriterImpl implements Writer, Memo OrcProto.Stream.Kind.LENGTH), false); rowOutput = new RunLengthIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA), false); - if (writer.buildIndex()) { - countOutput = new RunLengthIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DICTIONARY_COUNT), false); - } else { - countOutput = null; - } recordPosition(rowIndexPosition); rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); @@ -739,9 +759,6 @@ class WriterImpl implements Writer, Memo context.writeBytes(stringOutput); lengthOutput.write(context.getLength()); dumpOrder[context.getOriginalPosition()] = currentId++; - if (countOutput != null) { - countOutput.write(context.getCount()); - } } }); int length = rows.size(); @@ -770,9 +787,6 @@ class WriterImpl implements Writer, Memo stringOutput.flush(); lengthOutput.flush(); rowOutput.flush(); - if (countOutput != null) { - countOutput.flush(); - } // reset all of the fields to be ready for the next stripe. 
dictionary.clear(); rows.clear(); @@ -809,7 +823,7 @@ class WriterImpl implements Writer, Memo @Override long estimateMemory() { - return rows.size() * 4 + dictionary.getByteSize(); + return rows.getSizeInBytes() + dictionary.getSizeInBytes(); } } @@ -1434,40 +1448,43 @@ class WriterImpl implements Writer, Memo private long estimateStripeSize() { long result = 0; for(BufferedStream stream: streams.values()) { - result += stream.getSize(); + result += stream.getBufferSize(); } result += treeWriter.estimateMemory(); return result; } @Override - public void addUserMetadata(String name, ByteBuffer value) { + public synchronized void addUserMetadata(String name, ByteBuffer value) { userMetadata.put(name, ByteString.copyFrom(value)); } @Override public void addRow(Object row) throws IOException { - treeWriter.write(row); - rowsInStripe += 1; - if (buildIndex) { - rowsInIndex += 1; - - if (rowsInIndex >= rowIndexStride) { - createRowIndexEntry(); + synchronized (this) { + treeWriter.write(row); + rowsInStripe += 1; + if (buildIndex) { + rowsInIndex += 1; + + if (rowsInIndex >= rowIndexStride) { + createRowIndexEntry(); + } } } - // once every 1000 rows, check the size to see if we should spill - if (rowsInStripe % 1000 == 0) { - checkMemory(memoryManager.getAllocationScale()); - } + memoryManager.addedRow(); } @Override public void close() throws IOException { - flushStripe(); - int footerLength = writeFooter(rawWriter.getPos()); - rawWriter.writeByte(writePostScript(footerLength)); - rawWriter.close(); + // remove us from the memory manager so that we don't get any callbacks memoryManager.removeWriter(path); + // actually close the file + synchronized (this) { + flushStripe(); + int footerLength = writeFooter(rawWriter.getPos()); + rawWriter.writeByte(writePostScript(footerLength)); + rawWriter.close(); + } } } Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java Wed May 8 04:51:50 2013 @@ -39,8 +39,8 @@ public class TestMemoryManager { private static final double ERROR = 0.000001; private static class NullCallback implements MemoryManager.Callback { - public void checkMemory(double newScale) { - // PASS + public boolean checkMemory(double newScale) { + return false; } } @@ -120,17 +120,13 @@ public class TestMemoryManager { calls[i] = mock(MemoryManager.Callback.class); mgr.addWriter(new Path(Integer.toString(i)), pool/4, calls[i]); } - double[] spills = new double[]{0, 0, 0, 0, 0.8, 0.666666666667, - 0.571428571429, 0.5, 0.444444444444, - 0.4, 0, 0.333333333333, 0, 0.285714285714, - 0, 0.25, 0, 0.222222222222, 0, 0.2}; - for(int spill=0; spill < spills.length; ++spill) { - if (spills[spill] != 0) { - for(int call=0; call < spill + 1; ++call) { - verify(calls[call], times(1)) - .checkMemory(doubleThat(closeTo(spills[spill], ERROR))); - } - } + // add enough rows to get the memory manager to check the limits + for(int i=0; i < 10000; ++i) { + mgr.addedRow(); + } + for(int call=0; call < calls.length; ++call) { + verify(calls[call], times(2)) + .checkMemory(doubleThat(closeTo(0.2, ERROR))); } } } Modified: 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java Wed May 8 04:51:50 2013 @@ -52,6 +52,7 @@ import org.junit.Test; import org.junit.rules.TestName; import java.io.File; +import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; import java.sql.Timestamp; @@ -639,7 +640,7 @@ public class TestOrcFile { row.setFieldValue(0, null); union.set((byte) 0, new IntWritable(1732050807)); row.setFieldValue(2, null); - for(int i=0; i < 1000; ++i) { + for(int i=0; i < 5000; ++i) { writer.addRow(row); } union.set((byte) 0, new IntWritable(0)); @@ -651,7 +652,7 @@ public class TestOrcFile { writer.close(); Reader reader = OrcFile.createReader(fs, testFilePath); assertEquals(false, reader.getMetadataKeys().iterator().hasNext()); - assertEquals(1309, reader.getNumberOfRows()); + assertEquals(5309, reader.getNumberOfRows()); DecimalColumnStatistics stats = (DecimalColumnStatistics) reader.getStatistics()[5]; assertEquals(303, stats.getNumberOfValues()); @@ -732,7 +733,7 @@ public class TestOrcFile { assertEquals(new HiveDecimal(new BigInteger(118, rand), rand.nextInt(36)), row.getFieldValue(2)); } - for(int i=0; i < 1000; ++i) { + for(int i=0; i < 5000; ++i) { row = (OrcStruct) rows.next(row); assertEquals(new IntWritable(1732050807), union.getObject()); } @@ -942,6 +943,8 @@ public class TestOrcFile { double rate; Path path = null; long lastAllocation = 0; + int rows = 0; + MemoryManager.Callback callback; MyMemoryManager(Configuration conf, long totalSpace, double rate) { super(conf); @@ -954,6 +957,7 @@ public class TestOrcFile { MemoryManager.Callback callback) { this.path = path; this.lastAllocation = requestedAllocation; + this.callback = callback; } @Override @@ -971,6 +975,13 @@ public class TestOrcFile { double getAllocationScale() { return rate; } + + @Override + void addedRow() throws IOException { + if (++rows % 100 == 0) { + callback.checkMemory(rate); + } + } } @Test @@ -995,9 +1006,9 @@ public class TestOrcFile { for(StripeInformation stripe: reader.getStripes()) { i += 1; assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(), - stripe.getDataLength() < 10000); + stripe.getDataLength() < 5000); } - assertEquals(3, i); + assertEquals(25, i); assertEquals(2500, reader.getNumberOfRows()); } } Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestStringRedBlackTree.java Wed May 8 04:51:50 2013 @@ -104,7 +104,7 @@ public class TestStringRedBlackTree { System.err.println(indent + "NULL"); } else { System.err.println(indent + "Node " + node + " color " + - (tree.isRed(node) ? "red" : "black") + " count " + tree.getCount(node)); + (tree.isRed(node) ? 
"red" : "black")); printTree(tree, indent + " ", tree.getLeft(node)); printTree(tree, indent + " ", tree.getRight(node)); } @@ -112,14 +112,12 @@ public class TestStringRedBlackTree { private static class MyVisitor implements StringRedBlackTree.Visitor { private final String[] words; - private final int[] counts; private final int[] order; private final DataOutputBuffer buffer = new DataOutputBuffer(); int current = 0; - MyVisitor(String[] args, int[] counts, int[] order) { + MyVisitor(String[] args, int[] order) { words = args; - this.counts = counts; this.order = order; } @@ -128,7 +126,6 @@ public class TestStringRedBlackTree { ) throws IOException { String word = context.getText().toString(); assertEquals("in word " + current, words[current], word); - assertEquals("in word " + current, counts[current], context.getCount()); assertEquals("in word " + current, order[current], context.getOriginalPosition()); buffer.reset(); @@ -138,14 +135,14 @@ public class TestStringRedBlackTree { } } - void checkContents(StringRedBlackTree tree, int[] counts, int[] order, + void checkContents(StringRedBlackTree tree, int[] order, String... params ) throws IOException { - tree.visit(new MyVisitor(params, counts, order)); + tree.visit(new MyVisitor(params, order)); } StringRedBlackTree buildTree(String... params) throws IOException { - StringRedBlackTree result = new StringRedBlackTree(); + StringRedBlackTree result = new StringRedBlackTree(1000); for(String word: params) { result.add(word); checkTree(result); @@ -156,7 +153,7 @@ public class TestStringRedBlackTree { @Test public void test1() throws Exception { StringRedBlackTree tree = new StringRedBlackTree(5); - assertEquals(0, tree.getByteSize()); + assertEquals(0, tree.getSizeInBytes()); checkTree(tree); assertEquals(0, tree.add("owen")); checkTree(tree); @@ -186,15 +183,12 @@ public class TestStringRedBlackTree { checkTree(tree); assertEquals(9, tree.add("z")); checkTree(tree); - checkContents(tree, new int[]{2,1,2,1,1,1,1,2,1,1}, - new int[]{2,5,1,4,6,3,7,0,9,8}, + checkContents(tree, new int[]{2,5,1,4,6,3,7,0,9,8}, "alan", "arun", "ashutosh", "eric", "eric14", "greg", "o", "owen", "z", "ziggy"); - assertEquals(10*5*4 + 8 + 6 + 5 + 5 * 4 + 2 * 1, tree.getByteSize()); + assertEquals(32888, tree.getSizeInBytes()); // check that adding greg again bumps the count - assertEquals(1, tree.getCount(3)); assertEquals(3, tree.add("greg")); - assertEquals(2, tree.getCount(3)); assertEquals(41, tree.getCharacterSize()); // add some more strings to test the different branches of the // rebalancing @@ -210,7 +204,7 @@ public class TestStringRedBlackTree { checkTree(tree); tree.clear(); checkTree(tree); - assertEquals(0, tree.getByteSize()); + assertEquals(0, tree.getSizeInBytes()); assertEquals(0, tree.getCharacterSize()); } @@ -220,8 +214,7 @@ public class TestStringRedBlackTree { buildTree("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"); assertEquals(26, tree.size()); - checkContents(tree, new int[]{1,1,1, 1,1,1, 1,1,1, 1,1,1, 1,1,1, 1,1,1, - 1,1,1, 1,1,1, 1,1}, new int[]{0,1,2, 3,4,5, 6,7,8, 9,10,11, 12,13,14, + checkContents(tree, new int[]{0,1,2, 3,4,5, 6,7,8, 9,10,11, 12,13,14, 15,16,17, 18,19,20, 21,22,23, 24,25}, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j","k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"); @@ -233,8 +226,7 @@ public class TestStringRedBlackTree { buildTree("z", "y", "x", "w", "v", "u", "t", "s", "r", "q", "p", "o", "n", 
"m", "l", "k", "j", "i", "h", "g", "f", "e", "d", "c", "b", "a"); assertEquals(26, tree.size()); - checkContents(tree, new int[]{1,1,1, 1,1,1, 1,1,1, 1,1,1, 1,1,1, 1,1,1, - 1,1,1, 1,1,1, 1,1}, new int[]{25,24,23, 22,21,20, 19,18,17, 16,15,14, + checkContents(tree, new int[]{25,24,23, 22,21,20, 19,18,17, 16,15,14, 13,12,11, 10,9,8, 7,6,5, 4,3,2, 1,0}, "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"); Modified: hive/trunk/ql/src/test/resources/orc-file-dump.out URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/resources/orc-file-dump.out?rev=1480159&r1=1480158&r2=1480159&view=diff ============================================================================== --- hive/trunk/ql/src/test/resources/orc-file-dump.out (original) +++ hive/trunk/ql/src/test/resources/orc-file-dump.out Wed May 8 04:51:50 2013 @@ -11,74 +11,87 @@ Statistics: Column 3: count: 21000 min: Darkness, max: worst Stripes: - Stripe: offset: 3 data: 83505 rows: 6000 tail: 91 index: 179 + Stripe: offset: 3 data: 69638 rows: 5000 tail: 85 index: 126 Stream: column 0 section ROW_INDEX start: 3 length 10 Stream: column 1 section ROW_INDEX start: 13 length 38 Stream: column 2 section ROW_INDEX start: 51 length 42 - Stream: column 3 section DICTIONARY_COUNT start: 93 length 53 - Stream: column 3 section ROW_INDEX start: 146 length 36 - Stream: column 1 section PRESENT start: 182 length 11 - Stream: column 1 section DATA start: 193 length 27086 - Stream: column 2 section PRESENT start: 27279 length 11 - Stream: column 2 section DATA start: 27290 length 52124 - Stream: column 3 section PRESENT start: 79414 length 11 - Stream: column 3 section DATA start: 79425 length 4091 - Stream: column 3 section LENGTH start: 83516 length 38 - Stream: column 3 section DICTIONARY_DATA start: 83554 length 133 + Stream: column 3 section ROW_INDEX start: 93 length 36 + Stream: column 1 section PRESENT start: 129 length 11 + Stream: column 1 section DATA start: 140 length 22605 + Stream: column 2 section PRESENT start: 22745 length 11 + Stream: column 2 section DATA start: 22756 length 43426 + Stream: column 3 section PRESENT start: 66182 length 11 + Stream: column 3 section DATA start: 66193 length 3403 + Stream: column 3 section LENGTH start: 69596 length 38 + Stream: column 3 section DICTIONARY_DATA start: 69634 length 133 Encoding column 0: DIRECT Encoding column 1: DIRECT Encoding column 2: DIRECT Encoding column 3: DICTIONARY[35] - Stripe: offset: 83778 data: 83453 rows: 6000 tail: 91 index: 180 - Stream: column 0 section ROW_INDEX start: 83778 length 10 - Stream: column 1 section ROW_INDEX start: 83788 length 39 - Stream: column 2 section ROW_INDEX start: 83827 length 42 - Stream: column 3 section DICTIONARY_COUNT start: 83869 length 53 - Stream: column 3 section ROW_INDEX start: 83922 length 36 - Stream: column 1 section PRESENT start: 83958 length 11 - Stream: column 1 section DATA start: 83969 length 27093 - Stream: column 2 section PRESENT start: 111062 length 11 - Stream: column 2 section DATA start: 111073 length 52119 - Stream: column 3 section PRESENT start: 163192 length 11 - Stream: column 3 section DATA start: 163203 length 4037 - Stream: column 3 section LENGTH start: 167240 length 38 - Stream: column 3 section DICTIONARY_DATA start: 167278 length 133 + Stripe: offset: 69852 data: 69617 rows: 5000 tail: 83 index: 124 + Stream: column 0 section ROW_INDEX start: 69852 length 10 + Stream: column 1 section ROW_INDEX start: 69862 length 36 + Stream: 
column 2 section ROW_INDEX start: 69898 length 42 + Stream: column 3 section ROW_INDEX start: 69940 length 36 + Stream: column 1 section PRESENT start: 69976 length 11 + Stream: column 1 section DATA start: 69987 length 22597 + Stream: column 2 section PRESENT start: 92584 length 11 + Stream: column 2 section DATA start: 92595 length 43439 + Stream: column 3 section PRESENT start: 136034 length 11 + Stream: column 3 section DATA start: 136045 length 3377 + Stream: column 3 section LENGTH start: 139422 length 38 + Stream: column 3 section DICTIONARY_DATA start: 139460 length 133 Encoding column 0: DIRECT Encoding column 1: DIRECT Encoding column 2: DIRECT Encoding column 3: DICTIONARY[35] - Stripe: offset: 167502 data: 83456 rows: 6000 tail: 92 index: 182 - Stream: column 0 section ROW_INDEX start: 167502 length 10 - Stream: column 1 section ROW_INDEX start: 167512 length 39 - Stream: column 2 section ROW_INDEX start: 167551 length 42 - Stream: column 3 section DICTIONARY_COUNT start: 167593 length 55 - Stream: column 3 section ROW_INDEX start: 167648 length 36 - Stream: column 1 section PRESENT start: 167684 length 11 - Stream: column 1 section DATA start: 167695 length 27080 - Stream: column 2 section PRESENT start: 194775 length 11 - Stream: column 2 section DATA start: 194786 length 52093 - Stream: column 3 section PRESENT start: 246879 length 11 - Stream: column 3 section DATA start: 246890 length 4079 - Stream: column 3 section LENGTH start: 250969 length 38 - Stream: column 3 section DICTIONARY_DATA start: 251007 length 133 + Stripe: offset: 139676 data: 69603 rows: 5000 tail: 85 index: 127 + Stream: column 0 section ROW_INDEX start: 139676 length 10 + Stream: column 1 section ROW_INDEX start: 139686 length 39 + Stream: column 2 section ROW_INDEX start: 139725 length 42 + Stream: column 3 section ROW_INDEX start: 139767 length 36 + Stream: column 1 section PRESENT start: 139803 length 11 + Stream: column 1 section DATA start: 139814 length 22594 + Stream: column 2 section PRESENT start: 162408 length 11 + Stream: column 2 section DATA start: 162419 length 43415 + Stream: column 3 section PRESENT start: 205834 length 11 + Stream: column 3 section DATA start: 205845 length 3390 + Stream: column 3 section LENGTH start: 209235 length 38 + Stream: column 3 section DICTIONARY_DATA start: 209273 length 133 Encoding column 0: DIRECT Encoding column 1: DIRECT Encoding column 2: DIRECT Encoding column 3: DICTIONARY[35] - Stripe: offset: 251232 data: 41842 rows: 3000 tail: 90 index: 172 - Stream: column 0 section ROW_INDEX start: 251232 length 10 - Stream: column 1 section ROW_INDEX start: 251242 length 39 - Stream: column 2 section ROW_INDEX start: 251281 length 43 - Stream: column 3 section DICTIONARY_COUNT start: 251324 length 44 - Stream: column 3 section ROW_INDEX start: 251368 length 36 - Stream: column 1 section PRESENT start: 251404 length 9 - Stream: column 1 section DATA start: 251413 length 13544 - Stream: column 2 section PRESENT start: 264957 length 9 - Stream: column 2 section DATA start: 264966 length 26072 - Stream: column 3 section PRESENT start: 291038 length 9 - Stream: column 3 section DATA start: 291047 length 2028 - Stream: column 3 section LENGTH start: 293075 length 38 - Stream: column 3 section DICTIONARY_DATA start: 293113 length 133 + Stripe: offset: 209491 data: 69584 rows: 5000 tail: 84 index: 126 + Stream: column 0 section ROW_INDEX start: 209491 length 10 + Stream: column 1 section ROW_INDEX start: 209501 length 38 + Stream: column 2 section ROW_INDEX start: 
209539 length 42 + Stream: column 3 section ROW_INDEX start: 209581 length 36 + Stream: column 1 section PRESENT start: 209617 length 11 + Stream: column 1 section DATA start: 209628 length 22575 + Stream: column 2 section PRESENT start: 232203 length 11 + Stream: column 2 section DATA start: 232214 length 43426 + Stream: column 3 section PRESENT start: 275640 length 11 + Stream: column 3 section DATA start: 275651 length 3379 + Stream: column 3 section LENGTH start: 279030 length 38 + Stream: column 3 section DICTIONARY_DATA start: 279068 length 133 + Encoding column 0: DIRECT + Encoding column 1: DIRECT + Encoding column 2: DIRECT + Encoding column 3: DICTIONARY[35] + Stripe: offset: 279285 data: 14111 rows: 1000 tail: 80 index: 127 + Stream: column 0 section ROW_INDEX start: 279285 length 10 + Stream: column 1 section ROW_INDEX start: 279295 length 39 + Stream: column 2 section ROW_INDEX start: 279334 length 42 + Stream: column 3 section ROW_INDEX start: 279376 length 36 + Stream: column 1 section PRESENT start: 279412 length 5 + Stream: column 1 section DATA start: 279417 length 4529 + Stream: column 2 section PRESENT start: 283946 length 5 + Stream: column 2 section DATA start: 283951 length 8690 + Stream: column 3 section PRESENT start: 292641 length 5 + Stream: column 3 section DATA start: 292646 length 706 + Stream: column 3 section LENGTH start: 293352 length 38 + Stream: column 3 section DICTIONARY_DATA start: 293390 length 133 Encoding column 0: DIRECT Encoding column 1: DIRECT Encoding column 2: DIRECT
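A closing note on the StringRedBlackTree change above: dropping the per-key length array works because keys are appended to the byte array in insertion order and never removed, so each key's length can be recovered as the gap between consecutive offsets, with the last key running to the end of the array. A minimal sketch of that layout, independent of the tree itself; DictBuffer is an invented name, not the Hive class.

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    class DictBuffer {
      private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      private final List<Integer> keyOffsets = new ArrayList<Integer>();

      /** Append a key; returns its position in insertion order. */
      int add(String value) {
        keyOffsets.add(bytes.size());        // where this key starts
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        bytes.write(utf8, 0, utf8.length);
        return keyOffsets.size() - 1;
      }

      /** Recover a key using only the offset array. */
      String get(int position) {
        int start = keyOffsets.get(position);
        // The length is implicit: the key runs to the next key's offset,
        // or to the end of the buffer for the most recently added key.
        int end = (position + 1 == keyOffsets.size())
            ? bytes.size() : keyOffsets.get(position + 1);
        return new String(bytes.toByteArray(), start, end - start,
            StandardCharsets.UTF_8);
      }
    }

Storing one int per key instead of two is where a chunk of the dictionary's memory saving comes from.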
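And one on the OutStream change: getSize() used to report bytes written through the stream, but the memory manager has to budget allocations, so the new getBufferSize() sums the capacity of each live buffer instead. A tiny demo of the distinction, under no assumptions beyond the standard library:

    import java.nio.ByteBuffer;

    public class CapacityDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(256 * 1024);
        buf.put((byte) 1);                                  // one byte of content
        System.out.println("written  = " + buf.position()); // 1
        System.out.println("capacity = " + buf.capacity()); // 262144
        // A memory manager must budget for capacity; counting bytes written
        // would report this stream as using almost no memory.
      }
    }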