Subject: svn commit: r1655107 - in /hive/branches/llap: common/src/java/org/apache/hadoop/hive/common/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/test/org/apache/hadoop/...
Date: Tue, 27 Jan 2015 19:06:23 -0000
To: commits@hive.apache.org
From: sershe@apache.org

Author: sershe
Date: Tue Jan 27 19:06:23 2015
New Revision: 1655107

URL: http://svn.apache.org/r1655107
Log:
HIVE-9418p1 : ORC using low-level cache

Added:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java

Added: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java?rev=1655107&view=auto
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java (added)
+++ 
hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java Tue Jan 27 19:06:23 2015 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common; + +import java.nio.ByteBuffer; + +/** + * The sections of a file. + */ +public class DiskRange { + /** The first address. */ + public long offset; + /** The address afterwards. */ + public long end; + + public DiskRange(long offset, long end) { + this.offset = offset; + this.end = end; + if (end < offset) { + throw new IllegalArgumentException("invalid range " + this); + } + } + + @Override + public boolean equals(Object other) { + if (other == null || other.getClass() != getClass()) { + return false; + } + DiskRange otherR = (DiskRange) other; + return otherR.offset == offset && otherR.end == end; + } + + @Override + public String toString() { + return "range start: " + offset + " end: " + end; + } + + public int getLength() { + long len = this.end - this.offset; + assert len <= Integer.MAX_VALUE; + return (int)len; + } + + // For subclasses + public boolean hasData() { + return false; + } + + public DiskRange slice(long offset, long end) { + // Rather, unexpected usage exception. 
+ throw new UnsupportedOperationException(); + } + + public ByteBuffer getData() { + throw new UnsupportedOperationException(); + } +} \ No newline at end of file Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (original) +++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Tue Jan 27 19:06:23 2015 @@ -21,17 +21,12 @@ package org.apache.hadoop.hive.llap.io.a import java.nio.ByteBuffer; public abstract class LlapMemoryBuffer { - protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) { - initialize(byteBuffer, offset, length); - } protected LlapMemoryBuffer() { } protected void initialize(ByteBuffer byteBuffer, int offset, int length) { - this.byteBuffer = byteBuffer; - this.offset = offset; - this.length = length; + this.byteBuffer = byteBuffer.slice(); + this.byteBuffer.position(offset); + this.byteBuffer.limit(offset + length); } public ByteBuffer byteBuffer; - public int offset; - public int length; } \ No newline at end of file Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original) +++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Tue Jan 27 19:06:23 2015 @@ -18,14 +18,17 @@ package org.apache.hadoop.hive.llap.io.api.cache; +import java.util.LinkedList; + +import org.apache.hadoop.hive.common.DiskRange; -public interface LowLevelCache { +public interface LowLevelCache { /** * Gets file data for particular offsets. Null entries mean no data. * @param file File name; MUST be interned. */ - LlapMemoryBuffer[] getFileData(String fileName, long[] offsets); + void getFileData(String fileName, LinkedList ranges); /** * Puts file data into cache. @@ -33,7 +36,7 @@ public interface LowLevelCache { * @return null if all data was put; bitmask indicating which chunks were not put otherwise; * the replacement chunks from cache are updated directly in the array. */ - long[] putFileData(String file, long[] offsets, LlapMemoryBuffer[] chunks); + long[] putFileData(String file, DiskRange[] ranges, LlapMemoryBuffer[] chunks); /** * Releases the buffer returned by getFileData or allocateMultiple. 
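
The interface change above inverts the lookup contract: getFileData no longer returns an LlapMemoryBuffer[] parallel to an offsets array; the reader now passes in its linked list of DiskRanges and the cache splices hits into that list in place, so cached sub-ranges come back as CacheChunk entries while the plain DiskRanges left behind are the gaps that still have to be read from disk. Below is a minimal standalone sketch of that splicing behaviour, using illustrative Range/CachedRange stand-ins and a plain TreeMap instead of the real DiskRange, CacheChunk, and ConcurrentSkipListMap types that LowLevelCacheImpl uses further down; it is not the patch's code, only its shape.

import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;

public class RangeSpliceSketch {
  // Stand-in for org.apache.hadoop.hive.common.DiskRange: a half-open byte range [offset, end).
  static class Range {
    long offset, end;
    Range(long offset, long end) { this.offset = offset; this.end = end; }
    @Override public String toString() { return "[" + offset + ", " + end + ")"; }
  }

  // Stand-in for RecordReaderImpl.CacheChunk: a range whose bytes are already in the cache.
  static class CachedRange extends Range {
    CachedRange(long offset, long end) { super(offset, end); }
    @Override public String toString() { return "cached" + super.toString(); }
  }

  // Splices cached buffers into the requested range list in place: cached sub-ranges become
  // CachedRange entries, uncached gaps remain plain Ranges for the caller to read from disk.
  static void getFileData(TreeMap<Long, Long> cache, LinkedList<Range> ranges) {
    ListIterator<Range> it = ranges.listIterator();
    while (it.hasNext()) {
      Range notCached = it.next();
      // All cached buffers (start offset -> end offset) starting inside the requested range.
      for (Map.Entry<Long, Long> e : cache.subMap(notCached.offset, notCached.end).entrySet()) {
        CachedRange hit = new CachedRange(e.getKey(), e.getValue());
        if (notCached.offset == hit.offset) {
          if (notCached.end <= hit.end) {
            it.set(hit);                       // fully covered: replace the list entry
            break;
          }
          notCached.offset = hit.end;          // covered prefix: insert hit before the remainder
          it.previous(); it.add(hit); it.next();
        } else {
          long tailEnd = notCached.end;        // covered in the middle or at the end:
          notCached.end = hit.offset;          // truncate the gap before the hit,
          it.add(hit);                         // insert the hit,
          if (tailEnd <= hit.end) break;
          notCached = new Range(hit.end, tailEnd);
          it.add(notCached);                   // and keep the remaining gap after it.
          it.previous(); it.next();            // reposition so a later hit may replace this entry
        }
      }
    }
  }

  public static void main(String[] args) {
    TreeMap<Long, Long> cache = new TreeMap<>();   // cached buffers: start offset -> end offset
    cache.put(2L, 4L);
    cache.put(6L, 8L);
    LinkedList<Range> ranges = new LinkedList<>();
    ranges.add(new Range(1, 9));                   // the reader asks for bytes [1, 9)
    getFileData(cache, ranges);
    System.out.println(ranges);  // [[1, 2), cached[2, 4), [4, 6), cached[6, 8), [8, 9)]
  }
}

TestLowLevelCacheImpl.testMultiMatch below exercises exactly this interleaving: a request for [1, 9) against cached buffers at [2, 4) and [6, 8) is expected to come back as dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9).
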
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java Tue Jan 27 19:06:23 2015 @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.io.ap public final class BuddyAllocator implements Allocator { private static final Log LOG = LogFactory.getLog(BuddyAllocator.class); + private final Arena[] arenas; private AtomicInteger allocatedArenas = new AtomicInteger(0); @@ -128,12 +129,6 @@ public final class BuddyAllocator implem return false; } - public static LlapCacheableBuffer allocateFake() { - LlapCacheableBuffer fake = new LlapCacheableBuffer(); - fake.initialize(-1, null, -1, 1); - return fake; - } - @Override public void deallocate(LlapMemoryBuffer buffer) { LlapCacheableBuffer buf = (LlapCacheableBuffer)buffer; @@ -338,8 +333,8 @@ public final class BuddyAllocator implem public void deallocate(LlapCacheableBuffer buffer) { assert data != null; - int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.length) - minAllocLog2, - headerIx = buffer.offset >>> minAllocLog2; + int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.byteBuffer.remaining()) + - minAllocLog2, headerIx = buffer.byteBuffer.position() >>> minAllocLog2; while (true) { FreeList freeList = freeLists[freeListIx]; int bHeaderIx = headerIx ^ (1 << freeListIx); Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Tue Jan 27 19:06:23 2015 @@ -43,39 +43,39 @@ public final class LlapCacheableBuffer e private final AtomicInteger refCount = new AtomicInteger(0); + // All kinds of random stuff needed by various parts of the system, beyond the publicly + // visible bytes "interface". This is not so pretty since all concerns are mixed here. + // But at least we don't waste bunch of memory per every buffer and bunch of virtual calls. + /** Allocator uses this to remember which arena to alloc from. + * TODO Could wrap ByteBuffer instead? This needs reference anyway. */ public int arenaIndex = -1; + /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but + * the lookup is on compressed ranges, so we need to know this. */ + public int declaredLength; + + /** Priority for cache policy (should be pretty universal). */ public double priority; + /** Last priority update time for cache policy (should be pretty universal). */ public long lastUpdate = -1; + /** Linked list pointers for LRFU/LRU cache policies. Given that each block is in cache + * that might be better than external linked list. Or not, since this is not concurrent. 
*/ public LlapCacheableBuffer prev = null, next = null; + /** Index in heap for LRFU/LFU cache policies. */ public int indexInHeap = NOT_IN_CACHE; + // TODO: Add 4 more bytes of crap here! + @VisibleForTesting int getRefCount() { return refCount.get(); } - @Override - public int hashCode() { - if (this.byteBuffer == null) return 0; - return (System.identityHashCode(this.byteBuffer) * 37 + offset) * 37 + length; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof LlapCacheableBuffer)) return false; - LlapCacheableBuffer other = (LlapCacheableBuffer)obj; - // We only compare objects, and not contents of the ByteBuffer. - return byteBuffer == other.byteBuffer - && this.offset == other.offset && this.length == other.length; - } - int incRef() { int newRefCount = -1; while (true) { int oldRefCount = refCount.get(); if (oldRefCount == EVICTED_REFCOUNT) return -1; - assert oldRefCount >= 0; + assert oldRefCount >= 0 : "oldRefCount is " + oldRefCount + " " + this; newRefCount = oldRefCount + 1; if (refCount.compareAndSet(oldRefCount, newRefCount)) break; } @@ -95,14 +95,14 @@ public final class LlapCacheableBuffer e int decRef() { int newRefCount = refCount.decrementAndGet(); if (newRefCount < 0) { - throw new AssertionError("Unexpected refCount " + newRefCount); + throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this); } return newRefCount; } @Override public String toString() { - return "0x" + Integer.toHexString(hashCode()); + return "0x" + Integer.toHexString(System.identityHashCode(this)); } /** Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Tue Jan 27 19:06:23 2015 @@ -17,22 +17,32 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.nio.ByteBuffer; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk; import com.google.common.annotations.VisibleForTesting; public class LowLevelCacheImpl implements LowLevelCache, EvictionListener { + private static final Log LOG = LogFactory.getLog(LowLevelCacheImpl.class); + private final Allocator allocator; private AtomicInteger newEvictions = new AtomicInteger(0); @@ -66,30 +76,81 @@ public class LowLevelCacheImpl 
implement } @Override - public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) { - LlapMemoryBuffer[] result = null; + public void getFileData(String fileName, LinkedList ranges) { FileCache subCache = cache.get(fileName); - if (subCache == null || !subCache.incRef()) return result; + if (subCache == null || !subCache.incRef()) return; try { - for (int i = 0; i < offsets.length; ++i) { - while (true) { // Overwhelmingly only runs once. - long offset = offsets[i]; - LlapCacheableBuffer buffer = subCache.cache.get(offset); - if (buffer == null) break; - if (lockBuffer(buffer)) { - if (result == null) { - result = new LlapCacheableBuffer[offsets.length]; - } - result[i] = buffer; - break; - } - if (subCache.cache.remove(offset, buffer)) break; - } + ListIterator dr = ranges.listIterator(); + while (dr.hasNext()) { + getOverlappingRanges(dr, subCache.cache); } } finally { subCache.decRef(); } - return result; + } + + private void getOverlappingRanges(ListIterator drIter, + ConcurrentSkipListMap cache) { + DiskRange currentNotCached = drIter.next(); + Iterator> matches = + cache.subMap(currentNotCached.offset, currentNotCached.end).entrySet().iterator(); + long cacheEnd = -1; + while (matches.hasNext()) { + assert currentNotCached != null; + Map.Entry e = matches.next(); + LlapCacheableBuffer buffer = e.getValue(); + // Lock the buffer, validate it and add to results. + if (!lockBuffer(buffer)) { + // If we cannot lock, remove this from cache and continue. + matches.remove(); + continue; + } + LOG.info("TODO# get +1 " + buffer.toString()); + long cacheOffset = e.getKey(); + if (cacheEnd > cacheOffset) { // compare with old cacheEnd + throw new AssertionError("Cache has overlapping buffers: " + cacheEnd + ") and [" + + cacheOffset + ", " + (cacheOffset + buffer.declaredLength) + ")"); + } + cacheEnd = cacheOffset + buffer.declaredLength; + CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd); + currentNotCached = addCachedBufferToIter(drIter, currentNotCached, currentCached); + } + } + + private DiskRange addCachedBufferToIter(ListIterator drIter, + DiskRange currentNotCached, CacheChunk currentCached) { + if (currentNotCached.offset == currentCached.offset) { + if (currentNotCached.end <= currentCached.end) { // we assume it's always "==" now + // Replace the entire current DiskRange with new cached range. Java LL is idiotic, so... + drIter.remove(); + drIter.add(currentCached); + currentNotCached = null; + } else { + // Insert the new cache range before the disk range. + currentNotCached.offset = currentCached.end; + drIter.previous(); + drIter.add(currentCached); + DiskRange dr = drIter.next(); + assert dr == currentNotCached; + } + } else { + assert currentNotCached.offset < currentCached.offset; + long originalEnd = currentNotCached.end; + currentNotCached.end = currentCached.offset; + drIter.add(currentCached); + if (originalEnd <= currentCached.end) { // we assume it's always "==" now + // We have reached the end of the range and truncated the last non-cached range. + currentNotCached = null; + } else { + // Insert the new disk range after the cache range. TODO: not strictly necessary yet? 
+ currentNotCached = new DiskRange(currentCached.end, originalEnd); + drIter.add(currentNotCached); + DiskRange dr = drIter.previous(); + assert dr == currentNotCached; + drIter.next(); + } + } + return currentNotCached; } private boolean lockBuffer(LlapCacheableBuffer buffer) { @@ -97,18 +158,19 @@ public class LowLevelCacheImpl implement if (rc == 1) { cachePolicy.notifyLock(buffer); } - return rc >= 0; + return rc > 0; } @Override - public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) { + public long[] putFileData(String fileName, DiskRange[] ranges, LlapMemoryBuffer[] buffers) { long[] result = null; - assert buffers.length == offsets.length; + assert buffers.length == ranges.length; FileCache subCache = getOrAddFileSubCache(fileName); try { - for (int i = 0; i < offsets.length; ++i) { + for (int i = 0; i < ranges.length; ++i) { LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i]; - long offset = offsets[i]; + long offset = ranges[i].offset; + buffer.declaredLength = ranges[i].getLength(); assert buffer.isLocked(); while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value). LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer); @@ -122,8 +184,14 @@ public class LowLevelCacheImpl implement + fileName + "@" + offset + "; old " + oldVal + ", new " + buffer); } if (lockBuffer(oldVal)) { + // We don't do proper overlap checking because it would cost cycles and we + // think it will never happen. We do perform the most basic check here. + if (oldVal.declaredLength != buffer.declaredLength) { + throw new RuntimeException("Found a block with different length at the same offset: " + + oldVal.declaredLength + " vs " + buffer.declaredLength + " @" + offset); + } // We found an old, valid block for this key in the cache. - releaseBufferInternal(buffer); +s releaseBufferInternal(buffer); buffers[i] = oldVal; if (result == null) { result = new long[align64(buffers.length) >>> 6]; @@ -196,9 +264,10 @@ public class LowLevelCacheImpl implement } } + private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]); public static LlapCacheableBuffer allocateFake() { LlapCacheableBuffer fake = new LlapCacheableBuffer(); - fake.initialize(-1, null, -1, 1); + fake.initialize(-1, fakeBuf, 0, 1); return fake; } @@ -210,9 +279,12 @@ public class LowLevelCacheImpl implement private static class FileCache { private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2; - // TODO: given the specific data, perhaps the nested thing should not be CHM - private ConcurrentHashMap cache - = new ConcurrentHashMap(); + // TODO: given the specific data and lookups, perhaps the nested thing should not be a map + // In fact, CSLM has slow single-threaded operation, and one file is probably often read + // by just one (or few) threads, so a much more simple DS with locking might be better. + // Let's use CSLM for now, since it's available. 
+ private ConcurrentSkipListMap cache + = new ConcurrentSkipListMap(); private AtomicInteger refCount = new AtomicInteger(0); boolean incRef() { Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Tue Jan 27 19:06:23 2015 @@ -69,7 +69,7 @@ public class LowLevelFifoCachePolicy ext LlapCacheableBuffer candidate = iter.next(); if (candidate.invalidate()) { iter.remove(); - evicted += candidate.length; + evicted += candidate.byteBuffer.remaining(); listener.notifyEvicted(candidate); } } Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original) +++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Tue Jan 27 19:06:23 2015 @@ -167,7 +167,7 @@ public class LowLevelLrfuCachePolicy ext // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us. // TODO#: double check this is valid! 
nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE; - evicted += nextCandidate.length; + evicted += nextCandidate.byteBuffer.remaining(); } if (firstCandidate != nextCandidate) { if (nextCandidate == null) { @@ -194,7 +194,7 @@ public class LowLevelLrfuCachePolicy ext buffer = evictFromHeapUnderLock(time); } if (buffer == null) return evicted; - evicted += buffer.length; + evicted += buffer.byteBuffer.remaining(); listener.notifyEvicted(buffer); } return evicted; Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java Tue Jan 27 19:06:23 2015 @@ -179,10 +179,10 @@ public class TestBuddyAllocator { for (int j = 0; j < allocCount; ++j) { LlapMemoryBuffer mem = allocs[index][j]; long testValue = testValues[index][j] = rdm.nextLong(); - mem.byteBuffer.putLong(mem.offset, testValue); - int halfLength = mem.length >> 1; - if (halfLength + 8 <= mem.length) { - mem.byteBuffer.putLong(mem.offset + halfLength, testValue); + mem.byteBuffer.putLong(0, testValue); + int halfLength = mem.byteBuffer.remaining() >> 1; + if (halfLength + 8 <= mem.byteBuffer.remaining()) { + mem.byteBuffer.putLong(halfLength, testValue); } } } @@ -204,10 +204,10 @@ public class TestBuddyAllocator { BuddyAllocator a, LlapMemoryBuffer[] allocs, long[] testValues) { for (int j = 0; j < allocs.length; ++j) { LlapCacheableBuffer mem = (LlapCacheableBuffer)allocs[j]; - assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset)); - int halfLength = mem.length >> 1; - if (halfLength + 8 <= mem.length) { - assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset + halfLength)); + assertEquals(testValues[j], mem.byteBuffer.getLong(0)); + int halfLength = mem.byteBuffer.remaining() >> 1; + if (halfLength + 8 <= mem.byteBuffer.remaining()) { + assertEquals(testValues[j], mem.byteBuffer.getLong(halfLength)); } a.deallocate(mem); } Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (original) +++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Tue Jan 27 19:06:23 2015 @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.llap.cache; +import java.util.Iterator; +import java.util.LinkedList; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -25,16 +27,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.atomic.AtomicInteger; -import javax.management.RuntimeErrorException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; -import org.junit.Assume; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk; import org.junit.Test; + import static org.junit.Assert.*; public class TestLowLevelCacheImpl { @@ -83,22 +84,76 @@ public class TestLowLevelCacheImpl { String fn1 = "file1".intern(), fn2 = "file2".intern(); LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() }; verifyRefcount(fakes, 1, 1, 1, 1, 1, 1); - assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1))); - assertNull(cache.putFileData(fn2, new long[] { 1, 2 }, fbs(fakes, 2, 3))); - assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 })); - assertArrayEquals(fbs(fakes, 2, 3), cache.getFileData(fn2, new long[] { 1, 2 })); - assertArrayEquals(fbs(fakes, 1, -1), cache.getFileData(fn1, new long[] { 2, 3 })); + assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1))); + assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3))); + verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]); + verifyCacheGet(cache, fn2, 1, 3, fakes[2], fakes[3]); + verifyCacheGet(cache, fn1, 2, 4, fakes[1], dr(3, 4)); verifyRefcount(fakes, 2, 3, 2, 2, 1, 1); LlapMemoryBuffer[] bufsDiff = fbs(fakes, 4, 5); - long[] mask = cache.putFileData(fn1, new long[] { 3, 1 }, bufsDiff); + long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff); assertEquals(1, mask.length); assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache. assertSame(fakes[0], bufsDiff[1]); // Should have been replaced verifyRefcount(fakes, 3, 3, 2, 2, 1, 0); - assertArrayEquals(fbs(fakes, 0, 1, 4), cache.getFileData(fn1, new long[] { 1, 2, 3 })); + verifyCacheGet(cache, fn1, 1, 4, fakes[0], fakes[1], fakes[4]); verifyRefcount(fakes, 4, 4, 2, 2, 2, 0); } + private void verifyCacheGet(LowLevelCacheImpl cache, String fileName, Object... 
stuff) { + LinkedList input = new LinkedList(); + Iterator iter = null; + int intCount = 0, lastInt = -1; + int resultCount = stuff.length; + for (Object obj : stuff) { + if (obj instanceof Integer) { + --resultCount; + assertTrue(intCount >= 0); + if (intCount == 0) { + lastInt = (Integer)obj; + intCount = 1; + } else { + input.add(new DiskRange(lastInt, (Integer)obj)); + intCount = 0; + } + continue; + } else if (intCount >= 0) { + assertTrue(intCount == 0); + assertFalse(input.isEmpty()); + intCount = -1; + cache.getFileData(fileName, input); + assertEquals(resultCount, input.size()); + iter = input.iterator(); + } + assertTrue(iter.hasNext()); + DiskRange next = iter.next(); + if (obj instanceof LlapMemoryBuffer) { + assertTrue(next instanceof CacheChunk); + assertSame(obj, ((CacheChunk)next).buffer); + } else { + assertTrue(next.equals(obj)); + } + } + } + + @Test + public void testMultiMatch() { + Configuration conf = createConf(); + LowLevelCacheImpl cache = new LowLevelCacheImpl( + conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread + String fn = "file1".intern(); + LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb() }; + assertNull(cache.putFileData(fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes)); + verifyCacheGet(cache, fn, 1, 9, dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9)); + verifyCacheGet(cache, fn, 2, 8, fakes[0], dr(4, 6), fakes[1]); + verifyCacheGet(cache, fn, 1, 5, dr(1, 2), fakes[0], dr(4, 5)); + verifyCacheGet(cache, fn, 1, 3, dr(1, 2), fakes[0]); + verifyCacheGet(cache, fn, 3, 4, dr(3, 4)); // We don't expect cache requests from the middle. + verifyCacheGet(cache, fn, 3, 7, dr(3, 6), fakes[1]); + verifyCacheGet(cache, fn, 0, 2, 4, 6, dr(0, 2), dr(4, 6)); + verifyCacheGet(cache, fn, 2, 4, 6, 8, fakes[0], fakes[1]); + } + @Test public void testStaleValueGet() { Configuration conf = createConf(); @@ -106,15 +161,15 @@ public class TestLowLevelCacheImpl { conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread String fn1 = "file1".intern(), fn2 = "file2".intern(); LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() }; - assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1))); - assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 2))); - assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 })); - assertArrayEquals(fbs(fakes, 2), cache.getFileData(fn2, new long[] { 1 })); + assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1))); + assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2))); + verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]); + verifyCacheGet(cache, fn2, 1, 2, fakes[2]); verifyRefcount(fakes, 2, 2, 2); evict(cache, fakes[0]); evict(cache, fakes[2]); - assertArrayEquals(fbs(fakes, -1, 1), cache.getFileData(fn1, new long[] { 1, 2 })); - assertNull(cache.getFileData(fn2, new long[] { 1 })); + verifyCacheGet(cache, fn1, 1, 3, dr(1, 2), fakes[1]); + verifyCacheGet(cache, fn2, 1, 2, dr(1, 2)); verifyRefcount(fakes, -1, 3, -1); } @@ -126,15 +181,15 @@ public class TestLowLevelCacheImpl { String fn1 = "file1".intern(), fn2 = "file2".intern(); LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() }; - assertNull(cache.putFileData(fn1, new long[] { 1, 2, 3 }, fbs(fakes, 0, 1, 2))); - assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 3))); + assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2))); + assertNull(cache.putFileData(fn2, drs(1), 
fbs(fakes, 3))); evict(cache, fakes[0]); evict(cache, fakes[3]); - long[] mask = cache.putFileData(fn1, new long[] { 1, 2, 3, 4 }, fbs(fakes, 4, 5, 6, 7)); + long[] mask = cache.putFileData(fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7)); assertEquals(1, mask.length); assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't - assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 8))); - assertArrayEquals(fbs(fakes, 4, 2, 3, 7), cache.getFileData(fn1, new long[] { 1, 2, 3, 4 })); + assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8))); + verifyCacheGet(cache, fn1, 1, 5, fakes[4], fakes[1], fakes[2], fakes[7]); } @Test @@ -157,19 +212,23 @@ public class TestLowLevelCacheImpl { String fileName = isFn1 ? fn1 : fn2; int fileIndex = isFn1 ? 1 : 2; int count = rdm.nextInt(offsetsToUse); - long[] offsets = new long[count]; - for (int j = 0; j < offsets.length; ++j) { - offsets[j] = rdm.nextInt(offsetsToUse); + LinkedList input = new LinkedList(); + int[] offsets = new int[count]; + for (int j = 0; j < count; ++j) { + int next = rdm.nextInt(offsetsToUse); + input.add(dr(next, next + 1)); + offsets[j] = next; } if (isGet) { - LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets); - if (results == null) continue; - for (int j = 0; j < offsets.length; ++j) { - if (results[j] == null) continue; + cache.getFileData(fileName, input); + int j = -1; + for (DiskRange dr : input) { + ++j; + if (!(dr instanceof CacheChunk)) continue; ++gets; - LlapCacheableBuffer result = (LlapCacheableBuffer)(results[j]); + LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)dr).buffer; assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex); - result.decRef(); + cache.releaseBuffer(result); } } else { LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count]; @@ -179,7 +238,8 @@ public class TestLowLevelCacheImpl { buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]); buffers[j] = buf; } - long[] mask = cache.putFileData(fileName, offsets, buffers); + long[] mask = cache.putFileData( + fileName, input.toArray(new DiskRange[count]), buffers); puts += buffers.length; long maskVal = 0; if (mask != null) { @@ -192,7 +252,7 @@ public class TestLowLevelCacheImpl { assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex); } maskVal >>= 1; - buf.decRef(); + cache.releaseBuffer(buf); } } } @@ -210,24 +270,25 @@ public class TestLowLevelCacheImpl { FutureTask evictionTask = new FutureTask(new Callable() { public Integer call() { boolean isFirstFile = false; - long[] offsets = new long[offsetsToUse]; Random rdm = new Random(1234 + Thread.currentThread().getId()); - for (int i = 0; i < offsetsToUse; ++i) { - offsets[i] = i; - } + LinkedList input = new LinkedList(); + DiskRange allOffsets = new DiskRange(0, offsetsToUse + 1); int evictions = 0; syncThreadStart(cdlIn, cdlOut); while (rdmsDone.get() < 3) { + input.clear(); + input.add(allOffsets); isFirstFile = !isFirstFile; String fileName = isFirstFile ? 
fn1 : fn2; - LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets); - if (results == null) continue; - int startIndex = rdm.nextInt(results.length), index = startIndex; + cache.getFileData(fileName, input); + DiskRange[] results = input.toArray(new DiskRange[input.size()]); + int startIndex = rdm.nextInt(input.size()), index = startIndex; LlapCacheableBuffer victim = null; do { - if (results[index] != null) { - LlapCacheableBuffer result = (LlapCacheableBuffer)results[index]; - result.decRef(); + DiskRange r = results[index]; + if (r instanceof CacheChunk) { + LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)r).buffer; + cache.releaseBuffer(result); if (victim == null && result.invalidate()) { ++evictions; victim = result; @@ -305,6 +366,18 @@ public class TestLowLevelCacheImpl { return fake; } + private DiskRange dr(int from, int to) { + return new DiskRange(from, to); + } + + private DiskRange[] drs(int... offsets) { + DiskRange[] result = new DiskRange[offsets.length]; + for (int i = 0; i < offsets.length; ++i) { + result[i] = new DiskRange(offsets[i], offsets[i] + 1); + } + return result; + } + private Configuration createConf() { Configuration conf = new Configuration(); conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, 3); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Tue Jan 27 19:06:23 2015 @@ -20,28 +20,33 @@ package org.apache.hadoop.hive.ql.io.orc import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.DiskRange; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk; +import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; +import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; -abstract class InStream extends InputStream { +import com.google.common.annotations.VisibleForTesting; +abstract class InStream extends InputStream { private static final Log LOG = LogFactory.getLog(InStream.class); private static class UncompressedStream extends InStream { private final String name; - private final ByteBuffer[] bytes; - private final long[] offsets; + private final List bytes; private final long length; private long currentOffset; private ByteBuffer range; private int currentRange; - public UncompressedStream(String name, ByteBuffer[] input, long[] offsets, - long length) { + public UncompressedStream(String name, List input, long length) { this.name = name; this.bytes = input; - this.offsets = offsets; this.length = length; currentRange = 0; currentOffset = 0; @@ -83,12 +88,10 @@ abstract class InStream extends InputStr @Override public void close() { - currentRange = bytes.length; + currentRange = bytes.size(); currentOffset = length; // explicit de-ref of bytes[] - for(int i = 0; i < bytes.length; i++) { - bytes[i] = null; - } + bytes.clear(); } @Override @@ -97,14 +100,15 @@ abstract class InStream extends 
InputStr } public void seek(long desired) { - for(int i = 0; i < bytes.length; ++i) { - if (offsets[i] <= desired && - desired - offsets[i] < bytes[i].remaining()) { + for(int i = 0; i < bytes.size(); ++i) { + DiskRange curRange = bytes.get(i); + if (curRange.offset <= desired && + (desired - curRange.offset) < curRange.getLength()) { currentOffset = desired; currentRange = i; - this.range = bytes[i].duplicate(); + this.range = curRange.getData(); int pos = range.position(); - pos += (int)(desired - offsets[i]); // this is why we duplicate + pos += (int)(desired - curRange.offset); // this is why we duplicate this.range.position(pos); return; } @@ -122,50 +126,66 @@ abstract class InStream extends InputStr } private static class CompressedStream extends InStream { + private final String fileName; private final String name; - private final ByteBuffer[] bytes; - private final long[] offsets; + private final List bytes; private final int bufferSize; private final long length; + private LlapMemoryBuffer cacheBuffer; private ByteBuffer uncompressed; private final CompressionCodec codec; private ByteBuffer compressed; private long currentOffset; private int currentRange; private boolean isUncompressedOriginal; - private boolean isDirect = false; + private final LowLevelCache cache; + private final boolean doManageBuffers = true; - public CompressedStream(String name, ByteBuffer[] input, - long[] offsets, long length, - CompressionCodec codec, int bufferSize - ) { + public CompressedStream(String fileName, String name, List input, long length, + CompressionCodec codec, int bufferSize, LowLevelCache cache) { + this.fileName = fileName; this.bytes = input; this.name = name; this.codec = codec; this.length = length; - if(this.length > 0) { - isDirect = this.bytes[0].isDirect(); - } - this.offsets = offsets; this.bufferSize = bufferSize; currentOffset = 0; currentRange = 0; + this.cache = cache; } - // TODO: this should allocate from cache - private ByteBuffer allocateBuffer(int size) { + private ByteBuffer allocateBuffer(int size, boolean isDirect) { // TODO: use the same pool as the ORC readers - if(isDirect == true) { + if (isDirect) { return ByteBuffer.allocateDirect(size); } else { return ByteBuffer.allocate(size); } } + // TODO: This should not be used for main path. + private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1]; + private void allocateForUncompressed(int size, boolean isDirect) { + if (cache == null) { + cacheBuffer = null; + uncompressed = allocateBuffer(size, isDirect); + } else { + singleAllocDest[0] = null; + cache.allocateMultiple(singleAllocDest, size); + cacheBuffer = singleAllocDest[0]; + uncompressed = cacheBuffer.byteBuffer; + } + } + private void readHeader() throws IOException { if (compressed == null || compressed.remaining() <= 0) { seek(currentOffset); } + if (cacheBuffer != null) { + assert compressed == null; + return; // Next block is ready from cache. 
+ } + long originalOffset = currentOffset; if (compressed.remaining() > OutStream.HEADER_SIZE) { int b0 = compressed.get() & 0xff; int b1 = compressed.get() & 0xff; @@ -188,14 +208,19 @@ abstract class InStream extends InputStr isUncompressedOriginal = true; } else { if (isUncompressedOriginal) { - uncompressed = allocateBuffer(bufferSize); + allocateForUncompressed(bufferSize, slice.isDirect()); isUncompressedOriginal = false; } else if (uncompressed == null) { - uncompressed = allocateBuffer(bufferSize); + allocateForUncompressed(bufferSize, slice.isDirect()); } else { uncompressed.clear(); } codec.decompress(slice, uncompressed); + if (cache != null) { + // TODO: this is the inefficient path + cache.putFileData(fileName, new DiskRange[] { new DiskRange(originalOffset, + chunkLength + OutStream.HEADER_SIZE) }, new LlapMemoryBuffer[] { cacheBuffer }); + } } } else { throw new IllegalStateException("Can't read header at " + this); @@ -239,13 +264,20 @@ abstract class InStream extends InputStr @Override public void close() { + cacheBuffer = null; uncompressed = null; compressed = null; - currentRange = bytes.length; + currentRange = bytes.size(); currentOffset = length; - for(int i = 0; i < bytes.length; i++) { - bytes[i] = null; + if (doManageBuffers) { + // TODO: this is the inefficient path for now. LLAP will used this differently. + for (DiskRange range : bytes) { + if (range instanceof CacheChunk) { + cache.releaseBuffer(((CacheChunk)range).buffer); + } + } } + bytes.clear(); } @Override @@ -262,8 +294,8 @@ abstract class InStream extends InputStr } } - /* slices a read only contigous buffer of chunkLength */ - private ByteBuffer slice(int chunkLength) throws IOException { + /* slices a read only contiguous buffer of chunkLength */ + private ByteBuffer slice(int chunkLength) throws IOException { int len = chunkLength; final long oldOffset = currentOffset; ByteBuffer slice; @@ -274,7 +306,7 @@ abstract class InStream extends InputStr currentOffset += len; compressed.position(compressed.position() + len); return slice; - } else if (currentRange >= (bytes.length - 1)) { + } else if (currentRange >= (bytes.size() - 1)) { // nothing has been modified yet throw new IOException("EOF in " + this + " while trying to read " + chunkLength + " bytes"); @@ -288,16 +320,20 @@ abstract class InStream extends InputStr // we need to consolidate 2 or more buffers into 1 // first copy out compressed buffers - ByteBuffer copy = allocateBuffer(chunkLength); + ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); currentOffset += compressed.remaining(); len -= compressed.remaining(); copy.put(compressed); - while (len > 0 && (++currentRange) < bytes.length) { + while (len > 0 && (++currentRange) < bytes.size()) { if (LOG.isDebugEnabled()) { LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString())); } - compressed = bytes[currentRange].duplicate(); + DiskRange range = bytes.get(currentRange); + if (!(range instanceof BufferChunk)) { + throw new IOException("Trying to extend compressed block into uncompressed block"); + } + compressed = range.getData().duplicate(); if (compressed.remaining() >= len) { slice = compressed.slice(); slice.limit(len); @@ -318,40 +354,61 @@ abstract class InStream extends InputStr } private void seek(long desired) throws IOException { - for(int i = 0; i < bytes.length; ++i) { - if (offsets[i] <= desired && - desired - offsets[i] < bytes[i].remaining()) { + for(int i = 0; i < bytes.size(); ++i) { + DiskRange range = bytes.get(i); + if 
(range.offset <= desired && desired < range.end) { currentRange = i; - compressed = bytes[i].duplicate(); - int pos = compressed.position(); - pos += (int)(desired - offsets[i]); - compressed.position(pos); + if (range instanceof BufferChunk) { + cacheBuffer = null; + compressed = range.getData().duplicate(); + int pos = compressed.position(); + pos += (int)(desired - range.offset); + compressed.position(pos); + } else { + compressed = null; + cacheBuffer = ((CacheChunk)range).buffer; + uncompressed = cacheBuffer.byteBuffer.duplicate(); + if (desired != range.offset) { + throw new IOException("Cannot seek into the middle of uncompressed cached data"); + } + } currentOffset = desired; return; } } // if they are seeking to the precise end, go ahead and let them go there - int segments = bytes.length; - if (segments != 0 && - desired == offsets[segments - 1] + bytes[segments - 1].remaining()) { + int segments = bytes.size(); + if (segments != 0 && desired == bytes.get(segments - 1).end) { + DiskRange range = bytes.get(segments - 1); currentRange = segments - 1; - compressed = bytes[currentRange].duplicate(); - compressed.position(compressed.limit()); - currentOffset = desired; + if (range instanceof BufferChunk) { + cacheBuffer = null; + compressed = range.getData().duplicate(); + compressed.position(compressed.limit()); + } else { + compressed = null; + cacheBuffer = ((CacheChunk)range).buffer; + uncompressed = cacheBuffer.byteBuffer.duplicate(); + uncompressed.position(uncompressed.limit()); + if (desired != range.offset) { + throw new IOException("Cannot seek into the middle of uncompressed cached data"); + } + currentOffset = desired; + } return; } - throw new IOException("Seek outside of data in " + this + " to " + - desired); + throw new IOException("Seek outside of data in " + this + " to " + desired); } private String rangeString() { StringBuilder builder = new StringBuilder(); - for(int i=0; i < offsets.length; ++i) { + for(int i=0; i < bytes.size(); ++i) { if (i != 0) { builder.append("; "); } - builder.append(" range " + i + " = " + offsets[i] + " to " + - bytes[i].remaining()); + DiskRange range = bytes.get(i); + builder.append(" range " + i + " = " + range.offset + + " to " + (range.end - range.offset)); } return builder.toString(); } @@ -382,17 +439,43 @@ abstract class InStream extends InputStr * @return an input stream * @throws IOException */ + @VisibleForTesting + @Deprecated public static InStream create(String name, - ByteBuffer[] input, + ByteBuffer[] buffers, long[] offsets, long length, CompressionCodec codec, int bufferSize) throws IOException { + List input = new ArrayList(buffers.length); + for (int i = 0; i < buffers.length; ++i) { + input.add(new BufferChunk(buffers[i], offsets[i])); + } + return create(null, name, input, length, codec, bufferSize, null); + } + + /** + * Create an input stream from a list of disk ranges with data. + * @param name the name of the stream + * @param input the list of ranges of bytes for the stream; from disk or cache + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @param cache Low-level cache to use to put data, if any. Only works with compressed streams. 
+ * @return an input stream + * @throws IOException + */ + public static InStream create(String fileName, + String name, + List input, + long length, + CompressionCodec codec, + int bufferSize, + LowLevelCache cache) throws IOException { if (codec == null) { - return new UncompressedStream(name, input, offsets, length); + return new UncompressedStream(name, input, length); } else { - return new CompressedStream(name, input, offsets, length, codec, - bufferSize); + return new CompressedStream(fileName, name, input, length, codec, bufferSize, cache); } } } Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Jan 27 19:06:23 2015 @@ -33,12 +33,14 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem; import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Text; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -463,14 +465,14 @@ public class ReaderImpl implements Reade int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize; footerBuffer.limit(position + metadataSize); - InputStream instream = InStream.create("metadata", new ByteBuffer[]{footerBuffer}, - new long[]{0L}, metadataSize, codec, bufferSize); + InputStream instream = InStream.create(null, "metadata", Lists.newArrayList( + new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize, null); this.metadata = OrcProto.Metadata.parseFrom(instream); footerBuffer.position(position + metadataSize); footerBuffer.limit(position + metadataSize + footerBufferSize); - instream = InStream.create("footer", new ByteBuffer[]{footerBuffer}, - new long[]{0L}, footerBufferSize, codec, bufferSize); + instream = InStream.create(null, "footer", Lists.newArrayList( + new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize, null); this.footer = OrcProto.Footer.parseFrom(instream); footerBuffer.position(position); Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Jan 27 19:06:23 2015 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY; import java.io.EOFException; +import 
java.io.FileDescriptor; import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; @@ -29,7 +30,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.TreeMap; @@ -41,10 +44,12 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.io.api.EncodedColumn; +import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; @@ -79,12 +84,14 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Lists; public class RecordReaderImpl implements RecordReader { private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class); private static final boolean isLogTraceEnabled = LOG.isTraceEnabled(); + private final String fileName; private final FSDataInputStream file; private final long firstRow; protected final List stripes = @@ -102,16 +109,16 @@ public class RecordReaderImpl implements private long rowCountInStripe = 0; private final Map streams = new HashMap(); - List bufferChunks = new ArrayList(0); + List bufferChunks = new ArrayList(0); private final TreeReader reader; private final OrcProto.RowIndex[] indexes; private final SearchArgument sarg; // the leaf predicates for the sarg private final List sargLeaves; - // an array the same length as the sargLeaves that map them to column ids - private final int[] filterColumns; // an array about which row groups aren't skipped private boolean[] includedRowGroups = null; + // an array the same length as the sargLeaves that map them to column ids + private final int[] filterColumns; private final Configuration conf; private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool(); @@ -245,6 +252,7 @@ public class RecordReaderImpl implements long strideRate, Configuration conf ) throws IOException { + this.fileName = path.toString().intern(); // should we normalize this, like DFS would? 
this.file = fileSystem.open(path); this.codec = codec; this.types = types; @@ -2280,9 +2288,9 @@ public class RecordReaderImpl implements ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.seek(offset); file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.create("footer", - new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec, - bufferSize)); + return OrcProto.StripeFooter.parseFrom(InStream.create(null, "footer", + Lists.newArrayList(new BufferChunk(tailBuf, 0)), + tailLength, codec, bufferSize, null)); } static enum Location { @@ -2633,8 +2641,10 @@ public class RecordReaderImpl implements } if(bufferChunks != null) { if(zcr != null) { - for (BufferChunk bufChunk : bufferChunks) { - zcr.releaseBuffer(bufChunk.chunk); + for (DiskRange range : bufferChunks) { + if (range instanceof BufferChunk) { + zcr.releaseBuffer(((BufferChunk)range).chunk); + } } } bufferChunks.clear(); @@ -2691,68 +2701,76 @@ public class RecordReaderImpl implements ) throws IOException { long start = stripe.getIndexLength(); long end = start + stripe.getDataLength(); - // TODO: planning should be added here too, to take cache into account // explicitly trigger 1 big read - DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)}; - bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges)); + LinkedList rangesToRead = Lists.newLinkedList(); + rangesToRead.add(new DiskRange(start, end)); + if (this.cache != null) { + cache.getFileData(fileName, rangesToRead); + } + readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead); + bufferChunks = rangesToRead; List streamDescriptions = stripeFooter.getStreamsList(); - createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams); + createStreams( + streamDescriptions, bufferChunks, null, codec, bufferSize, streams, cache); // TODO: decompressed data from streams should be put in cache } /** - * The sections of stripe that we need to read. + * The sections of stripe that we have read. + * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries. */ - static class DiskRange { - /** the first address we need to read. */ - long offset; - /** the first address afterwards. */ - long end; + public static class BufferChunk extends DiskRange { + final ByteBuffer chunk; - DiskRange(long offset, long end) { - this.offset = offset; - this.end = end; - if (end < offset) { - throw new IllegalArgumentException("invalid range " + this); - } + BufferChunk(ByteBuffer chunk, long offset) { + super(offset, offset + chunk.remaining()); + this.chunk = chunk; } @Override - public boolean equals(Object other) { - if (other == null || other.getClass() != getClass()) { - return false; - } - DiskRange otherR = (DiskRange) other; - return otherR.offset == offset && otherR.end == end; + public boolean hasData() { + return chunk != null; } @Override - public String toString() { - return "range start: " + offset + " end: " + end; + public final String toString() { + return "range start: " + offset + " size: " + chunk.remaining() + " type: " + + (chunk.isDirect() ? 
"direct" : "array-backed"); + } + + @Override + public DiskRange slice(long offset, long end) { + assert offset <= end && offset >= this.offset && end <= this.end; + ByteBuffer sliceBuf = chunk.slice(); + int newPos = (int)(offset - this.offset); + int newLen = chunk.limit() - chunk.position() - (int)(this.end - end); + sliceBuf.position(newPos); + sliceBuf.limit(newPos + newLen); + return new BufferChunk(sliceBuf, offset); + } + + @Override + public ByteBuffer getData() { + return chunk; } } - /** - * The sections of stripe that we have read. - * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries. - */ - static class BufferChunk { - final ByteBuffer chunk; - /** the first address we need to read. */ - final long offset; - /** end of the buffer **/ - final long end; + public static class CacheChunk extends DiskRange { + public final LlapMemoryBuffer buffer; - BufferChunk(ByteBuffer chunk, long offset) { - this.offset = offset; - this.chunk = chunk; - end = offset + chunk.remaining(); + public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) { + super(offset, end); + this.buffer = buffer; } @Override - public final String toString() { - return "range start: " + offset + " size: " + chunk.remaining() + " type: " - + (chunk.isDirect() ? "direct" : "array-backed"); + public boolean hasData() { + return buffer != null; + } + + @Override + public ByteBuffer getData() { + return buffer.byteBuffer; } } @@ -2860,7 +2878,7 @@ public class RecordReaderImpl implements * @param compressionSize the compression block size * @return the list of disk ranges that will be loaded */ - static List planReadPartialDataStreams + static LinkedList planReadPartialDataStreams (List streamList, OrcProto.RowIndex[] indexes, boolean[] includedColumns, @@ -2869,7 +2887,7 @@ public class RecordReaderImpl implements List encodings, List types, int compressionSize) { - List result = new ArrayList(); + LinkedList result = new LinkedList(); long offset = 0; // figure out which columns have a present stream boolean[] hasNull = new boolean[types.size()]; @@ -2889,6 +2907,7 @@ public class RecordReaderImpl implements isDictionary(streamKind, encodings.get(column))) { result.add(new DiskRange(offset, offset + length)); } else { + DiskRange lastRange = null; for(int group=0; group < includedRowGroups.length; ++group) { if (includedRowGroups[group]) { int posn = getIndexPosition(encodings.get(column).getKind(), @@ -2901,7 +2920,6 @@ public class RecordReaderImpl implements } else { nextGroupOffset = length; } - // figure out the worst case last location // if adjacent groups have the same compressed block offset then stretch the slop @@ -2911,7 +2929,15 @@ public class RecordReaderImpl implements : WORST_UNCOMPRESSED_SLOP; long end = (group == includedRowGroups.length - 1) ? 
length : Math.min(length, nextGroupOffset + slop); - result.add(new DiskRange(offset + start, offset + end)); + start += offset; + end += offset; + if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) { + lastRange.offset = Math.min(lastRange.offset, start); + lastRange.end = Math.max(lastRange.end, end); + } else { + lastRange = new DiskRange(start, end); + result.add(lastRange); + } } } } @@ -2951,19 +2977,23 @@ public class RecordReaderImpl implements * ranges * @throws IOException */ - static List readDiskRanges(FSDataInputStream file, + static void readDiskRanges(FSDataInputStream file, ZeroCopyReaderShim zcr, long base, - List ranges) throws IOException { - ArrayList result = new ArrayList(ranges.size()); - for(DiskRange range: ranges) { + LinkedList ranges) throws IOException { + ListIterator rangeIter = ranges.listIterator(); + while (rangeIter.hasNext()) { + DiskRange range = rangeIter.next(); + if (range.hasData()) continue; + rangeIter.remove(); + rangeIter.previous(); // TODO: is this valid on single-element list? int len = (int) (range.end - range.offset); long off = range.offset; file.seek(base + off); if(zcr != null) { while(len > 0) { ByteBuffer partial = zcr.readBuffer(len, false); - result.add(new BufferChunk(partial, off)); + ranges.add(new BufferChunk(partial, off)); int read = partial.remaining(); len -= read; off += read; @@ -2971,10 +3001,9 @@ public class RecordReaderImpl implements } else { byte[] buffer = new byte[len]; file.readFully(buffer, 0, buffer.length); - result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset)); + ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset)); } } - return result; } /** @@ -3010,78 +3039,79 @@ public class RecordReaderImpl implements return buffer.toString(); } - static void createStreams(List streamDescriptions, - List ranges, + void createStreams(List streamDescriptions, + List ranges, boolean[] includeColumn, CompressionCodec codec, int bufferSize, - Map streams - ) throws IOException { - long offset = 0; + Map streams, + LowLevelCache cache) throws IOException { + long streamOffset = 0; for(OrcProto.Stream streamDesc: streamDescriptions) { int column = streamDesc.getColumn(); - if ((includeColumn == null || includeColumn[column]) && - StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) { - long length = streamDesc.getLength(); - int first = -1; - int last = -2; - for(int i=0; i < ranges.size(); ++i) { - BufferChunk range = ranges.get(i); - if (overlap(offset, offset+length, range.offset, range.end)) { - if (first == -1) { - first = i; - } - last = i; - } - } - ByteBuffer[] buffers = new ByteBuffer[last - first + 1]; - long[] offsets = new long[last - first + 1]; - for(int i=0; i < buffers.length; ++i) { - BufferChunk range = ranges.get(i + first); - long start = Math.max(range.offset, offset); - long end = Math.min(range.end, offset+length); - buffers[i] = range.chunk.slice(); - assert range.chunk.position() == 0; // otherwise we'll mix up positions - /* - * buffers are positioned in-wards if the offset > range.offset - * offsets[i] == range.offset - offset, except if offset > range.offset - */ - if(offset > range.offset) { - buffers[i].position((int)(offset - range.offset)); - buffers[i].limit((int)(end - range.offset)); - offsets[i] = 0; - } else { - buffers[i].position(0); - buffers[i].limit((int)(end - range.offset)); - offsets[i] = (range.offset - offset); - } + if ((includeColumn != null && !includeColumn[column]) || + StreamName.getArea(streamDesc.getKind()) 
!= StreamName.Area.DATA) { + streamOffset += streamDesc.getLength(); + continue; + } + long streamEnd = streamOffset + streamDesc.getLength(); + // TODO: This assumes sorted ranges (as do many other parts of ORC code. + ArrayList buffers = new ArrayList(); + boolean inRange = false; + for (int i = 0; i < ranges.size(); ++i) { + DiskRange range = ranges.get(i); + boolean isLast = range.end >= streamEnd; + if (!inRange) { + if (range.end >= streamOffset) continue; // Skip until we are in range. + inRange = true; + if (range.offset < streamOffset) { + // Partial first buffer, add a slice of it. + buffers.add(range.slice(Math.max(range.offset, streamOffset), + Math.min(streamEnd, range.end))); + if (isLast) break; // Partial first buffer is also partial last buffer. + continue; + } + } + if (range.end > streamEnd) { + // Partial last buffer (may also be the first buffer), add a slice of it. + buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end))); + break; } - StreamName name = new StreamName(column, streamDesc.getKind()); - streams.put(name, InStream.create(name.toString(), buffers, offsets, - length, codec, bufferSize)); + buffers.add(range); // Normal buffer. } - offset += streamDesc.getLength(); + StreamName name = new StreamName(column, streamDesc.getKind()); + streams.put(name, InStream.create(fileName, name.toString(), buffers, + streamEnd - streamOffset, codec, bufferSize, cache)); + streamOffset += streamDesc.getLength(); } } + private LowLevelCache cache = null; + public void setCache(LowLevelCache cache) { + this.cache = cache; + } + private void readPartialDataStreams(StripeInformation stripe ) throws IOException { List streamList = stripeFooter.getStreamsList(); - // TODO: planning should take cache into account - List chunks = + LinkedList rangesToRead = planReadPartialDataStreams(streamList, indexes, included, includedRowGroups, codec != null, stripeFooter.getColumnsList(), types, bufferSize); if (LOG.isDebugEnabled()) { - LOG.debug("chunks = " + stringifyDiskRanges(chunks)); + LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead)); } - mergeDiskRanges(chunks); + mergeDiskRanges(rangesToRead); + if (this.cache != null) { + cache.getFileData(fileName, rangesToRead); + } + readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead); + bufferChunks = rangesToRead; if (LOG.isDebugEnabled()) { - LOG.debug("merge = " + stringifyDiskRanges(chunks)); + LOG.debug("merge = " + stringifyDiskRanges(rangesToRead)); } - bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks); - // TODO: decompressed data from streams should be put in cache - createStreams(streamList, bufferChunks, included, codec, bufferSize, streams); + + createStreams(streamList, bufferChunks, included, codec, bufferSize, streams, cache); } @Override @@ -3272,9 +3302,9 @@ public class RecordReaderImpl implements byte[] buffer = new byte[(int) stream.getLength()]; file.seek(offset); file.readFully(buffer); - indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index", - new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0}, - stream.getLength(), codec, bufferSize)); + indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index", + Lists.newArrayList(new BufferChunk(ByteBuffer.wrap(buffer), 0)), + stream.getLength(), codec, bufferSize, null)); } } offset += stream.getLength(); Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java URL: 
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff ============================================================================== --- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original) +++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Tue Jan 27 19:06:23 2015 @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; @@ -769,18 +770,18 @@ public class TestRecordReaderImpl { assertTrue(!RecordReaderImpl.overlap(0, 10, 11, 12)); } - private static List diskRanges(Integer... points) { - List result = - new ArrayList(); + private static List diskRanges(Integer... points) { + List result = + new ArrayList(); for(int i=0; i < points.length; i += 2) { - result.add(new RecordReaderImpl.DiskRange(points[i], points[i+1])); + result.add(new DiskRange(points[i], points[i+1])); } return result; } @Test public void testMergeDiskRanges() throws Exception { - List list = diskRanges(); + List list = diskRanges(); RecordReaderImpl.mergeDiskRanges(list); assertThat(list, is(diskRanges())); list = diskRanges(100, 200, 300, 400, 500, 600); @@ -860,7 +861,7 @@ public class TestRecordReaderImpl { @Test public void testPartialPlan() throws Exception { - List result; + List result; // set the streams List streams = new ArrayList(); @@ -968,7 +969,7 @@ public class TestRecordReaderImpl { @Test public void testPartialPlanCompressed() throws Exception { - List result; + List result; // set the streams List streams = new ArrayList(); @@ -1050,7 +1051,7 @@ public class TestRecordReaderImpl { @Test public void testPartialPlanString() throws Exception { - List result; + List result; // set the streams List streams = new ArrayList();
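The planning change above coalesces row-group reads on the fly: when the next range overlaps the previous one it stretches lastRange instead of adding a new entry, and mergeDiskRanges (exercised by testMergeDiskRanges) applies the same idea to a whole list. A small self-contained sketch of that overlap-and-extend step follows; the class and method names are illustrative, and the touch-inclusive overlap test does not reproduce the exact boundary handling of RecordReaderImpl.overlap.

import java.util.Iterator;
import java.util.LinkedList;

class RangeMergeSketch {
  static class Range {
    long offset, end;
    Range(long offset, long end) { this.offset = offset; this.end = end; }
    @Override public String toString() { return "[" + offset + ", " + end + ")"; }
  }

  // Half-open ranges that overlap or touch; boundary handling simplified.
  static boolean overlap(long aOffset, long aEnd, long bOffset, long bEnd) {
    return aOffset <= bEnd && bOffset <= aEnd;
  }

  // One pass over a list sorted by offset: stretch the previous range
  // instead of adding a new, overlapping one.
  static void mergeRanges(LinkedList<Range> ranges) {
    Iterator<Range> it = ranges.iterator();
    Range last = it.hasNext() ? it.next() : null;
    while (it.hasNext()) {
      Range cur = it.next();
      if (overlap(last.offset, last.end, cur.offset, cur.end)) {
        last.offset = Math.min(last.offset, cur.offset);
        last.end = Math.max(last.end, cur.end);
        it.remove();                               // drop the now-redundant range
      } else {
        last = cur;                                // disjoint: start a new request
      }
    }
  }

  public static void main(String[] args) {
    LinkedList<Range> list = new LinkedList<>();
    list.add(new Range(100, 200));
    list.add(new Range(150, 300));
    list.add(new Range(400, 500));
    mergeRanges(list);
    System.out.println(list);                      // prints [[100, 300), [400, 500)]
  }
}

A single pass suffices as long as the input is sorted by offset, which is the assumption the patch itself documents for the range lists it plans.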