hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1662604 - in /hive/branches/llap: llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
Date Fri, 27 Feb 2015 00:56:22 GMT
Author: sershe
Date: Fri Feb 27 00:56:22 2015
New Revision: 1662604

URL: http://svn.apache.org/r1662604
Log:
Fix an issue with cache collisions

Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1662604&r1=1662603&r2=1662604&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Fri Feb 27 00:56:22 2015
@@ -274,7 +274,6 @@ public class LowLevelCacheImpl implement
     return ((number + 63) & ~63);
   }
 
-
   @Override
   public void releaseBuffer(LlapMemoryBuffer buffer) {
     releaseBufferInternal((LlapCacheableBuffer)buffer);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java?rev=1662604&r1=1662603&r2=1662604&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java Fri Feb 27 00:56:22 2015
@@ -219,13 +219,14 @@ public class EncodedReaderImpl implement
     // 2. Now, read all of the ranges from cache or disk.
     DiskRangeListMutateHelper toRead = new DiskRangeListMutateHelper(listToRead.get());
     if (DebugUtils.isTraceOrcEnabled()) {
-      LOG.info("Resulting disk ranges to read: " + RecordReaderUtils.stringifyDiskRanges(toRead));
+      LOG.info("Resulting disk ranges to read: "
+          + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
     if (cache != null) {
       cache.getFileData(fileName, toRead.next, stripeOffset);
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Disk ranges after cache (base offset " + stripeOffset
-            + "): " + RecordReaderUtils.stringifyDiskRanges(toRead));
+            + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
       }
     }
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1662604&r1=1662603&r2=1662604&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Fri Feb 27 00:56:22 2015
@@ -251,9 +251,9 @@ public abstract class InStream extends I
           codec.decompress(slice, uncompressed);
           if (cache != null) {
             // TODO: this is the inefficient path
-            // TODO#: this is invalid; base stripe offset should be passed.
-            cache.putFileData(fileName, new DiskRange[] { new DiskRange(originalOffset,
-                chunkLength + OutStream.HEADER_SIZE) }, new LlapMemoryBuffer[] { cacheBuffer }, 0);
+            // TODO#: this is invalid; base stripe offset should be passed, return value handled.
+            //cache.putFileData(fileName, new DiskRange[] { new DiskRange(originalOffset,
+            //  chunkLength + OutStream.HEADER_SIZE) }, new LlapMemoryBuffer[] { cacheBuffer }, 0);
           }
         }
       } else {
@@ -675,10 +675,36 @@ public abstract class InStream extends I
     }
 
     // 6. Finally, put data to cache.
-    cache.putFileData(fileName, cacheKeys, targetBuffers, baseOffset);
+    long[] collisionMask = cache.putFileData(fileName, cacheKeys, targetBuffers, baseOffset);
+    processCacheCollisions(cache, collisionMask, toDecompress, targetBuffers);
     return lastCached;
   }
 
+  private static void processCacheCollisions(LowLevelCache cache, long[] collisionMask,
+      List<ProcCacheChunk> toDecompress, LlapMemoryBuffer[] targetBuffers) {
+    if (collisionMask == null) return;
+    assert collisionMask.length >= (toDecompress.size() >>> 6);
+    // There are some elements that were cached in parallel, take care of them.
+    long maskVal = -1;
+    for (int i = 0; i < toDecompress.size(); ++i) {
+      if ((i & 63) == 0) {
+        maskVal = collisionMask[i >>> 6];
+      }
+      if ((maskVal & 1) == 1) {
+        // Cache has found an old buffer for the key and put it into array. Had the put succeeded
+        // for our new buffer, it would have refcount of 2 - 1 from put, and 1 from notifyReused
+        // call above. "Old" buffer now has refcount of 1 from put; new buffer is unchanged. We
+        // will discard the new buffer, and lock old again to make it consistent.
+        ProcCacheChunk replacedChunk = toDecompress.get(i);
+        LlapMemoryBuffer replacementBuffer = targetBuffers[i];
+        cache.releaseBuffer(replacedChunk.buffer);
+        cache.notifyReused(replacementBuffer);
+        replacedChunk.buffer = replacementBuffer;
+      }
+      maskVal >>= 1;
+    }
+  }
+
 
   /** Finds compressed offset in a stream and makes sure iter points to its position.
      This may be necessary for obscure combinations of compression and encoding boundaries.
*/



Mime
View raw message