hive-commits mailing list archives

From ser...@apache.org
Subject [1/3] hive git commit: HIVE-15143 : add logging for HIVE-15024 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date Thu, 10 Nov 2016 20:21:56 GMT
Repository: hive
Updated Branches:
  refs/heads/master 85711700e -> 89efd238e


HIVE-15143 : add logging for HIVE-15024 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7186947f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7186947f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7186947f

Branch: refs/heads/master
Commit: 7186947f22c6a9611301e82c45f0ba62780326bc
Parents: f4598fb
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Thu Nov 10 12:10:28 2016 -0800
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Thu Nov 10 12:21:30 2016 -0800

----------------------------------------------------------------------
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 128 +++++++++++--------
 1 file changed, 75 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
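In outline, the patch wraps each per-stream read in a try/catch that logs the
failing stream's coordinates (kind, encoding, column, RG, offset, length) and
the pending disk ranges, then rethrows as an IOException. A minimal,
self-contained sketch of that catch-log-rethrow pattern follows; every name in
it (readStream, decode, the offset parameter) is an illustrative stand-in, not
a Hive API.

    import java.io.IOException;

    // Sketch of the catch-log-rethrow pattern; all names are stand-ins.
    public class CatchLogRethrowSketch {
      public static void main(String[] args) {
        try {
          readStream(350L); // stand-in for one per-stream read
        } catch (IOException e) {
          System.err.println("caller sees a plain IOException: " + e);
        }
      }

      static void readStream(long offset) throws IOException {
        try {
          decode(offset); // may throw any unchecked exception
        } catch (Exception ex) {
          // Log enough context to identify the failing stream (the real patch
          // logs kind, encoding, column, RG, offsets and the disk ranges),
          // then normalize everything to IOException for the caller.
          System.err.println("Error getting stream at offset " + offset + ": " + ex);
          throw (ex instanceof IOException) ? (IOException) ex : new IOException(ex);
        }
      }

      static void decode(long offset) {
        throw new IllegalStateException("simulated corrupt data at " + offset);
      }
    }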


http://git-wip-us.apache.org/repos/asf/hive/blob/7186947f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index bcb54d6..a4925b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -350,58 +350,66 @@ class EncodedReaderImpl implements EncodedReader {
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           StreamContext sctx = ctx.streams[streamIx];
           ColumnStreamData cb = null;
-          if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
-            // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
-            if (isTracingEnabled) {
-              LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
-                  + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
-            }
-            if (sctx.stripeLevelStream == null) {
-              sctx.stripeLevelStream = POOLS.csdPool.take();
-              // We will be using this for each RG while also sending RGs to processing.
-              // To avoid buffers being unlocked, run refcount one ahead; we will not increase
-              // it when building the last RG, so each RG processing will decref once, and the
-              // last one will unlock the buffers.
-              sctx.stripeLevelStream.incRef();
-              // For stripe-level streams we don't need the extra refcount on the block.
-              // See class comment about refcounts.
-              long unlockUntilCOffset = sctx.offset + sctx.length;
-              DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
-                  sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
+          try {
+            if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
+              // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
+              if (isTracingEnabled) {
+                LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+                    + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
+              }
+              if (sctx.stripeLevelStream == null) {
+                sctx.stripeLevelStream = POOLS.csdPool.take();
+                // We will be using this for each RG while also sending RGs to processing.
+                // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+                // it when building the last RG, so each RG processing will decref once, and the
+                // last one will unlock the buffers.
+                sctx.stripeLevelStream.incRef();
+                // For stripe-level streams we don't need the extra refcount on the block.
+                // See class comment about refcounts.
+                long unlockUntilCOffset = sctx.offset + sctx.length;
+                DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
+                    sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
+                    unlockUntilCOffset, sctx.offset);
+                if (lastCached != null) {
+                  iter = lastCached;
+                }
+              }
+              if (!isLastRg) {
+                sctx.stripeLevelStream.incRef();
+              }
+              cb = sctx.stripeLevelStream;
+            } else {
+              // This stream can be separated by RG using index. Let's do that.
+              // Offset to where this RG begins.
+              long cOffset = sctx.offset + index.getPositions(sctx.streamIndexOffset);
+              // Offset relative to the beginning of the stream of where this RG ends.
+              long nextCOffsetRel = isLastRg ? sctx.length
+                  : nextIndex.getPositions(sctx.streamIndexOffset);
+              // Offset before which this RG is guaranteed to end. Can only be estimated.
+              // We estimate the same way for compressed and uncompressed for now.
+              long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset(
+                  isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
+              // As we read, we can unlock initial refcounts for the buffers that end before
+              // the data that we need for this RG.
+              long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
+              cb = createRgColumnStreamData(
+                  rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
+              boolean isStartOfStream = sctx.bufferIter == null;
+              DiskRangeList lastCached = readEncodedStream(stripeOffset,
+                  (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
                   unlockUntilCOffset, sctx.offset);
               if (lastCached != null) {
-                iter = lastCached;
+                sctx.bufferIter = iter = lastCached;
               }
             }
-            if (!isLastRg) {
-              sctx.stripeLevelStream.incRef();
-            }
-            cb = sctx.stripeLevelStream;
-          } else {
-            // This stream can be separated by RG using index. Let's do that.
-            // Offset to where this RG begins.
-            long cOffset = sctx.offset + index.getPositions(sctx.streamIndexOffset);
-            // Offset relative to the beginning of the stream of where this RG ends.
-            long nextCOffsetRel = isLastRg ? sctx.length
-                : nextIndex.getPositions(sctx.streamIndexOffset);
-            // Offset before which this RG is guaranteed to end. Can only be estimated.
-            // We estimate the same way for compressed and uncompressed for now.
-            long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset(
-                isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
-            // As we read, we can unlock initial refcounts for the buffers that end before
-            // the data that we need for this RG.
-            long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
-            cb = createRgColumnStreamData(
-                rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
-            boolean isStartOfStream = sctx.bufferIter == null;
-            DiskRangeList lastCached = readEncodedStream(stripeOffset,
-                (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
-                unlockUntilCOffset, sctx.offset);
-            if (lastCached != null) {
-              sctx.bufferIter = iter = lastCached;
-            }
+            ecb.setStreamData(colIxMod, sctx.kind.getNumber(), cb);
+          } catch (Exception ex) {
+            DiskRangeList drl = toRead == null ? null : toRead.next;
+            LOG.error("Error getting stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+                + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", "
+                + sctx.length + "; toRead " + RecordReaderUtils.stringifyDiskRanges(drl), ex);
+            throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
           }
-          ecb.setStreamData(colIxMod, sctx.kind.getNumber(), cb);
         }
       }
       if (isRGSelected) {
@@ -612,11 +620,20 @@ class EncodedReaderImpl implements EncodedReader {
     CacheChunk lastUncompressed = null;
 
     // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
-    lastUncompressed = isCompressed ?
-        prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
-            unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
-      : prepareRangesForUncompressedRead(
-          cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
+    try {
+      lastUncompressed = isCompressed ?
+          prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
+              unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
+        : prepareRangesForUncompressedRead(
+            cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
+    } catch (Exception ex) {
+      LOG.error("Failed " + (isCompressed ? "" : "un") + "compressed read; cOffset " + cOffset
+          + ", endCOffset " + endCOffset + ", streamOffset " + streamOffset
+          + ", unlockUntilCOffset " + unlockUntilCOffset + "; ranges passed in "
+          + RecordReaderUtils.stringifyDiskRanges(start) + "; ranges passed to prepare "
+          + RecordReaderUtils.stringifyDiskRanges(current)); // Don't log exception here.
+      throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+    }
 
     // 2.5. Remember the bad estimates for future reference.
     if (badEstimates != null && !badEstimates.isEmpty()) {
@@ -668,7 +685,6 @@ class EncodedReaderImpl implements EncodedReader {
     }
 
     // 6. Finally, put uncompressed data to cache.
-
     if (fileKey != null) {
       long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
       processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
@@ -725,6 +741,12 @@ class EncodedReaderImpl implements EncodedReader {
       } else {
         // 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise
         // several disk ranges, so we might need to combine them.
+        if (!(current instanceof BufferChunk)) {
+          String msg = "Found an unexpected " + current.getClass().getSimpleName() + ": "
+              + current + " while looking at " + currentOffset;
+          LOG.error(msg);
+          throw new RuntimeException(msg);
+        }
         BufferChunk bc = (BufferChunk)current;
         ProcCacheChunk newCached = addOneCompressionBuffer(
             bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
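
Two patterns in the diff are worth a closer look. First, the stripe-level
stream comments describe running the refcount one ahead: the stream starts
with an extra reference, every RG except the last takes another, and each
RG's processing releases one, so the final release is what unlocks the
buffers. A toy model of that scheme, using an invented RefCounted class
rather than Hive's ColumnStreamData:

    import java.util.concurrent.atomic.AtomicInteger;

    // Toy model of the "run refcount one ahead" scheme; RefCounted is
    // invented for illustration, not a Hive class.
    public class RefAheadSketch {
      static class RefCounted {
        final AtomicInteger refs = new AtomicInteger();
        void incRef() { refs.incrementAndGet(); }
        void decRef() {
          if (refs.decrementAndGet() == 0) {
            System.out.println("last reference dropped: unlocking buffers");
          }
        }
      }

      public static void main(String[] args) {
        RefCounted stripeLevelStream = new RefCounted();
        stripeLevelStream.incRef(); // created with the refcount run one ahead
        int rgCount = 3;
        for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
          boolean isLastRg = (rgIx == rgCount - 1);
          if (!isLastRg) {
            stripeLevelStream.incRef(); // every RG but the last takes a reference...
          }
          stripeLevelStream.decRef();   // ...and each RG releases one, so the
                                        // last RG's release unlocks the buffers
        }
      }
    }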
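Second, the last hunk replaces a bare cast with an instanceof guard so an
unexpected range type fails with a descriptive, logged message instead of a
bare ClassCastException. A condensed, self-contained illustration, with
DiskRange and BufferChunk as simplified placeholders for the Hive classes:

    // Condensed illustration of the defensive cast in the last hunk;
    // DiskRange and BufferChunk are simplified placeholders.
    public class GuardedCastSketch {
      static class DiskRange {}
      static class BufferChunk extends DiskRange {}

      public static void main(String[] args) {
        DiskRange current = new DiskRange(); // not a BufferChunk: trips the guard
        if (!(current instanceof BufferChunk)) {
          String msg = "Found an unexpected " + current.getClass().getSimpleName()
              + ": " + current;
          System.err.println(msg); // the patch logs this via LOG.error first
          throw new RuntimeException(msg); // fail fast with context
        }
        BufferChunk bc = (BufferChunk) current; // safe once the check passes
        System.out.println("processing " + bc);
      }
    }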

