hive-commits mailing list archives

From ser...@apache.org
Subject [12/17] hive git commit: HIVE-17610 : LLAP IO: an exception in exception handling can hide the original exception (Sergey Shelukhin, reviewed by Rajesh Balamohan)
Date Fri, 29 Sep 2017 21:01:53 GMT
HIVE-17610 : LLAP IO: an exception in exception handling can hide the original exception (Sergey
Shelukhin, reviewed by Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e4097ef
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e4097ef
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e4097ef

Branch: refs/heads/hive-14535
Commit: 1e4097ef767246538c8386f52c49f5207433a98e
Parents: e1e11d4
Author: sergey <sershe@apache.org>
Authored: Fri Sep 29 13:41:58 2017 -0700
Committer: sergey <sershe@apache.org>
Committed: Fri Sep 29 13:56:18 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/io/api/impl/LlapRecordReader.java |   3 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 210 +++++++++++--------
 2 files changed, 127 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
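
The pattern this patch applies throughout EncodedReaderImpl is: guard the main work with a hasError flag that is cleared only once the work completes, and wrap the cleanup in the finally block in its own try/catch so a failure during cleanup cannot mask the exception that is already propagating. Below is a minimal, self-contained sketch of that idiom; the class and method names are illustrative stand-ins, not taken from the patch:

    import java.io.IOException;

    public class CleanupDoesNotMaskErrorSketch {
      // Illustrative stand-ins for the real work (read/decompress)
      // and the cleanup (release refcounts and buffers).
      static void doWork() throws IOException { }
      static void cleanUp() { }

      public static void process() throws IOException {
        boolean hasError = true;
        try {
          doWork();
          hasError = false; // Cleared only when the main path completed.
        } finally {
          try {
            cleanUp();
          } catch (Throwable t) {
            if (!hasError) {
              // Cleanup failed on the success path: nothing else is propagating, so surface it.
              throw new IOException(t);
            }
            // Cleanup failed while another exception is already propagating:
            // log and swallow so the original error is not hidden.
            System.err.println("Error during the cleanup after another error; ignoring: " + t);
          }
        }
      }
    }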


http://git-wip-us.apache.org/repos/asf/hive/blob/1e4097ef/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 5d93884..70bd05c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -379,8 +379,9 @@ class LlapRecordReader
   @Override
   public void setError(Throwable t) {
     counters.incrCounter(LlapIOCounters.NUM_ERRORS);
-    LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
+    LlapIoImpl.LOG.debug("setError called; current state closed {}, done {}, err {}, pending {}",
         isClosed, isDone, pendingError, pendingData.size());
+    LlapIoImpl.LOG.warn("setError called with an error", t);
     assert t != null;
     synchronized (pendingData) {
       pendingError = t;
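
The change above keeps the reader-state details at debug level but logs the incoming Throwable itself at warn level, so the original error and its stack trace are recorded even if later handling fails. A hedged sketch of that split, using SLF4J-style parameterized logging like the calls above; the class and fields here are illustrative, not the actual reader:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SetErrorLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(SetErrorLoggingSketch.class);

      // Illustrative state, standing in for the reader's fields.
      private volatile boolean isClosed, isDone;
      private volatile Throwable pendingError;

      public void setError(Throwable t) {
        // State details are mainly useful when debugging, so they stay at debug level...
        LOG.debug("setError called; current state closed {}, done {}, err {}",
            isClosed, isDone, pendingError);
        // ...while the error itself is logged at warn level, with its stack trace,
        // so it is not lost even if the consumer never rethrows it.
        LOG.warn("setError called with an error", t);
        pendingError = t;
      }
    }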

http://git-wip-us.apache.org/repos/asf/hive/blob/1e4097ef/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 467bac2..31d5dd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -374,9 +374,10 @@ class EncodedReaderImpl implements EncodedReader {
     //    CacheChunks, so the list is just CacheChunk-s from that point on.
     DiskRangeList iter = preReadUncompressedStreams(stripeOffset, colCtxs, toRead, toRelease);
 
+    // 4. Finally, decompress data, map per RG, and return to caller.
+    // We go by RG and not by column because that is how data is processed.
+    boolean hasError = true;
     try {
-      // 4. Finally, decompress data, map per RG, and return to caller.
-      // We go by RG and not by column because that is how data is processed.
       int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
       for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
         if (rgs != null && !rgs[rgIx]) {
@@ -386,7 +387,7 @@ class EncodedReaderImpl implements EncodedReader {
         // Create the batch we will use to return data for this RG.
         OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
         trace.logStartRg(rgIx);
-        boolean hasError = true;
+        boolean hasErrorForEcb = true;
         try {
           ecb.init(fileKey, stripeIx, rgIx, included.length);
           for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
@@ -462,10 +463,14 @@ class EncodedReaderImpl implements EncodedReader {
               }
             }
           }
-          hasError = false;
+          hasErrorForEcb = false;
         } finally {
-          if (hasError) {
-            releaseEcbRefCountsOnError(ecb);
+          if (hasErrorForEcb) {
+            try {
+              releaseEcbRefCountsOnError(ecb);
+            } catch (Throwable t) {
+              LOG.error("Error during the cleanup of an error; ignoring", t);
+            }
           }
         }
         // After this, the non-initial refcounts are the responsibility of the consumer.
@@ -477,35 +482,40 @@ class EncodedReaderImpl implements EncodedReader {
             + RecordReaderUtils.stringifyDiskRanges(toRead.next));
       }
       trace.logRanges(fileKey, stripeOffset, toRead.next, RangesSrc.PREREAD);
+      hasError = false;
     } finally {
-      // Release the unreleased stripe-level buffers. See class comment about refcounts.
-      for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
-        ColumnReadContext ctx = colCtxs[colIx];
-        if (ctx == null) continue; // This column is not included.
-        for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-          StreamContext sctx = ctx.streams[streamIx];
-          if (sctx == null || sctx.stripeLevelStream == null) continue;
-          if (0 != sctx.stripeLevelStream.decRef()) continue;
-          // Note - this is a little bit confusing; the special treatment of stripe-level buffers
-          // is because we run the ColumnStreamData refcount one ahead (as specified above). It
-          // may look like this would release the buffers too many times (one release from the
-          // consumer, one from releaseInitialRefcounts below, and one here); however, this is
-          // merely handling a special case where all the batches that are sharing the stripe-
-          // level stream have been processed before we got here; they have all decRef-ed the CSD,
-          // but have not released the buffers because of that extra refCount. So, this is
-          // essentially the "consumer" refcount being released here.
-          for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Unlocking {} at the end of processing", buf);
+      try {
+        // Release the unreleased stripe-level buffers. See class comment about refcounts.
+        for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+          ColumnReadContext ctx = colCtxs[colIx];
+          if (ctx == null) continue; // This column is not included.
+          for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+            StreamContext sctx = ctx.streams[streamIx];
+            if (sctx == null || sctx.stripeLevelStream == null) continue;
+            if (0 != sctx.stripeLevelStream.decRef()) continue;
+            // Note - this is a little bit confusing; the special treatment of stripe-level buffers
+            // is because we run the ColumnStreamData refcount one ahead (as specified above). It
+            // may look like this would release the buffers too many times (one release from the
+            // consumer, one from releaseInitialRefcounts below, and one here); however, this is
+            // merely handling a special case where all the batches that are sharing the stripe-
+            // level stream have been processed before we got here; they have all decRef-ed the CSD,
+            // but have not released the buffers because of that extra refCount. So, this is
+            // essentially the "consumer" refcount being released here.
+            for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Unlocking {} at the end of processing", buf);
+              }
+              cacheWrapper.releaseBuffer(buf);
             }
-            cacheWrapper.releaseBuffer(buf);
           }
         }
+        releaseInitialRefcounts(toRead.next);
+        // Release buffers as we are done with all the streams... also see toRelease comment.
+        releaseBuffers(toRelease.keySet(), true);
+      } catch (Throwable t) {
+        if (!hasError) throw new IOException(t);
+        LOG.error("Error during the cleanup after another error; ignoring", t);
       }
-
-      releaseInitialRefcounts(toRead.next);
-      // Release buffers as we are done with all the streams... also see toRelease comment.
-      releaseBuffers(toRelease.keySet(), true);
     }
     releaseCacheChunksIntoObjectPool(toRead.next);
   }
@@ -623,6 +633,18 @@ class EncodedReaderImpl implements EncodedReader {
     while (current != null) {
       DiskRangeList toFree = current;
       current = current.next;
+      if (toFree instanceof ProcCacheChunk) {
+        ProcCacheChunk pcc = (ProcCacheChunk)toFree;
+        if (pcc.originalData != null) {
+          // This can only happen in case of failure - we read some data, but didn't decompress
+          // it. Deallocate the buffer directly, do not decref.
+          if (pcc.getBuffer() != null) {
+            cacheWrapper.getAllocator().deallocate(pcc.getBuffer());
+          }
+          continue;
+        }
+        
+      }
       if (!(toFree instanceof CacheChunk)) continue;
       CacheChunk cc = (CacheChunk)toFree;
       if (cc.getBuffer() == null) continue;
@@ -841,11 +863,16 @@ class EncodedReaderImpl implements EncodedReader {
         copyUncompressedChunk(chunk.originalData, dest);
       }
 
-      chunk.originalData = null;
       if (isTracingEnabled) {
         LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
       }
-      cacheWrapper.reuseBuffer(chunk.getBuffer());
+      // After we set originalData to null, we incref the buffer and the cleanup would decref it.
+      // Note that this assumes the failure during incref means incref didn't occur.
+      try {
+        cacheWrapper.reuseBuffer(chunk.getBuffer());
+      } finally {
+        chunk.originalData = null;
+      }
     }
 
     // 5. Release the copies we made directly to the cleaner.
@@ -1847,60 +1874,69 @@ class EncodedReaderImpl implements EncodedReader {
     DiskRangeList iter = preReadUncompressedStreams(stripeOffset, colCtxs, toRead, toRelease);
 
     // 4. Decompress the data.
-    for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
-      ReadContext ctx = colCtxs[colIx];
-      if (ctx == null) continue; // This column is not included.
-      for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-        StreamContext sctx = ctx.streams[streamIx];
-        try {
-          if (isTracingEnabled) {
-            LOG.trace("Getting index stream " + sctx.kind + " for column " + ctx.colIx
-                + " at " + sctx.offset + ", " + sctx.length);
-          }
-          ColumnStreamData csd = POOLS.csdPool.take();
-          long endCOffset = sctx.offset + sctx.length;
-          DiskRangeList lastCached = readEncodedStream(stripeOffset, iter, sctx.offset,
-              endCOffset, csd, endCOffset, sctx.offset, toRelease);
-          if (lastCached != null) {
-            iter = lastCached;
-          }
-          CodedInputStream cis = CodedInputStream.newInstance(
-              new IndexStream(csd.getCacheBuffers(), sctx.length));
-          cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
-          switch (sctx.kind) {
-            case ROW_INDEX:
-              index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis);
-              break;
-            case BLOOM_FILTER:
-            case BLOOM_FILTER_UTF8:
-              index.getBloomFilterIndex()[colIx] = OrcProto.BloomFilterIndex.parseFrom(cis);
-              break;
-            default:
-              throw new AssertionError("Unexpected index stream type " + sctx.kind);
-          }
-          // We are done with the buffers; unlike data blocks, we are also the consumer. Release.
-          for (MemoryBuffer buf : csd.getCacheBuffers()) {
-            if (buf == null) continue;
-            cacheWrapper.releaseBuffer(buf);
+    boolean hasError = true;
+    try {
+      for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+        ReadContext ctx = colCtxs[colIx];
+        if (ctx == null) continue; // This column is not included.
+        for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+          StreamContext sctx = ctx.streams[streamIx];
+          try {
+            if (isTracingEnabled) {
+              LOG.trace("Getting index stream " + sctx.kind + " for column " + ctx.colIx
+                  + " at " + sctx.offset + ", " + sctx.length);
+            }
+            ColumnStreamData csd = POOLS.csdPool.take();
+            long endCOffset = sctx.offset + sctx.length;
+            DiskRangeList lastCached = readEncodedStream(stripeOffset, iter, sctx.offset,
+                endCOffset, csd, endCOffset, sctx.offset, toRelease);
+            if (lastCached != null) {
+              iter = lastCached;
+            }
+            CodedInputStream cis = CodedInputStream.newInstance(
+                new IndexStream(csd.getCacheBuffers(), sctx.length));
+            cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
+            switch (sctx.kind) {
+              case ROW_INDEX:
+                index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis);
+                break;
+              case BLOOM_FILTER:
+              case BLOOM_FILTER_UTF8:
+                index.getBloomFilterIndex()[colIx] = OrcProto.BloomFilterIndex.parseFrom(cis);
+                break;
+              default:
+                throw new AssertionError("Unexpected index stream type " + sctx.kind);
+            }
+            // We are done with the buffers; unlike data blocks, we are also the consumer. Release.
+            for (MemoryBuffer buf : csd.getCacheBuffers()) {
+              if (buf == null) continue;
+              cacheWrapper.releaseBuffer(buf);
+            }
+          } catch (Exception ex) {
+            DiskRangeList drl = toRead == null ? null : toRead.next;
+            LOG.error("Error getting stream " + sctx.kind + " for column " + ctx.colIx
+                + " at " + sctx.offset + ", " + sctx.length + "; toRead "
+                + RecordReaderUtils.stringifyDiskRanges(drl), ex);
+            throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
           }
-        } catch (Exception ex) {
-          DiskRangeList drl = toRead == null ? null : toRead.next;
-          LOG.error("Error getting stream " + sctx.kind + " for column " + ctx.colIx
-              + " at " + sctx.offset + ", " + sctx.length + "; toRead "
-              + RecordReaderUtils.stringifyDiskRanges(drl), ex);
-          throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
         }
       }
-    }
 
-    if (isTracingEnabled) {
-      LOG.trace("Disk ranges after preparing all the data "
-          + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+      if (isTracingEnabled) {
+        LOG.trace("Disk ranges after preparing all the data "
+            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+      }
+      hasError = false;
+    } finally {
+      // Release the unreleased buffers. See class comment about refcounts.
+      try {
+        releaseInitialRefcounts(toRead.next);
+        releaseBuffers(toRelease.keySet(), true);
+      } catch (Throwable t) {
+        if (!hasError) throw new IOException(t);
+        LOG.error("Error during the cleanup after another error; ignoring", t);
+      }
     }
-
-    // Release the unreleased buffers. See class comment about refcounts.
-    releaseInitialRefcounts(toRead.next);
-    releaseBuffers(toRelease.keySet(), true);
     releaseCacheChunksIntoObjectPool(toRead.next);
   }
 
@@ -1939,10 +1975,14 @@ class EncodedReaderImpl implements EncodedReader {
       // At this point, everything in the list is going to have a refcount of one. Unless it
       // failed between the allocation and the incref for a single item, we should be ok.

       if (hasError) {
-        releaseInitialRefcounts(toRead.next);
-        if (toRelease != null) {
-          releaseBuffers(toRelease.keySet(), true);
-          toRelease.clear();
+        try {
+          releaseInitialRefcounts(toRead.next);
+          if (toRelease != null) {
+            releaseBuffers(toRelease.keySet(), true);
+            toRelease.clear();
+          }
+        } catch (Throwable t) {
+          LOG.error("Error during the cleanup after another error; ignoring", t);
         }
       }
     }

