hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1662163 - in /hive/branches/llap: data/conf/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/
Date Wed, 25 Feb 2015 02:48:43 GMT
Author: sershe
Date: Wed Feb 25 02:48:42 2015
New Revision: 1662163

URL: http://svn.apache.org/r1662163
Log:
Potential fix for evict issue; revert if it breaks things

Modified:
    hive/branches/llap/data/conf/hive-site.xml
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java

Modified: hive/branches/llap/data/conf/hive-site.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/data/conf/hive-site.xml?rev=1662163&r1=1662162&r2=1662163&view=diff
==============================================================================
--- hive/branches/llap/data/conf/hive-site.xml (original)
+++ hive/branches/llap/data/conf/hive-site.xml Wed Feb 25 02:48:42 2015
@@ -253,17 +253,17 @@
 
 <property>
   <name>hive.llap.io.cache.orc.size</name>
-  <value>67108864</value>
+  <value>2097152</value>
 </property>
 
 <property>
   <name>hive.llap.io.cache.orc.arena.size</name>
-  <value>16777216</value>
+  <value>2097152</value>
 </property>
 
 <property>
-  <name>hive.llap.io.cache.orc.arena.size</name>
-  <value>16777216</value>
+  <name>hive.llap.io.cache.orc.alloc.max</name>
+  <value>2097152</value>
 </property>
 
 

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1662163&r1=1662162&r2=1662163&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java Wed Feb 25 02:48:42 2015
@@ -234,6 +234,7 @@ public final class BuddyAllocator implem
       FreeList freeList = freeLists[freeListIx];
       int remaining = -1;
       freeList.lock.lock();
+      // TODO: write some comments for this method
       try {
         ix = allocateFromFreeListUnderLock(
             arenaIx, freeList, freeListIx, dest, ix, allocationSize);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1662163&r1=1662162&r2=1662163&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Wed Feb 25 02:48:42 2015
@@ -79,6 +79,9 @@ public final class LlapCacheableBuffer e
       newRefCount = oldRefCount + 1;
       if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
     }
+    if (DebugUtils.isTraceLockingEnabled()) {
+      LlapIoImpl.LOG.info("Locked " + this + "; new ref count " + newRefCount);
+    }
     return newRefCount;
   }
 
@@ -94,6 +97,9 @@ public final class LlapCacheableBuffer e
 
   int decRef() {
     int newRefCount = refCount.decrementAndGet();
+    if (DebugUtils.isTraceLockingEnabled()) {
+      LlapIoImpl.LOG.info("Unlocked " + this + "; refcount " + newRefCount);
+    }
     if (newRefCount < 0) {
       throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
     }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1662163&r1=1662162&r2=1662163&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Wed Feb 25 02:48:42 2015
@@ -109,6 +109,10 @@ public class LowLevelCacheImpl implement
       Map.Entry<Long, LlapCacheableBuffer> e = matches.next();
       LlapCacheableBuffer buffer = e.getValue();
       // Lock the buffer, validate it and add to results.
+      if (DebugUtils.isTraceLockingEnabled()) {
+        LlapIoImpl.LOG.info("Locking " + buffer + " during get");
+      }
+
       if (!lockBuffer(buffer)) {
         // If we cannot lock, remove this from cache and continue.
         matches.remove();
@@ -182,6 +186,9 @@ public class LowLevelCacheImpl implement
     try {
       for (int i = 0; i < ranges.length; ++i) {
         LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
+        if (DebugUtils.isTraceLockingEnabled()) {
+          LlapIoImpl.LOG.info("Locking " + buffer + " at put time");
+        }
         buffer.incRef();
         long offset = ranges[i].offset + baseOffset;
         buffer.declaredLength = ranges[i].getLength();
@@ -197,6 +204,9 @@ public class LowLevelCacheImpl implement
                 + fileName + "@" + offset  + " (base " + baseOffset + "); old " + oldVal
                 + ", new " + buffer);
           }
+          if (DebugUtils.isTraceLockingEnabled()) {
+            LlapIoImpl.LOG.info("Locking " + oldVal + "  due to cache collision");
+          }
           if (lockBuffer(oldVal)) {
             // We don't do proper overlap checking because it would cost cycles and we
             // think it will never happen. We do perform the most basic check here.
@@ -206,6 +216,10 @@ public class LowLevelCacheImpl implement
                   + " (base " + baseOffset + ")");
             }
             // We found an old, valid block for this key in the cache.
+            if (DebugUtils.isTraceLockingEnabled()) {
+              LlapIoImpl.LOG.info("Unlocking " + buffer + " due to cache collision with "
+                  + oldVal);
+            }
+
             releaseBufferInternal(buffer);
             buffers[i] = oldVal;
             if (result == null) {

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1662163&r1=1662162&r2=1662163&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Wed Feb 25 02:48:42 2015
@@ -31,8 +31,10 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
@@ -416,6 +418,11 @@ public class OrcEncodedDataProducer impl
 
     @Override
     public void returnData(StreamBuffer data) {
+      if (DebugUtils.isTraceLockingEnabled()) {
+        for (LlapMemoryBuffer buf : data.cacheBuffers) {
+          LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
+        }
+      }
       lowLevelCache.releaseBuffers(data.cacheBuffers);
     }
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java?rev=1662163&r1=1662162&r2=1662163&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java Wed Feb 25 02:48:42 2015
@@ -18,16 +18,13 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.ListIterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.DiskRangeList;
 import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
 import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
@@ -35,6 +32,7 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
@@ -310,10 +308,16 @@ public class EncodedReaderImpl implement
       }
     }
 
