Subject: svn commit: r1656595 [1/2] - in /hive/branches/llap: llap-client/src/java/org/apache/hadoop/hive/llap/io/api/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/java/or...
Date: Mon, 02 Feb 2015 22:28:16 -0000
To: commits@hive.apache.org
From: sershe@apache.org
Message-Id: <20150202222816.73CA3AC094F@hades.apache.org>

Author: sershe
Date: Mon Feb 2 22:28:15 2015
New Revision: 1656595

URL: http://svn.apache.org/r1656595
Log:
HIVE-9418p2 : Part of the encoded data production pipeline (incomplete, only to allow parallel work)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
Removed:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadata.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Mon Feb 2 22:28:15 2015
@@ -18,24 +18,42 @@ package org.apache.hadoop.hive.llap.io.api;
 
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 
 public class EncodedColumn<BatchKey> {
   // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
   // generality, and ability to not copy data from underlying low-level cached buffers.
-  public static class ColumnBuffer {
+  public static class StreamBuffer {
+    public StreamBuffer(int firstOffset, int lastLength) {
+      this.firstOffset = firstOffset;
+      this.lastLength = lastLength;
+    }
     // TODO: given how ORC will allocate, it might make sense to share array between all
     // returned encodedColumn-s, and store index and length in the array.
-    public LlapMemoryBuffer[] cacheBuffers;
+    public List<LlapMemoryBuffer> cacheBuffers;
    public int firstOffset, lastLength;
+    // StreamBuffer can be reused for many RGs (e.g. dictionary case). To avoid locking every
+    // LlapMemoryBuffer 500 times, have a separate refcount on StreamBuffer itself.
+    public AtomicInteger refCount = new AtomicInteger(0);
+    public void incRef() {
+      refCount.incrementAndGet();
+    }
+    public int decRef() {
+      return refCount.decrementAndGet();
+    }
   }
-  public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
+  public EncodedColumn(BatchKey batchKey, int columnIndex, int streamCount) {
     this.batchKey = batchKey;
     this.columnIndex = columnIndex;
-    this.columnData = columnData;
+    this.streamData = new StreamBuffer[streamCount];
+    this.streamKind = new int[streamCount];
   }
   public BatchKey batchKey;
   public int columnIndex;
-  public ColumnBuffer columnData;
+  public StreamBuffer[] streamData;
+  public int[] streamKind; // TODO: can decoder infer this from metadata?
 }
\ No newline at end of file
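The refcount added above is worth spelling out. A StreamBuffer that backs several row groups is handed to several consumers, and each must signal when it is done so the underlying cache buffers are unlocked exactly once, not once per row group. A minimal usage sketch, assuming a producer that hands the same buffer to N consumers (consumerCount and lowLevelCache are stand-ins for whatever the caller has in scope; the releaseBuffers call matches the LowLevelCache change below):

    // Producer side: take one reference per consumer that will see this buffer.
    StreamBuffer sb = new StreamBuffer(firstOffset, lastLength);
    for (int i = 0; i < consumerCount; ++i) {
      sb.incRef();
    }
    // Consumer side: whoever performs the last decRef unlocks the cache buffers.
    if (sb.decRef() == 0) {
      lowLevelCache.releaseBuffers(sb.cacheBuffers);
    }

The real pipeline does exactly this in ColumnVectorProducer.returnProcessed further down.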
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Mon Feb 2 22:28:15 2015
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.llap.io.api.cache;
 
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.DiskRange;
 
@@ -48,6 +49,8 @@ public interface LowLevelCache {
    */
   void allocateMultiple(LlapMemoryBuffer[] dest, int size);
 
-  void releaseBuffers(LlapMemoryBuffer[] cacheBuffers);
+  void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers);
+
+  LlapMemoryBuffer createUnallocated();
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java Mon Feb 2 22:28:15 2015
@@ -18,10 +18,10 @@ package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 
 /** Dummy interface for now, might be different. */
 public interface Cache<CacheKey> {
-  public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value);
-  public ColumnBuffer get(CacheKey key);
+  public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value);
+  public StreamBuffer get(CacheKey key);
 }
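The cacheOrGet contract is put-if-absent: the caller offers a freshly built buffer and must keep using whichever buffer actually won the race. A minimal sketch of one way a real implementation might behave, assuming a ConcurrentHashMap-backed store (the real implementation is not part of this patch):

    private final ConcurrentHashMap<CacheKey, StreamBuffer> map =
        new ConcurrentHashMap<CacheKey, StreamBuffer>();

    public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value) {
      StreamBuffer prev = map.putIfAbsent(key, value);
      return (prev == null) ? value : prev; // loser must release its own buffers
    }

The caller-side half of that contract appears, currently commented out, in OrcEncodedDataProducer.consumeData below: if the returned buffer differs from the offered one, the offered buffers are released back to the low-level cache.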
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Mon Feb 2 22:28:15 2015
@@ -121,8 +121,7 @@ public class LowLevelCacheImpl implement
       if (currentNotCached.offset == currentCached.offset) {
         if (currentNotCached.end <= currentCached.end) { // we assume it's always "==" now
           // Replace the entire current DiskRange with new cached range.
-          drIter.remove();
-          drIter.add(currentCached);
+          drIter.set(currentCached);
           currentNotCached = null;
         } else {
           // Insert the new cache range before the disk range.
@@ -251,9 +250,9 @@ public class LowLevelCacheImpl implement
   }
 
   @Override
-  public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
-    for (int i = 0; i < cacheBuffers.length; ++i) {
-      releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+  public void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers) {
+    for (LlapMemoryBuffer b : cacheBuffers) {
+      releaseBufferInternal((LlapCacheableBuffer)b);
     }
   }
 
@@ -399,4 +398,9 @@ public class LowLevelCacheImpl implement
       }
     }
   }
+
+  @Override
+  public LlapMemoryBuffer createUnallocated() {
+    return new LlapCacheableBuffer();
+  }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Mon Feb 2 22:28:15 2015
@@ -165,7 +165,6 @@ public class LowLevelLrfuCachePolicy ext
         continue;
       }
       // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
-      // TODO#: double check this is valid!
       nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
       evicted += nextCandidate.byteBuffer.remaining();
     }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java Mon Feb 2 22:28:15 2015
@@ -18,16 +18,16 @@ package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 
 public class NoopCache<CacheKey> implements Cache<CacheKey> {
   @Override
-  public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value) {
+  public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value) {
     return value;
   }
 
   @Override
-  public ColumnBuffer get(CacheKey key) {
+  public StreamBuffer get(CacheKey key) {
     return null; // TODO: ensure real implementation increases refcount
   }
 }
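The drIter change in LowLevelCacheImpl above is a small but meaningful cleanup: ListIterator.set replaces the last element returned by next() in place, which is cheaper and clearer than remove() followed by add(). An isolated illustration of the pattern used when splicing a cached range into the disk-range list (isFullyCovered stands in for the offset/end comparison the real code performs inline):

    ListIterator<DiskRange> drIter = ranges.listIterator();
    while (drIter.hasNext()) {
      DiskRange current = drIter.next();
      if (isFullyCovered(current, cached)) {
        drIter.set(cached); // in-place replacement of 'current'
      }
    }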
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Mon Feb 2 22:28:15 2015
@@ -68,8 +68,9 @@ public class LlapInputFormat
       if (includedCols.isEmpty()) {
         includedCols = null; // Also means read all columns? WTF?
       }
-      VectorReader reader = llapIo.getReader(
-          fileSplit, includedCols, SearchArgumentFactory.createFromConf(job));
+      VectorReader reader = llapIo.getReader(fileSplit, includedCols,
+          SearchArgumentFactory.createFromConf(job),
+          ColumnProjectionUtils.getReadColumnNames(job));
       return new LlapRecordReader(reader, job, fileSplit);
     } catch (Exception ex) {
       throw new IOException(ex);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Mon Feb 2 22:28:15 2015
@@ -70,8 +70,9 @@ public class LlapIoImpl implements LlapI
     this.cvp = new OrcColumnVectorProducer(threadPool, edp, conf);
   }
 
-  VectorReader getReader(InputSplit split, List<Integer> columnIds, SearchArgument sarg) {
-    return new VectorReader(split, columnIds, sarg, cvp);
+  VectorReader getReader(InputSplit split,
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames) {
+    return new VectorReader(split, columnIds, sarg, columnNames, cvp);
   }
 
   @Override

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java Mon Feb 2 22:28:15 2015
@@ -36,6 +36,7 @@ public class VectorReader implements Con
   private final InputSplit split;
   private final List<Integer> columnIds;
   private final SearchArgument sarg;
+  private final String[] columnNames;
   private final ColumnVectorProducer cvp;
 
   private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
@@ -45,10 +46,11 @@ public class VectorReader implements Con
   private ConsumerFeedback<ColumnVectorBatch> feedback;
 
   public VectorReader(InputSplit split, List<Integer> columnIds, SearchArgument sarg,
-      ColumnVectorProducer cvp) {
+      String[] columnNames, ColumnVectorProducer cvp) {
     this.split = split;
     this.columnIds = columnIds;
     this.sarg = sarg;
+    this.columnNames = columnNames;
     this.cvp = cvp;
   }
 
@@ -56,7 +58,7 @@ public class VectorReader implements Con
     // TODO: if some collection is needed, return previous ColumnVectorBatch here
     ColumnVectorBatch current = null;
     if (feedback == null) {
-      feedback = cvp.read(split, columnIds, sarg, this);
+      feedback = cvp.read(split, columnIds, sarg, columnNames, this);
     }
     if (isClosed) {
       throw new AssertionError("next called after close");
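VectorReader sits on both sides of the asynchronous hand-off: it passes itself to cvp.read() as the Consumer of decoded batches, and it holds the ConsumerFeedback that read() returns to hand consumed batches back upstream. From the calls visible in this patch (consumeData, setDone, setError, returnData), the two contracts look roughly like this; the exact interfaces live elsewhere in the branch, so treat this as an inferred sketch:

    interface Consumer<T> {
      void consumeData(T data);   // producer pushes one unit of work
      void setDone();             // producer finished normally
      void setError(Throwable t); // producer failed; consumer should unblock
    }
    interface ConsumerFeedback<T> {
      void returnData(T data);    // consumer is done with 'data'; producer may recycle it
    }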
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Mon Feb 2 22:28:15 2015
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutorServ
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
@@ -42,12 +42,13 @@ public abstract class ColumnVectorProduc
     this.executor = executor;
   }
 
+  // TODO#: Given how ORC reads data, it should return this and not separate columns.
   static class EncodedColumnBatch {
     public EncodedColumnBatch(int colCount) {
-      columnDatas = new ColumnBuffer[colCount];
+      columnDatas = new StreamBuffer[colCount][];
       columnsRemaining = colCount;
     }
-    public ColumnBuffer[] columnDatas;
+    public StreamBuffer[][] columnDatas;
     public int columnsRemaining;
   }
 
@@ -57,7 +58,7 @@ public abstract class ColumnVectorProduc
     // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
     private final HashMap<BatchKey, EncodedColumnBatch> pendingData = new HashMap<BatchKey, EncodedColumnBatch>();
-    private ConsumerFeedback<ColumnBuffer> upstreamFeedback;
+    private ConsumerFeedback<StreamBuffer> upstreamFeedback;
     private final Consumer<ColumnVectorBatch> downstreamConsumer;
     private final int colCount;
 
@@ -66,7 +67,7 @@ public abstract class ColumnVectorProduc
       this.colCount = colCount;
     }
 
-    public void init(ConsumerFeedback<ColumnBuffer> upstreamFeedback) {
+    public void init(ConsumerFeedback<StreamBuffer> upstreamFeedback) {
       this.upstreamFeedback = upstreamFeedback;
     }
 
@@ -85,7 +86,7 @@ public abstract class ColumnVectorProduc
       }
     }
     if (localIsStopped) {
-      upstreamFeedback.returnData(data.columnData);
+      returnProcessed(data.streamData);
       return;
     }
 
@@ -94,7 +95,7 @@ public abstract class ColumnVectorProduc
       // Check if we are stopped and the batch was already cleaned.
      localIsStopped = (targetBatch.columnDatas == null);
       if (!localIsStopped) {
-        targetBatch.columnDatas[data.columnIndex] = data.columnData;
+        targetBatch.columnDatas[data.columnIndex] = data.streamData;
         colsRemaining = --targetBatch.columnsRemaining;
         if (0 == colsRemaining) {
           synchronized (pendingData) {
@@ -107,14 +108,22 @@ public abstract class ColumnVectorProduc
       }
     }
     if (localIsStopped) {
-      upstreamFeedback.returnData(data.columnData);
+      returnProcessed(data.streamData);
       return;
     }
     if (0 == colsRemaining) {
       ColumnVectorProducer.this.decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
       // Batch has been decoded; unlock the buffers in cache
-      for (ColumnBuffer cb : targetBatch.columnDatas) {
-        upstreamFeedback.returnData(cb);
+      for (StreamBuffer[] columnData : targetBatch.columnDatas) {
+        returnProcessed(columnData);
+      }
+    }
+  }
+
+  private void returnProcessed(StreamBuffer[] data) {
+    for (StreamBuffer sb : data) {
+      if (sb.decRef() == 0) {
+        upstreamFeedback.returnData(sb);
       }
     }
   }
@@ -142,7 +151,7 @@ public abstract class ColumnVectorProduc
   }
 
   private void dicardPendingData(boolean isStopped) {
-    List<ColumnBuffer> dataToDiscard = new ArrayList<ColumnBuffer>(pendingData.size() * colCount);
+    List<StreamBuffer> dataToDiscard = new ArrayList<StreamBuffer>(pendingData.size() * colCount);
     List<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(pendingData.size());
     synchronized (pendingData) {
       if (isStopped) {
@@ -153,14 +162,18 @@ public abstract class ColumnVectorProduc
     }
     for (EncodedColumnBatch batch : batches) {
       synchronized (batch) {
-        for (ColumnBuffer b : batch.columnDatas) {
-          dataToDiscard.add(b);
+        for (StreamBuffer[] bb : batch.columnDatas) {
+          for (StreamBuffer b : bb) {
+            dataToDiscard.add(b);
+          }
         }
         batch.columnDatas = null;
       }
     }
-    for (ColumnBuffer data : dataToDiscard) {
-      upstreamFeedback.returnData(data);
+    for (StreamBuffer data : dataToDiscard) {
+      if (data.decRef() == 0) {
+        upstreamFeedback.returnData(data);
+      }
     }
   }
 
@@ -191,13 +204,21 @@ public abstract class ColumnVectorProduc
    * @throws IOException
    */
  public ConsumerFeedback<ColumnVectorBatch> read(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, Consumer<ColumnVectorBatch> consumer) throws IOException {
+      SearchArgument sarg, String[] columnNames, Consumer<ColumnVectorBatch> consumer)
+          throws IOException {
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     EncodedDataConsumer edc = new EncodedDataConsumer(consumer, columnIds.size());
     // Get the source of encoded data.
     EncodedDataProducer edp = getEncodedDataProducer();
     // Then, get the specific reader of encoded data out of the producer.
-    EncodedDataReader reader = edp.getReader(split, columnIds, sarg, edc);
+    /*
+[ERROR] reason: actual argument
+org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.EncodedDataConsumer
+cannot be converted to
+org.apache.hadoop.hive.llap.Consumer<
+    org.apache.hadoop.hive.llap.io.api.EncodedColumn<
+        org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey>> by method invocation conversion * */
+    EncodedDataReader reader = edp.getReader(split, columnIds, sarg, columnNames, edc);
     // Set the encoded data reader as upstream feedback for encoded data consumer, and start.
     edc.init(reader);
     executor.submit(reader);
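The bookkeeping in consumeData is the heart of this consumer: columns for one batch arrive in any order, and the batch must be decoded exactly once, when the last column lands. Stripped of the stop/cleanup paths, the pattern is a synchronized countdown (a sketch using the names above):

    synchronized (targetBatch) {
      targetBatch.columnDatas[data.columnIndex] = data.streamData;
      if (--targetBatch.columnsRemaining == 0) {
        decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
        // every StreamBuffer is then decRef'd and, at zero, returned upstream
      }
    }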
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java Mon Feb 2 22:28:15 2015
@@ -22,10 +22,11 @@ import java.util.List;
 
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.InputSplit;
 
 public interface EncodedDataProducer<BatchKey> {
-  public EncodedDataReader<BatchKey> getReader(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, Consumer<EncodedColumn<BatchKey>> consumer);
+  EncodedDataReader<BatchKey> getReader(InputSplit split, List<Integer> columnIds,
+      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<BatchKey>> consumer);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java Mon Feb 2 22:28:15 2015
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.llap.io.e
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 
 /**
  * Interface for encoded data readers to implement.
@@ -29,5 +29,5 @@ import org.apache.hadoop.hive.llap.io.ap
  * The final threading design will probably change.
 */
 public interface EncodedDataReader<BatchKey>
-    extends ConsumerFeedback<ColumnBuffer>, Callable<Void> {
+    extends ConsumerFeedback<StreamBuffer>, Callable<Void> {
 }
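The [ERROR] block left inside ColumnVectorProducer.read is ordinary Java generic invariance, kept as a breadcrumb: EncodedDataConsumer is declared against EncodedColumn without the OrcBatchKey type argument, so the compiler will not treat it as a Consumer<EncodedColumn<OrcBatchKey>> even though the shapes match. The same category of error in miniature (illustrative only; edc here is a hypothetical mistyped variable):

    List<Object> xs = new ArrayList<String>();    // does not compile: generics are invariant
    Consumer<EncodedColumn<OrcBatchKey>> c = edc; // fails the same way if edc's type argument differs

The usual remedies are to propagate the BatchKey parameter through EncodedDataConsumer or to widen the parameter with a bounded wildcard; the commit leaves the raw call in place, consistent with the log's "incomplete" caveat.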
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Mon Feb 2 22:28:15 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.io.e
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -30,15 +31,18 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
@@ -49,7 +53,7 @@ import org.apache.hadoop.mapred.InputSpl
 public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
   private FileSystem cachedFs = null;
   private Configuration conf;
-  private OrcMetadataCache metadataCache;
+  private final OrcMetadataCache metadataCache;
   // TODO: it makes zero sense to have both at the same time and duplicate data. Add "cache mode".
   private final Cache<OrcCacheKey> cache;
   private final LowLevelCache lowLevelCache;
@@ -59,24 +63,23 @@ public class OrcEncodedDataProducer impl
     private final FileSplit split;
     private List<Integer> columnIds;
     private final SearchArgument sarg;
+    private final String[] columnNames;
     private final Consumer<EncodedColumn<OrcBatchKey>> consumer;
 
     // Read state.
-    private int stripeIxFrom, stripeIxTo;
+    private int stripeIxFrom;
     private Reader orcReader;
     private final String internedFilePath;
     /**
-     * readState[stripeIx'][colIx'] - bitmask (as long array) of rg-s that are done.
-     * Bitmasks are all well-known size so we don't bother with BitSets and such.
-     * Each long has natural bit indexes used, so rightmost bits are filled first.
+     * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
+     * read. Contains only stripes that are read, and only columns included. null => read all RGs.
     */
-    private long[][][] readState;
-    private int[] rgsPerStripe = null;
+    private boolean[][][] readState;
     private boolean isStopped = false, isPaused = false;
 
     public OrcEncodedDataReader(InputSplit split, List<Integer> columnIds,
-        SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+        SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
       this.split = (FileSplit)split;
       this.internedFilePath = this.split.getPath().toString().intern();
       this.columnIds = columnIds;
@@ -84,6 +87,7 @@ public class OrcEncodedDataProducer impl
         Collections.sort(this.columnIds);
       }
       this.sarg = sarg;
+      this.columnNames = columnNames;
       this.consumer = consumer;
     }
 
@@ -109,57 +113,128 @@ public class OrcEncodedDataProducer impl
     public Void call() throws IOException {
       LlapIoImpl.LOG.info("Processing split for " + internedFilePath);
       if (isStopped) return null;
-      List<StripeInformation> stripes = metadataCache.getStripes(internedFilePath);
-      List<Type> types = metadataCache.getTypes(internedFilePath);
       orcReader = null;
-      if (stripes == null || types == null) {
+      // Get FILE metadata from cache, or create the reader and read it.
+      OrcFileMetadata metadata = metadataCache.getFileMetadata(internedFilePath);
+      if (metadata == null) {
         orcReader = createOrcReader(split);
-        stripes = metadataCache.getStripes(internedFilePath);
-        types = metadataCache.getTypes(internedFilePath);
+        metadata = new OrcFileMetadata(orcReader);
+        metadataCache.putFileMetadata(internedFilePath, metadata);
       }
       if (columnIds == null) {
-        columnIds = new ArrayList<Integer>(types.size());
-        for (int i = 1; i < types.size(); ++i) {
+        columnIds = new ArrayList<Integer>(metadata.getTypes().size());
+        for (int i = 1; i < metadata.getTypes().size(); ++i) {
           columnIds.add(i);
         }
       }
-      determineWhatToRead(stripes);
+      // Then, determine which stripes to read based on the split.
+      determineStripesToRead(metadata.getStripes());
+      if (readState.length == 0) {
+        consumer.setDone();
+        return null; // No data to read.
+      }
+      int stride = metadata.getRowIndexStride();
+      ArrayList<OrcStripeMetadata> stripesMetadata = null;
+      boolean[] globalIncludes = OrcInputFormat.genIncludedColumns(
+          metadata.getTypes(), columnIds, true);
+      RecordReader[] stripeReaders = new RecordReader[readState.length];
+      if (sarg != null && stride != 0) {
+        // If SARG is present, get relevant stripe metadata from cache or readers.
+        stripesMetadata = readStripesMetadata(metadata, globalIncludes, stripeReaders);
+      }
+
+      // Now, apply SARG if any; w/o sarg, this will just initialize readState.
+      determineRgsToRead(metadata.getStripes(), metadata.getTypes(),
+          globalIncludes, stride, stripesMetadata);
       if (isStopped) return null;
-      List<Integer>[] stripeColsToRead = produceDataFromCache();
-      // readState now contains some 1s for column x rgs that were fetched from cache.
-      // TODO: I/O threadpool would be here (or below); for now, linear
-      for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+      // Get data from high-level cache; if some cols are fully in cache, this will also
+      // give us the modified list of columns to read for every stripe (null means all).
+      List<Integer>[] stripeColsToRead = produceDataFromCache(metadata.getStripes(), stride);
+      // readState has been modified for column x rgs that were fetched from cache.
+
+      // Then, create the readers for each stripe and prepare to read.
+      for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+        List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
+        RecordReader stripeReader = stripeReaders[stripeIxMod];
+        if (colsToRead == null) {
+          colsToRead = columnIds;
+        } else if (colsToRead.isEmpty()) {
+          if (stripeReader != null) {
+            stripeReader.close();
+            stripeReaders[stripeIxMod] = null;
+          }
+          continue; // All the data for this stripe was in cache.
+        } else if (stripeReader != null) {
+          // We have created the reader to read stripe metadata with all includes.
+          // We will now recreate the reader with narrower included columns (due to cache).
+          stripeReader.close();
+          stripeReader = null;
+        }
+
+        if (stripeReader != null) continue; // We already have a reader.
+        // Create RecordReader that will be used to read only this stripe.
+        StripeInformation si = metadata.getStripes().get(stripeIxFrom + stripeIxMod);
+        boolean[] stripeIncludes = OrcInputFormat.genIncludedColumns(
+            metadata.getTypes(), colsToRead, true);
+        if (orcReader == null) {
+          orcReader = createOrcReader(split);
+        }
+        stripeReader = orcReader.rows(si.getOffset(), si.getLength(), stripeIncludes);
+        stripeReader.prepareEncodedColumnRead();
+        stripeReaders[stripeIxMod] = stripeReader;
+      }
+
+      // We now have one reader per stripe that needs to be read. Read.
+      // TODO: I/O threadpool would be here - one thread per stripe; for now, linear.
+      OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
+      for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+        RecordReader stripeReader = stripeReaders[stripeIxMod];
+        if (stripeReader == null) continue; // No need to read this stripe, see above.
         List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
-        long[][] colRgs = readState[stripeIxMod];
         if (colsToRead == null) {
           colsToRead = columnIds;
         }
-        if (colsToRead.isEmpty()) continue; // All the data for this stripe was in cache.
-        if (colsToRead.size() != colRgs.length) {
+        boolean[][] colRgs = readState[stripeIxMod];
+        if (colsToRead != null && colsToRead.size() != colRgs.length) {
           // We are reading subset of the original columns, remove unnecessary bitmasks.
-          long[][] colRgs2 = new long[colsToRead.size()][];
+          boolean[][] colRgs2 = new boolean[colsToRead.size()][];
           for (int i = 0, i2 = -1; i < colRgs.length; ++i) {
             if (colRgs[i] == null) continue;
             colRgs2[++i2] = colRgs[i];
           }
           colRgs = colRgs2;
         }
-        int stripeIx = stripeIxFrom + stripeIxMod;
-        StripeInformation si = stripes.get(stripeIx);
-        int rgCount = rgsPerStripe[stripeIxMod];
-        boolean[] includes = OrcInputFormat.genIncludedColumns(types, colsToRead, true);
-        if (orcReader == null) {
-          orcReader = createOrcReader(split);
+
+        // Get stripe metadata. We might have read it earlier for RG filtering.
+        OrcStripeMetadata stripeMetadata;
+        int stripeIx = stripeIxMod + stripeIxFrom;
+        if (stripesMetadata != null) {
+          stripeMetadata = stripesMetadata.get(stripeIxMod);
+        } else {
+          stripeKey.stripeIx = stripeIx;
+          stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
+          if (stripeMetadata == null) {
+            stripeMetadata = new OrcStripeMetadata(stripeReader, stripeKey.stripeIx);
+            metadataCache.putStripeMetadata(stripeKey, stripeMetadata);
+            stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
+          }
         }
-        RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
+        stripeReader.setRowIndex(stripeMetadata.getRowIndexes());
+
         // In case if we have high-level cache, we will intercept the data and add it there;
         // otherwise just pass the data directly to the consumer.
         Consumer<EncodedColumn<OrcBatchKey>> consumer = (cache == null) ?
            this.consumer : this;
-        stripeReader.readEncodedColumns(colRgs, rgCount, consumer, lowLevelCache);
+        // This is where I/O happens. This is a sync call that will feed data to the consumer.
+        try {
+          stripeReader.readEncodedColumns(stripeIx, colRgs, lowLevelCache, consumer);
+        } catch (Throwable t) {
+          consumer.setError(t);
+        }
         stripeReader.close();
       }
 
+      // Done with all the things.
       consumer.setDone();
       if (DebugUtils.isTraceMttEnabled()) {
         LlapIoImpl.LOG.info("done processing " + split);
@@ -167,16 +242,72 @@ public class OrcEncodedDataProducer impl
       return null;
     }
 
+    private ArrayList<OrcStripeMetadata> readStripesMetadata(OrcFileMetadata metadata,
+        boolean[] globalInc, RecordReader[] stripeReaders) throws IOException {
+      ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(stripeReaders.length);
+      OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
+      for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+        stripeKey.stripeIx = stripeIxMod + stripeIxFrom;
+        OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey);
+        if (value == null) {
+          // Metadata not present in cache - get it from the reader and put in cache.
+          if (orcReader == null) {
+            orcReader = createOrcReader(split);
+          }
+          StripeInformation si = metadata.getStripes().get(stripeKey.stripeIx);
+          stripeReaders[stripeIxMod] = orcReader.rows(si.getOffset(), si.getLength(), globalInc);
+          stripeReaders[stripeIxMod].prepareEncodedColumnRead();
+          value = new OrcStripeMetadata(stripeReaders[stripeIxMod], stripeKey.stripeIx);
+          metadataCache.putStripeMetadata(stripeKey, value);
+          // Create new key object to reuse for gets; we've used the old one to put in cache.
+          stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
+        }
+        result.add(value);
+      }
+      return result;
+    }
+
     @Override
-    public void returnData(ColumnBuffer data) {
+    public void returnData(StreamBuffer data) {
       lowLevelCache.releaseBuffers(data.cacheBuffers);
     }
 
-    private void determineWhatToRead(List<StripeInformation> stripes) {
-      // The unit of caching for ORC is (stripe x column) (see OrcBatchKey).
+    private void determineRgsToRead(List<StripeInformation> stripes, List<Type> types,
+        boolean[] globalIncludes, int rowIndexStride, ArrayList<OrcStripeMetadata> metadata)
+            throws IOException {
+      SargApplier sargApp = null;
+      if (sarg != null) {
+        String[] colNamesForSarg = OrcInputFormat.getSargColumnNames(
+            columnNames, types, globalIncludes, OrcInputFormat.isOriginal(orcReader));
+        sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride);
+      }
+      // readState should have been initialized by this time with an empty array.
+      for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+        int originalStripeIx = stripeIxMod + stripeIxFrom;
+        StripeInformation stripe = stripes.get(originalStripeIx);
+        int rgCount = getRgCount(stripe, rowIndexStride);
+        boolean[] rgsToRead = null;
+        if (sargApp != null) {
+          rgsToRead = sargApp.pickRowGroups(stripe, metadata.get(stripeIxMod).getRowIndexes());
+        }
+        assert rgsToRead == null || rgsToRead.length == rgCount;
+        readState[stripeIxMod] = new boolean[columnIds.size()][];
+        for (int j = 0; j < columnIds.size(); ++j) {
+          readState[stripeIxMod][j] = (rgsToRead == null) ? null :
+              Arrays.copyOf(rgsToRead, rgsToRead.length);
+        }
+      }
+    }
+
+    private int getRgCount(StripeInformation stripe, int rowIndexStride) {
+      return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
+    }
+
+    public void determineStripesToRead(List<StripeInformation> stripes) {
+      // The unit of caching for ORC is (rg x column) (see OrcBatchKey).
      long offset = split.getStart(), maxOffset = offset + split.getLength();
-      stripeIxFrom = stripeIxTo = -1;
-      int stripeIx = 0;
+      stripeIxFrom = -1;
+      int stripeIxTo = -1;
       if (LlapIoImpl.LOG.isDebugEnabled()) {
         String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes ";
         for (StripeInformation stripe : stripes) {
@@ -185,7 +316,7 @@ public class OrcEncodedDataProducer impl
         LlapIoImpl.LOG.debug(tmp);
       }
 
-      List<Integer> stripeRgCounts = new ArrayList<Integer>(stripes.size());
+      int stripeIx = 0;
       for (StripeInformation stripe : stripes) {
         long stripeStart = stripe.getOffset();
         if (offset > stripeStart) continue;
@@ -204,9 +335,6 @@ public class OrcEncodedDataProducer impl
           stripeIxTo = stripeIx;
           break;
         }
-        int rgCount = (int)Math.ceil(
-            (double)stripe.getNumberOfRows() / orcReader.getRowIndexStride());
-        stripeRgCounts.add(rgCount);
         ++stripeIx;
       }
       if (stripeIxTo == -1) {
@@ -215,54 +343,51 @@ public class OrcEncodedDataProducer impl
        }
         stripeIxTo = stripeIx;
       }
-      readState = new long[stripeRgCounts.size()][][];
-      for (int i = 0; i < stripeRgCounts.size(); ++i) {
-        int bitmaskSize = align64(stripeRgCounts.get(i)) >>> 6;
-        readState[i] = new long[columnIds.size()][];
-        for (int j = 0; j < columnIds.size(); ++j) {
-          readState[i][j] = new long[bitmaskSize];
-        }
-      }
-      // TODO: HERE, we need to apply sargs and mark RGs that are filtered as 1s
-      rgsPerStripe = new int[stripeRgCounts.size()];
-      for (int i = 0; i < rgsPerStripe.length; ++i) {
-        rgsPerStripe[i] = stripeRgCounts.get(i);
-      }
+      readState = new boolean[stripeIxTo - stripeIxFrom][][];
     }
 
     // TODO: split by stripe? we do everything by stripe, and it might be faster
-    private List<Integer>[] produceDataFromCache() {
+    private List<Integer>[] produceDataFromCache(
+        List<StripeInformation> stripes, int rowIndexStride) {
       if (cache == null) return null;
       OrcCacheKey key = new OrcCacheKey(internedFilePath, -1, -1, -1);
+      // For each stripe, keep a list of columns that are not fully in cache (null => all of them).
       @SuppressWarnings("unchecked") // No generics arrays - "J" in "Java" stands for "joke".
       List<Integer>[] stripeColsNotInCache = new List[readState.length];
       for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
         key.stripeIx = stripeIxFrom + stripeIxMod;
-        long[][] cols = readState[stripeIxMod];
-        int rgCount = rgsPerStripe[stripeIxMod];
+        boolean[][] cols = readState[stripeIxMod];
+        // TODO## at self-CR, see that colIx business here was not screwed up
         for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
+          boolean[] readMask = cols[colIxMod];
           key.colIx = columnIds.get(colIxMod);
-          long[] doneMask = cols[colIxMod];
+          // Assume first all RGs will be in cache; calculate or get the RG count.
           boolean areAllRgsInCache = true;
+          int rgCount = readMask != null ? readMask.length
+              : getRgCount(stripes.get(key.stripeIx), rowIndexStride);
           for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
-            int maskIndex = rgIx >>> 6, maskBit = 1 << (rgIx & 63);
-            if ((doneMask[maskIndex] & maskBit) != 0) continue; // RG eliminated by SARG
+            if (readMask != null && !readMask[rgIx]) continue; // RG eliminated by SARG
             key.rgIx = rgIx;
-            ColumnBuffer cached = cache.get(key);
+            StreamBuffer cached = cache.get(key);
             if (cached == null) {
              areAllRgsInCache = false;
              continue;
            }
+            // RG was in cache; send it over to the consumer.
             // TODO: pool of EncodedColumn-s objects. Someone will need to return them though.
-            EncodedColumn<OrcBatchKey> col = new EncodedColumn(
-                key.copyToPureBatchKey(), key.colIx, cached);
+            EncodedColumn<OrcBatchKey> col = null;
+            // TODO# new EncodedColumn(key.copyToPureBatchKey(), key.colIx, cached);
             consumer.consumeData(col);
-            doneMask[maskIndex] = doneMask[maskIndex] | maskBit;
+            if (readMask == null) {
+              // We were going to read all RGs, but now that some were in cache, allocate the mask.
+              cols[colIxMod] = readMask = new boolean[rgCount];
+              Arrays.fill(readMask, true);
+            }
+            readMask[rgIx] = false; // Got from cache, don't read from disk.
           }
-          boolean hasFetchList = stripeColsNotInCache[stripeIxMod] != null;
+          boolean hasExplicitColList = stripeColsNotInCache[stripeIxMod] != null;
           if (areAllRgsInCache) {
-            cols[colIxMod] = null; // No need for bitmask, all rgs are done.
-            if (!hasFetchList) {
+            if (!hasExplicitColList) {
               // All rgs for this stripe x column were fetched from cache. If this is the first
               // such column, create custom, smaller list of columns to fetch later for this
               // stripe (default is all the columns originally requested). Add all previous
@@ -272,7 +397,7 @@ public class OrcEncodedDataProducer impl
               stripeColsNotInCache[stripeIxMod].addAll(columnIds.subList(0, colIxMod));
             }
-          } else if (hasFetchList) {
+          } else if (hasExplicitColList) {
             // Only a subset of original columnIds need to be fetched for this stripe;
             // add the current one to this sublist.
             stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
@@ -292,11 +417,14 @@ public class OrcEncodedDataProducer impl
       // Store object in cache; create new key object - cannot be reused.
       assert cache != null;
       OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
-      ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
-      if (data.columnData != cached) {
+      // TODO#: change type of cache and restore this
+      /*
+      StreamBuffer cached = cache.cacheOrGet(key, data.columnData);
+      if (data.streamData != cached) {
         lowLevelCache.releaseBuffers(data.columnData.cacheBuffers);
         data.columnData = cached;
       }
+      */
       consumer.consumeData(data);
     }
 
@@ -312,16 +440,9 @@ public class OrcEncodedDataProducer impl
       if ("pfile".equals(path.toUri().getScheme())) {
         fs = path.getFileSystem(conf); // Cannot use cached FS due to hive tests' proxy FS.
       }
-      if (metadataCache == null) {
-        metadataCache = new OrcMetadataCache(cachedFs, path, conf);
-      }
       return OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
     }
 
-    private static int align64(int number) {
-      return ((number + 63) & ~63);
-    }
-
   public OrcEncodedDataProducer(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
       Configuration conf) throws IOException {
     // We assume all splits will come from the same FS.
@@ -329,12 +450,12 @@ public class OrcEncodedDataProducer impl
     this.cache = cache;
     this.lowLevelCache = lowLevelCache;
     this.conf = conf;
-    this.metadataCache = null;
+    this.metadataCache = new OrcMetadataCache();
   }
 
   @Override
   public EncodedDataReader<OrcBatchKey> getReader(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
-    return new OrcEncodedDataReader(split, columnIds, sarg, consumer);
+      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+    return new OrcEncodedDataReader(split, columnIds, sarg, columnNames, consumer);
  }
 }
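Two small invariants in this reader are worth making explicit. A stripe contributes ceil(numberOfRows / rowIndexStride) row groups, which getRgCount computes via double math; an equivalent integer-only form (a sketch, not what the patch uses):

    static int rgCount(long numberOfRows, int rowIndexStride) {
      return (int) ((numberOfRows + rowIndexStride - 1) / rowIndexStride); // ceiling division
    }

And determineStripesToRead keeps only stripes whose start offset falls inside [split.getStart(), split.getStart() + split.getLength()), so readState.length is exactly stripeIxTo - stripeIxFrom.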
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java?rev=1656595&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java Mon Feb 2 22:28:15 2015
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+
+public class OrcFileMetadata {
+  private CompressionKind compressionKind;
+  private int compressionBufferSize;
+  private List<OrcProto.Type> types;
+  private List<StripeInformation> stripes;
+  private int rowIndexStride;
+
+  public OrcFileMetadata(Reader reader) {
+    setCompressionKind(reader.getCompression());
+    setCompressionBufferSize(reader.getCompressionSize());
+    setStripes(reader.getStripes());
+    setTypes(reader.getTypes());
+    setRowIndexStride(reader.getRowIndexStride());
+  }
+
+  public List<StripeInformation> getStripes() {
+    return stripes;
+  }
+
+  public void setStripes(List<StripeInformation> stripes) {
+    this.stripes = stripes;
+  }
+
+  public CompressionKind getCompressionKind() {
+    return compressionKind;
+  }
+
+  public void setCompressionKind(CompressionKind compressionKind) {
+    this.compressionKind = compressionKind;
+  }
+
+  public int getCompressionBufferSize() {
+    return compressionBufferSize;
+  }
+
+  public void setCompressionBufferSize(int compressionBufferSize) {
+    this.compressionBufferSize = compressionBufferSize;
+  }
+
+  public List<OrcProto.Type> getTypes() {
+    return types;
+  }
+
+  public void setTypes(List<OrcProto.Type> types) {
+    this.types = types;
+  }
+
+  public int getRowIndexStride() {
+    return rowIndexStride;
+  }
+
+  public void setRowIndexStride(int rowIndexStride) {
+    this.rowIndexStride = rowIndexStride;
+  }
+}
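OrcFileMetadata deliberately copies everything it needs out of the Reader in its constructor, so the cached entry holds no reference back to an open file. A usage sketch (path and conf are whatever the caller already has):

    Reader orcReader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    OrcFileMetadata md = new OrcFileMetadata(orcReader); // snapshots stripes/types/stride
    metadataCache.putFileMetadata(path.toString().intern(), md);
    // orcReader can now be discarded; later splits for this file hit the cache instead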
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java Mon Feb 2 22:28:15 2015
@@ -19,79 +19,48 @@ package org.apache.hadoop.hive.llap.io.metadata;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 /**
  * ORC-specific metadata cache.
+ * TODO: should be merged with main cache somehow if we find this takes too much memory
 */
 public class OrcMetadataCache {
   private static final int DEFAULT_CACHE_ACCESS_CONCURRENCY = 10;
-  private static final int DEFAULT_MAX_CACHE_ENTRIES = 100;
-  private static Cache<String, OrcMetadata> METADATA;
+  private static final int DEFAULT_MAX_FILE_ENTRIES = 1000;
+  private static final int DEFAULT_MAX_STRIPE_ENTRIES = 10000;
+  private static Cache<String, OrcFileMetadata> METADATA;
+  private static Cache<OrcBatchKey, OrcStripeMetadata> STRIPE_METADATA;
 
   static {
     METADATA = CacheBuilder.newBuilder()
         .concurrencyLevel(DEFAULT_CACHE_ACCESS_CONCURRENCY)
-        .maximumSize(DEFAULT_MAX_CACHE_ENTRIES)
+        .maximumSize(DEFAULT_MAX_FILE_ENTRIES)
+        .build();
+    STRIPE_METADATA = CacheBuilder.newBuilder()
+        .concurrencyLevel(DEFAULT_CACHE_ACCESS_CONCURRENCY)
+        .maximumSize(DEFAULT_MAX_STRIPE_ENTRIES)
         .build();
   }
 
-  private Path path;
-  private OrcMetadataLoader loader;
-
-  public OrcMetadataCache(FileSystem fs, Path path, Configuration conf) {
-    this.path = path;
-    this.loader = new OrcMetadataLoader(fs, path, conf);
-  }
-
-  public CompressionKind getCompression(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getCompressionKind();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-    }
-  }
-
-  public int getCompressionBufferSize(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getCompressionBufferSize();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-    }
+  public void putFileMetadata(String filePath, OrcFileMetadata metaData) {
+    METADATA.put(filePath, metaData);
   }
 
-  public List<OrcProto.Type> getTypes(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getTypes();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-    }
+  public void putStripeMetadata(OrcBatchKey stripeKey, OrcStripeMetadata metaData) {
+    STRIPE_METADATA.put(stripeKey, metaData);
   }
 
-  public List<StripeInformation> getStripes(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getStripes();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-    }
+  public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws IOException {
+    return STRIPE_METADATA.getIfPresent(stripeKey);
   }
 
-  // public boolean[] getIncludedRowGroups(String pathString, SearchArgument sarg, int stripeIdx) throws IOException {
-  //   try {
-  //     return METADATA.get(pathString, loader).getStripeToRowIndexEntries();
-  //   } catch (ExecutionException e) {
-  //     throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-  //   }
-  // }
+  public OrcFileMetadata getFileMetadata(String pathString) throws IOException {
+    return METADATA.getIfPresent(pathString);
  }
 }
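The switch from Guava's loading get(key, loader) to plain put/getIfPresent moves the read-the-file work out of the cache and into the caller, which already has an ORC Reader open. The resulting caller pattern is just (a sketch of the flow shown in OrcEncodedDataProducer.call above):

    OrcFileMetadata md = metadataCache.getFileMetadata(path); // null on miss
    if (md == null) {
      md = new OrcFileMetadata(createOrcReader(split));       // caller does the I/O
      metadataCache.putFileMetadata(path, md);                // benign race: last put wins
    }

Two callers can race and both read the file; with Cache.put the entries are equal snapshots, so the duplicated work is wasted but harmless.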
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java Mon Feb 2 22:28:15 2015
@@ -18,50 +18,21 @@ package org.apache.hadoop.hive.llap.io.metadata;
 
-import static org.apache.hadoop.hive.ql.io.orc.OrcFile.readerOptions;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.llap.io.orc.OrcFile;
 import org.apache.hadoop.hive.llap.io.orc.Reader;
-import org.apache.hadoop.hive.llap.io.orc.RecordReader;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
-
-public class OrcMetadataLoader implements Callable<OrcMetadata> {
-  private FileSystem fs;
-  private Path path;
-  private Configuration conf;
-
-  public OrcMetadataLoader(FileSystem fs, Path path, Configuration conf) {
-    this.fs = fs;
-    this.path = path;
-    this.conf = conf;
+
+// TODO: this class is pointless
+public class OrcMetadataLoader implements Callable<OrcFileMetadata> {
+  private Reader reader;
+
+  public OrcMetadataLoader(Reader reader) {
+    this.reader = reader;
   }
 
   @Override
-  public OrcMetadata call() throws Exception {
-    Reader reader = OrcFile.createLLAPReader(path, readerOptions(conf).filesystem(fs));
-    OrcMetadata orcMetadata = new OrcMetadata();
-    orcMetadata.setCompressionKind(reader.getCompression());
-    orcMetadata.setCompressionBufferSize(reader.getCompressionSize());
-    List<StripeInformation> stripes = reader.getStripes();
-    orcMetadata.setStripes(stripes);
-    Map<Integer, List<OrcProto.ColumnEncoding>> stripeColEnc = new HashMap<Integer, List<OrcProto.ColumnEncoding>>();
-    Map<Integer, OrcProto.RowIndex[]> stripeRowIndices = new HashMap<Integer, OrcProto.RowIndex[]>();
-    RecordReader rows = reader.rows();
-    for (int i = 0; i < stripes.size(); i++) {
-      stripeColEnc.put(i, rows.getColumnEncodings(i));
-      stripeRowIndices.put(i, rows.getRowIndexEntries(i));
-    }
-    orcMetadata.setStripeToColEncodings(stripeColEnc);
-    orcMetadata.setStripeToRowIndexEntries(stripeRowIndices);
-    return orcMetadata;
+  public OrcFileMetadata call() throws Exception {
+    return new OrcFileMetadata(reader);
   }
 }

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java?rev=1656595&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java Mon Feb 2 22:28:15 2015
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+
+public class OrcStripeMetadata {
+  // TODO#: add encoding and stream list
+  OrcProto.RowIndex[] rowIndexes;
+
+  public OrcStripeMetadata(RecordReader reader, int stripeIx) throws IOException {
+    rowIndexes = reader.getCurrentRowIndexEntries();
+  }
+
+  public OrcProto.RowIndex[] getRowIndexes() {
+    return rowIndexes;
+  }
+
+  public void setRowIndexes(OrcProto.RowIndex[] rowIndexes) {
+    this.rowIndexes = rowIndexes;
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Mon Feb 2 22:28:15 2015
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.llap.io.ap
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.*;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 
@@ -45,24 +47,6 @@ public class LLAPRecordReaderImpl extend
   }
 
   @Override
-  public OrcProto.RowIndex[] getRowIndexEntries(int stripeIdx) throws IOException {
-    return readRowIndex(stripeIdx);
-  }
-
-  @Override
-  public List<ColumnEncoding> getColumnEncodings(int stripeIdx) throws IOException {
-    StripeInformation si = stripes.get(stripeIdx);
-    OrcProto.StripeFooter sf = readStripeFooter(si);
-    return sf.getColumnsList();
-  }
-
-  @Override
-  public boolean[] getIncludedRowGroups(int stripeIdx) throws IOException {
-    currentStripe = stripeIdx;
-    return pickRowGroups();
-  }
-
-  @Override
   public boolean hasNext() throws IOException {
     return false;
   }
@@ -96,10 +80,4 @@ public class LLAPRecordReaderImpl extend
   public void seekToRow(long rowCount) throws IOException {
   }
-
-  @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {
-    throw new UnsupportedOperationException("not implemented");
-  }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java Mon Feb 2 22:28:15 2015
@@ -26,27 +26,4 @@ import org.apache.hadoop.hive.ql.io.orc.
  *
 */
 public interface RecordReader extends org.apache.hadoop.hive.ql.io.orc.RecordReader {
-  /**
-   * Return all row index entries for the specified stripe index.
- * - * @param stripeIdx - stripe index within orc file - * @return - all row index entries - */ - OrcProto.RowIndex[] getRowIndexEntries(int stripeIdx) throws IOException; - - /** - * Return column encodings of all columns for the specified stripe index. - * - * @param stripeIdx - stripe index within orc file - * @return - column encodings of all columns - */ - List<OrcProto.ColumnEncoding> getColumnEncodings(int stripeIdx) throws IOException; - - /** - * Return the row groups that satisfy the SARG condition for the specified stripe index. - * - * @param stripeIdx - stripe index within orc file - * @return - row groups qualifying the SARG - */ - boolean[] getIncludedRowGroups(int stripeIdx) throws IOException; } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1656595&r1=1656594&r2=1656595&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Mon Feb 2 22:28:15 2015 @@ -21,17 +21,21 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk; +import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import com.google.common.annotations.VisibleForTesting; abstract class InStream extends InputStream { private static final Log LOG = LogFactory.getLog(InStream.class); @@ -125,6 +129,15 @@ abstract class InStream extends InputStr } } + private static ByteBuffer allocateBuffer(int size, boolean isDirect) { + // TODO: use the same pool as the ORC readers + if (isDirect) { + return ByteBuffer.allocateDirect(size); + } else { + return ByteBuffer.allocate(size); + } + } + private static class CompressedStream extends InStream { private final String fileName; private final String name; @@ -154,15 +167,6 @@ abstract class InStream extends InputStr this.cache = cache; } - private ByteBuffer allocateBuffer(int size, boolean isDirect) { - // TODO: use the same pool as the ORC readers - if (isDirect) { - return ByteBuffer.allocateDirect(size); - } else { - return ByteBuffer.allocate(size); - } - } - // TODO: This should not be used for main path.
private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1]; private void allocateForUncompressed(int size, boolean isDirect) { @@ -295,7 +299,7 @@ abstract class InStream extends InputStr } /* slices a read only contiguous buffer of chunkLength */ - private ByteBuffer slice(int chunkLength) throws IOException { + private ByteBuffer slice(int chunkLength) throws IOException { int len = chunkLength; final long oldOffset = currentOffset; ByteBuffer slice; @@ -478,4 +482,204 @@ abstract class InStream extends InputStr return new CompressedStream(fileName, name, input, length, codec, bufferSize, cache); } } + + private static class ProcCacheChunk extends CacheChunk { + public ProcCacheChunk(long cbStartOffset, long cbEndOffset, + boolean isCompressed, ByteBuffer originalData, LlapMemoryBuffer targetBuffer) { + super(targetBuffer, cbStartOffset, cbEndOffset); + this.isCompressed = isCompressed; + this.originalData = originalData; + } + + boolean isCompressed; + ByteBuffer originalData = null; + } + + /** + * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress + * and remove what we have returned. We will keep the iterator as a "hint" point. + * TODO: Java LinkedList and its iterator have a really clumsy interface. Replace with our own simple one? + */ + public static void uncompressStream(String fileName, + ListIterator<DiskRange> ranges, + CompressionCodec codec, int bufferSize, LowLevelCache cache, + long cOffset, long endCOffset, StreamBuffer colBuffer) + throws IOException { + // TODO#: account for cOffsets being -1 after finishing the normal methods. + colBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>(); + List<ProcCacheChunk> toDecompress = new ArrayList<ProcCacheChunk>(); + + // Find our bearings in the stream. Normally, the iterator will already point either to where we + // want to be, or just before. However, RGs can overlap due to encoding, so we may have + // to return to a previous block. + DiskRange current = findCompressedPosition(ranges, cOffset); + + // Go through the blocks; add data to the results and prepare the decompression work (see below). + int nextCbOffset = (cOffset >= 0) ? (int)(cOffset - current.offset) : -1; + long currentCOffset = cOffset; + while (true) { + if (current instanceof CacheChunk) { + // This is a cached compression buffer, add as is. + if (nextCbOffset > 0) throw new AssertionError("Compressed offset in the middle of cb"); + CacheChunk cc = (CacheChunk)current; + colBuffer.cacheBuffers.add(cc.buffer); + currentCOffset = cc.end; + } else { + // This is a compressed buffer. We need to uncompress it; the buffer can comprise + // several disk ranges, so we might need to combine them. + BufferChunk bc = (BufferChunk)current; + // TODO#: DOUBLE check the iterator state. + int chunkLength = addOneCompressionBuffer(bc, ranges, bufferSize, + cache, colBuffer.cacheBuffers, toDecompress, nextCbOffset); + currentCOffset = bc.offset + chunkLength; + } + nextCbOffset = -1; + if ((endCOffset >= 0 && currentCOffset >= endCOffset) || !ranges.hasNext()) { + break; + } + current = ranges.next(); + } + + // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache + // data and some unallocated membufs for decompression. toDecompress contains all the work we + // need to do, and each item points to one of the membufs in cacheBuffers as target. The iterator + // has also been adjusted to point to these buffers instead of compressed data for the ranges. + // Allocate the buffers, prepare cache keys.
+ LlapMemoryBuffer[] targetBuffers = new LlapMemoryBuffer[toDecompress.size()]; + DiskRange[] cacheKeys = new DiskRange[toDecompress.size()]; + int ix = 0; + for (ProcCacheChunk chunk : toDecompress) { + cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store the DiskRange. + targetBuffers[ix] = chunk.buffer; + ++ix; + } + cache.allocateMultiple(targetBuffers, bufferSize); + + // Now decompress (or copy) the data into cache buffers. + for (ProcCacheChunk chunk : toDecompress) { + if (chunk.isCompressed) { + codec.decompress(chunk.originalData, chunk.buffer.byteBuffer); + } else { + chunk.buffer.byteBuffer.put(chunk.originalData); // Copy uncompressed data to cache. + } + chunk.originalData = null; // TODO#: are we supposed to release this to zcr in some cases? + } + + // Finally, put the data in the cache. + cache.putFileData(fileName, cacheKeys, targetBuffers); + } + + private static DiskRange findCompressedPosition( + ListIterator<DiskRange> ranges, long cOffset) { + if (cOffset < 0) return ranges.next(); + DiskRange current = null; + boolean doCallNext = false; + if (ranges.hasNext()) { + current = ranges.next(); + } else if (ranges.hasPrevious()) { + current = ranges.previous(); + doCallNext = true; + } + // We expect the offset to be valid. TODO: rather, validate it. + while (current.end <= cOffset) { + current = ranges.next(); + doCallNext = false; + } + while (current.offset > cOffset) { + current = ranges.previous(); + doCallNext = true; + } + if (doCallNext) { + ranges.next(); // We called previous, make sure next is the real next and not current. + } + return current; + }
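The doCallNext bookkeeping in findCompressedPosition follows from ListIterator's cursor model: the cursor sits between elements, and next()/previous() each return the element they cross, so a previous() immediately after a next() re-yields the same element. A standalone demo of exactly that dance (plain JDK code, independent of this patch):

import java.util.Arrays;
import java.util.LinkedList;
import java.util.ListIterator;

public class CursorDemo {
  public static void main(String[] args) {
    ListIterator<String> it =
        new LinkedList<String>(Arrays.asList("r1", "r2", "r3")).listIterator();
    System.out.println(it.next());     // "r1"; cursor is now between r1 and r2
    System.out.println(it.previous()); // "r1" again; cursor moved back before r1
    // Hence the extra next() above: after ending the search on a previous(), the
    // iterator must be advanced once so the caller's ranges.next() returns the
    // element after the current one rather than the current element itself.
    it.next();
  }
}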
+ + + private static int addOneCompressionBuffer(BufferChunk current, + ListIterator<DiskRange> ranges, int bufferSize, + LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers, + List<ProcCacheChunk> toDecompress, int nextCbOffsetExpected) throws IOException { + // TODO#: HERE + ByteBuffer slice = null; + ByteBuffer compressed = current.chunk; + if (nextCbOffsetExpected >= 0 && nextCbOffsetExpected != compressed.position()) { + throw new AssertionError("We don't know what we are doing anymore"); + } + long cbStartOffset = current.offset + compressed.position(); + int b0 = compressed.get() & 0xff; + int b1 = compressed.get() & 0xff; + int b2 = compressed.get() & 0xff; + int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1); + if (chunkLength > bufferSize) { + throw new IllegalArgumentException("Buffer size too small. size = " + + bufferSize + " needed = " + chunkLength); + } + boolean isUncompressed = ((b0 & 0x01) == 1); + if (compressed.remaining() >= chunkLength) { + // Simple case - CB fits entirely in the disk range. + slice = compressed.slice(); + slice.limit(chunkLength); + addOneCompressionBlockByteBuffer(slice, isUncompressed, ranges, cache, compressed, + cbStartOffset, chunkLength, toDecompress, cacheBuffers); + return chunkLength; + } + + // TODO: we could remove the extra copy for the isUncompressed case. + // We need to consolidate 2 or more buffers into one to decompress. + ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); + int remaining = chunkLength - compressed.remaining(); + copy.put(compressed); + ranges.remove(); + + while (ranges.hasNext()) { + DiskRange range = ranges.next(); + if (!(range instanceof BufferChunk)) { + throw new IOException("Trying to extend compressed block into uncompressed block"); + } + compressed = range.getData(); + if (compressed.remaining() >= remaining) { + slice = compressed.slice(); + slice.limit(remaining); + copy.put(slice); + copy.flip(); // Make the assembled CB readable from position 0 before it is decompressed. + addOneCompressionBlockByteBuffer(copy, isUncompressed, ranges, cache, compressed, + cbStartOffset, chunkLength, toDecompress, cacheBuffers); + return chunkLength; + } + remaining -= compressed.remaining(); + copy.put(compressed); + ranges.remove(); + } + throw new IOException("EOF while trying to read " + + chunkLength + " bytes at " + cbStartOffset); + } + + private static void addOneCompressionBlockByteBuffer( + ByteBuffer data, boolean isUncompressed, + ListIterator<DiskRange> ranges, LowLevelCache cache, + ByteBuffer compressed, long cbStartOffset, int chunkLength, + List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) { + // Prepare the future cache buffer. + LlapMemoryBuffer futureAlloc = cache.createUnallocated(); + // Add it to the result in the order we are processing. + cacheBuffers.add(futureAlloc); + // Add it to the list of work to decompress. + long cbEndOffset = cbStartOffset + chunkLength; + ProcCacheChunk cc = new ProcCacheChunk( + cbStartOffset, cbEndOffset, !isUncompressed, data, futureAlloc); + toDecompress.add(cc); + // Adjust the compression block position. + compressed.position(compressed.position() + chunkLength); + // Finally, put it in the ranges list for future use (if shared between RGs). + // Before anyone else accesses it, it would have been allocated and decompressed locally. + if (compressed.remaining() <= 0) { + ranges.set(cc); + } else { + ranges.previous(); + ranges.add(cc); + ranges.next(); // TODO: This is really clumsy; see the LinkedList TODO above. + } + } + }
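The three header bytes decoded in addOneCompressionBuffer follow ORC's compressed-chunk framing: a 3-byte little-endian value whose low bit marks an "original" (stored uncompressed) chunk and whose upper 23 bits give the chunk length, which is why the length decode is (b2 << 15) | (b1 << 7) | (b0 >> 1). A small round-trip sketch of that layout (the encoder here is illustrative; ORC writers produce these bytes internally):

public class ChunkHeaderDemo {
  // Encode: length in the upper 23 bits, "original" flag in the lowest bit,
  // emitted as 3 little-endian bytes.
  static byte[] encode(int chunkLength, boolean isOriginal) {
    int h = (chunkLength << 1) | (isOriginal ? 1 : 0);
    return new byte[] { (byte) h, (byte) (h >>> 8), (byte) (h >>> 16) };
  }

  public static void main(String[] args) {
    byte[] h = encode(262144, false); // e.g. a 256KB zlib-compressed chunk
    int b0 = h[0] & 0xff, b1 = h[1] & 0xff, b2 = h[2] & 0xff;
    int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1); // same math as the patch
    boolean isUncompressed = ((b0 & 0x01) == 1);
    System.out.println(chunkLength + " " + isUncompressed); // prints: 262144 false
  }
}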
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1656595&r1=1656594&r2=1656595&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Feb 2 22:28:15 2015 @@ -219,14 +219,17 @@ public class OrcInputFormat implements long offset, long length ) throws IOException { Reader.Options options = new Reader.Options().range(offset, length); - boolean isOriginal = - !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME); + boolean isOriginal = isOriginal(file); List<OrcProto.Type> types = file.getTypes(); options.include(genIncludedColumns(types, conf, isOriginal)); setSearchArgument(options, types, conf, isOriginal); return file.rowsOptions(options); } + public static boolean isOriginal(Reader file) { + return !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME); + } + /** * Recurse down into a type subtree turning on all of the sub-columns. * @param types the types of the file @@ -278,6 +281,21 @@ public class OrcInputFormat implements } } + public static String[] getSargColumnNames(String[] originalColumnNames, + List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) { + int rootColumn = getRootColumn(isOriginal); + String[] columnNames = new String[types.size() - rootColumn]; + int i = 0; + for(int columnId: types.get(rootColumn).getSubtypesList()) { + if (includedColumns == null || includedColumns[columnId - rootColumn]) { + // this is guaranteed to be positive because types only have children + // ids greater than their own id. + columnNames[columnId - rootColumn] = originalColumnNames[i++]; + } + } + return columnNames; + } + static void setSearchArgument(Reader.Options options, List<OrcProto.Type> types, Configuration conf, @@ -296,19 +314,8 @@ public class OrcInputFormat implements } LOG.info("ORC pushdown predicate: " + sarg); - int rootColumn = getRootColumn(isOriginal); - String[] neededColumnNames = columnNamesString.split(","); - String[] columnNames = new String[types.size() - rootColumn]; - boolean[] includedColumns = options.getInclude(); - int i = 0; - for(int columnId: types.get(rootColumn).getSubtypesList()) { - if (includedColumns == null || includedColumns[columnId - rootColumn]) { - // this is guaranteed to be positive because types only have children - // ids greater than their own id. - columnNames[columnId - rootColumn] = neededColumnNames[i++]; - } - } - options.searchArgument(sarg, columnNames); + options.searchArgument(sarg, getSargColumnNames( + columnNamesString.split(","), types, options.getInclude(), isOriginal)); } @Override
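The extracted getSargColumnNames maps file-schema column ids to the names of the columns the reader actually needs, leaving null slots for pruned columns so SARG evaluation can index by column id. A worked example with a hypothetical schema (struct<a,b,c>, only "a" and "c" read; the type-building code is illustrative):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;

public class SargNamesDemo {
  public static void main(String[] args) {
    // Type ids: 0 = root struct, 1..3 = children "a", "b", "c".
    List<OrcProto.Type> types = new ArrayList<OrcProto.Type>();
    types.add(OrcProto.Type.newBuilder().setKind(OrcProto.Type.Kind.STRUCT)
        .addSubtypes(1).addSubtypes(2).addSubtypes(3).build());
    for (int i = 0; i < 3; ++i) {
      types.add(OrcProto.Type.newBuilder().setKind(OrcProto.Type.Kind.STRING).build());
    }
    boolean[] included = { true, true, false, true }; // indexed by type id; "b" is pruned
    String[] sargCols = OrcInputFormat.getSargColumnNames(
        "a,c".split(","), types, included, true);     // isOriginal=true, so root column is 0
    // sargCols == { null, "a", null, "c" }: one slot per type id, null where pruned.
  }
}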
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Mon Feb 2 22:28:15 2015 @@ -18,8 +18,11 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; +import java.util.List; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.io.api.EncodedColumn; @@ -82,6 +85,9 @@ public interface RecordReader { */ void seekToRow(long rowCount) throws IOException; + void prepareEncodedColumnRead() throws IOException; + + // TODO: maybe all of this should be moved to LLAP-specific class /** * TODO: this API is subject to change; on one hand, external code should control the threading * aspects, with the ORC method returning one EncodedColumn as it will; on the other, it's * easier to return many EncodedColumn-s. * TODO: assumes the reader is for one stripe, otherwise the signature makes no sense. * Also has no columns passed, because that is in the ctor. - * @param colRgs Bitmasks of what RGs are to be read. Has # of elements equal to the number of - * included columns; then each bitmask is rgCount bits long; 0 means "need to read" - * @param rgCount The length of bitmasks in colRgs. - * @param sarg Sarg to apply additional filtering to RGs. + * @param colRgs What RGs are to be read. Has # of elements equal to the number of + * included columns; then each boolean array is rgCount long. + * @param cache Cache to get/put data and allocate memory. * @param consumer Consumer to pass the results to. - * @param allocator Allocator to allocate memory. + * @throws IOException */ - void readEncodedColumns(long[][] colRgs, int rgCount, - Consumer<EncodedColumn<BatchKey>> consumer, LowLevelCache cache); + void readEncodedColumns(int stripeIx, boolean[][] colRgs, + LowLevelCache cache, Consumer<EncodedColumn<BatchKey>> consumer) throws IOException; + + RowIndex[] getCurrentRowIndexEntries() throws IOException; + + List<ColumnEncoding> getCurrentColumnEncodings() throws IOException; + + void setRowIndex(RowIndex[] rowIndex); }
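For part 1 of the patch, this reshaped RecordReader surface is the main integration point: the caller positions the reader, hands in the per-column row-group masks, and receives encoded data through the Consumer callback. A sketch of how a caller might drive it (the consumeData/setDone/setError callback names and the OrcBatchKey key type are assumed here from the surrounding LLAP interfaces, not spelled out in this diff):

import java.io.IOException;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;

class EncodedReadSketch {
  void readStripe(RecordReader reader, int stripeIx, boolean[][] colRgs,
      LowLevelCache cache) throws IOException {
    reader.prepareEncodedColumnRead(); // position the reader before producing streams
    reader.readEncodedColumns(stripeIx, colRgs, cache,
        new Consumer<EncodedColumn<OrcBatchKey>>() {
          @Override
          public void consumeData(EncodedColumn<OrcBatchKey> column) {
            // Hand the encoded streams to the decode stage (e.g. ColumnVectorProducer).
          }
          @Override
          public void setDone() { /* all requested columns/RGs were produced */ }
          @Override
          public void setError(Throwable t) { /* fail the read */ }
        });
  }
}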