+    // TODO: this is not good; we hold all the blocks until we send them all.
+    //       Hard to avoid due to sharing by RGs... perhaps we can still do better.
     DiskRangeList toFree = toRead.next;
     while (toFree != null) {
       if (toFree instanceof CacheChunk) {
-        cache.releaseBuffer(((CacheChunk)toFree).buffer);
+        LlapMemoryBuffer buffer = ((CacheChunk)toFree).buffer;
+        if (DebugUtils.isTraceLockingEnabled()) {
+          LOG.info("Unlocking " + buffer + " at the end of readEncodedColumns");
+        }
+        cache.releaseBuffer(buffer);
       }
       toFree = toFree.next;
     }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1662163&r1=1662162&r2=1662163&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Wed Feb 25 02:48:42 2015
@@ -468,7 +468,7 @@ public abstract class InStream extends I
   public abstract void seek(PositionProvider index) throws IOException;
 
   private static void logEmptySeek(String name) {
-    if (LOGL.isWarnEnabled()) {
+    if (LOG.isWarnEnabled()) {
       LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
     }
   }
@@ -592,6 +592,9 @@ public abstract class InStream extends I
       if (current instanceof CacheChunk) {
         // 2a. This is a cached compression buffer, add as is.
         CacheChunk cc = (CacheChunk)current;
+        if (DebugUtils.isTraceLockingEnabled()) {
+          LOG.info("Locking " + cc.buffer + " due to reuse");
+        }
         cache.notifyReused(cc.buffer);
         streamBuffer.cacheBuffers.add(cc.buffer);
         currentCOffset = cc.end;
@@ -645,14 +648,21 @@ public abstract class InStream extends I
 
     // 4. Now decompress (or copy) the data into cache buffers.
     for (ProcCacheChunk chunk : toDecompress) {
-      int startPos = chunk.buffer.byteBuffer.position();
+      ByteBuffer dest = chunk.buffer.byteBuffer;
+      // After the below, position and limit will be screwed up (differently for if/else).
+      // We will reset the position and limit for now.
+      int startPos = dest.position(), startLim = dest.limit();
       if (chunk.isCompressed) {
-        codec.decompress(chunk.originalData, chunk.buffer.byteBuffer);
+        codec.decompress(chunk.originalData, dest);
       } else {
-        chunk.buffer.byteBuffer.put(chunk.originalData); // Copy uncompressed data to cache.
+        dest.put(chunk.originalData); // Copy uncompressed data to cache.
       }
-      chunk.buffer.byteBuffer.position(startPos);
+      dest.position(startPos);
+      dest.limit(startLim);
       chunk.originalData = null;
+      if (DebugUtils.isTraceLockingEnabled()) {
+        LOG.info("Locking " + chunk.buffer + " due to reuse (after decompression)");
+      }
       cache.notifyReused(chunk.buffer);
     }
 
@@ -765,9 +775,6 @@ public abstract class InStream extends I
         copy.put(slice);
         next = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset, cbEndOffset,
             remaining, remaining, (BufferChunk)next, cache, toDecompress, cacheBuffers);
-        if (DebugUtils.isTraceOrcEnabled()) {
-          LOG.info("Adjusting " + next + " to consume " + remaining);
-        }
         if (compressed.remaining() <= 0 && zcr != null) {
           zcr.releaseBuffer(compressed); // We copied the entire buffer.
         }



Mime
View raw message