From: sershe@apache.org
To: commits@hive.apache.org
Reply-To: hive-dev@hive.apache.org
Date: Fri, 29 Sep 2017 20:56:28 -0000
In-Reply-To: <0c4469a3c03f4972ae77f12d67fe7c3e@git.apache.org>
References: <0c4469a3c03f4972ae77f12d67fe7c3e@git.apache.org>
Subject: [5/6] hive git commit: HIVE-17610 : LLAP IO: an exception in exception handling can hide the original exception (Sergey Shelukhin, reviewed by Rajesh Balamohan)

HIVE-17610 : LLAP IO: an exception in exception handling can hide the original exception (Sergey Shelukhin, reviewed by Rajesh Balamohan)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e4097ef
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e4097ef
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e4097ef

Branch: refs/heads/master
Commit: 1e4097ef767246538c8386f52c49f5207433a98e
Parents: e1e11d4
Author: sergey
Authored: Fri Sep 29 13:41:58 2017 -0700
Committer: sergey
Committed: Fri Sep 29 13:56:18 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/io/api/impl/LlapRecordReader.java |   3 +-
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 210 +++++++++++--------
 2 files changed, 127 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1e4097ef/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 5d93884..70bd05c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -379,8 +379,9 @@ class LlapRecordReader
   @Override
   public void setError(Throwable t) {
     counters.incrCounter(LlapIOCounters.NUM_ERRORS);
-    LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
+    LlapIoImpl.LOG.debug("setError called; current state closed {}, done {}, err {}, pending {}",
         isClosed, isDone, pendingError, pendingData.size());
+    LlapIoImpl.LOG.warn("setError called with an error", t);
     assert t != null;
     synchronized (pendingData) {
       pendingError = t;
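The LlapRecordReader hunk above splits one INFO message into two log calls: reader state at DEBUG, and the incoming throwable itself at WARN with its stack trace, recorded before any further handling has a chance to fail. A minimal standalone sketch of that pattern; the class, logger, and pendingData/pendingError fields below are simplified stand-ins, not the actual LLAP types:

  import java.util.ArrayDeque;
  import java.util.Queue;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  class ErrorReportingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ErrorReportingSketch.class);
    private final Queue<Object> pendingData = new ArrayDeque<>();
    private Throwable pendingError;

    public void setError(Throwable t) {
      // State goes to DEBUG; the throwable goes to WARN with its stack trace,
      // so the root cause is in the log even if the wake-up below were to fail.
      LOG.debug("setError called; pending {}", pendingData.size());
      LOG.warn("setError called with an error", t);
      assert t != null;
      synchronized (pendingData) {
        pendingError = t;
        pendingData.notifyAll();
      }
    }
  }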
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/1e4097ef/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 467bac2..31d5dd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -374,9 +374,10 @@ class EncodedReaderImpl implements EncodedReader {
     // CacheChunks, so the list is just CacheChunk-s from that point on.
     DiskRangeList iter = preReadUncompressedStreams(stripeOffset, colCtxs, toRead, toRelease);

+    // 4. Finally, decompress data, map per RG, and return to caller.
+    // We go by RG and not by column because that is how data is processed.
+    boolean hasError = true;
     try {
-      // 4. Finally, decompress data, map per RG, and return to caller.
-      // We go by RG and not by column because that is how data is processed.
       int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
       for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
         if (rgs != null && !rgs[rgIx]) {
@@ -386,7 +387,7 @@ class EncodedReaderImpl implements EncodedReader {
         // Create the batch we will use to return data for this RG.
         OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
         trace.logStartRg(rgIx);
-        boolean hasError = true;
+        boolean hasErrorForEcb = true;
         try {
           ecb.init(fileKey, stripeIx, rgIx, included.length);
           for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
@@ -462,10 +463,14 @@ class EncodedReaderImpl implements EncodedReader {
             }
           }
         }
-        hasError = false;
+        hasErrorForEcb = false;
       } finally {
-        if (hasError) {
-          releaseEcbRefCountsOnError(ecb);
+        if (hasErrorForEcb) {
+          try {
+            releaseEcbRefCountsOnError(ecb);
+          } catch (Throwable t) {
+            LOG.error("Error during the cleanup of an error; ignoring", t);
+          }
         }
       }
       // After this, the non-initial refcounts are the responsibility of the consumer.
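The hunk above makes two changes: the per-batch flag is renamed to hasErrorForEcb so it no longer shadows the new method-level hasError, and the error-path cleanup releaseEcbRefCountsOnError is wrapped in its own try/catch so that a failure during cleanup is logged instead of replacing the exception that triggered it. A condensed sketch of this guarded-cleanup shape; Resource, process() and releaseOnError() are hypothetical stand-ins for the ECB-specific calls:

  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  class GuardedCleanupSketch {
    private static final Logger LOG = LoggerFactory.getLogger(GuardedCleanupSketch.class);

    interface Resource {
      void process() throws Exception;
      void releaseOnError();
    }

    void processOne(Resource resource) throws Exception {
      boolean hasError = true;
      try {
        resource.process();
        hasError = false; // reached only if process() did not throw
      } finally {
        if (hasError) {
          try {
            resource.releaseOnError();
          } catch (Throwable t) {
            // Rethrowing here would replace the original exception from
            // process(), so log the secondary failure and let it go.
            LOG.error("Error during the cleanup of an error; ignoring", t);
          }
        }
      }
    }
  }

Since Java 7 a secondary failure can also be attached to the primary one via Throwable.addSuppressed (try-with-resources does this automatically); the explicit flag used here has the additional benefit that the error-only cleanup is skipped entirely on the success path.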
@@ -477,35 +482,40 @@ class EncodedReaderImpl implements EncodedReader {
             + RecordReaderUtils.stringifyDiskRanges(toRead.next));
       }
       trace.logRanges(fileKey, stripeOffset, toRead.next, RangesSrc.PREREAD);
+      hasError = false;
     } finally {
-      // Release the unreleased stripe-level buffers. See class comment about refcounts.
-      for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
-        ColumnReadContext ctx = colCtxs[colIx];
-        if (ctx == null) continue; // This column is not included.
-        for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-          StreamContext sctx = ctx.streams[streamIx];
-          if (sctx == null || sctx.stripeLevelStream == null) continue;
-          if (0 != sctx.stripeLevelStream.decRef()) continue;
-          // Note - this is a little bit confusing; the special treatment of stripe-level buffers
-          // is because we run the ColumnStreamData refcount one ahead (as specified above). It
-          // may look like this would release the buffers too many times (one release from the
-          // consumer, one from releaseInitialRefcounts below, and one here); however, this is
-          // merely handling a special case where all the batches that are sharing the stripe-
-          // level stream have been processed before we got here; they have all decRef-ed the CSD,
-          // but have not released the buffers because of that extra refCount. So, this is
-          // essentially the "consumer" refcount being released here.
-          for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Unlocking {} at the end of processing", buf);
+      try {
+        // Release the unreleased stripe-level buffers. See class comment about refcounts.
+        for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+          ColumnReadContext ctx = colCtxs[colIx];
+          if (ctx == null) continue; // This column is not included.
+          for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+            StreamContext sctx = ctx.streams[streamIx];
+            if (sctx == null || sctx.stripeLevelStream == null) continue;
+            if (0 != sctx.stripeLevelStream.decRef()) continue;
+            // Note - this is a little bit confusing; the special treatment of stripe-level buffers
+            // is because we run the ColumnStreamData refcount one ahead (as specified above). It
+            // may look like this would release the buffers too many times (one release from the
+            // consumer, one from releaseInitialRefcounts below, and one here); however, this is
+            // merely handling a special case where all the batches that are sharing the stripe-
+            // level stream have been processed before we got here; they have all decRef-ed the CSD,
+            // but have not released the buffers because of that extra refCount. So, this is
+            // essentially the "consumer" refcount being released here.
+            for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Unlocking {} at the end of processing", buf);
+              }
+              cacheWrapper.releaseBuffer(buf);
             }
-            cacheWrapper.releaseBuffer(buf);
           }
         }
+        releaseInitialRefcounts(toRead.next);
+        // Release buffers as we are done with all the streams... also see toRelease comment.
+        releaseBuffers(toRelease.keySet(), true);
+      } catch (Throwable t) {
+        if (!hasError) throw new IOException(t);
+        LOG.error("Error during the cleanup after another error; ignoring", t);
      }
-
-      releaseInitialRefcounts(toRead.next);
-      // Release buffers as we are done with all the streams... also see toRelease comment.
-      releaseBuffers(toRelease.keySet(), true);
     }
     releaseCacheChunksIntoObjectPool(toRead.next);
   }
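The restructured finally block above distinguishes two cases: when the body already failed, a cleanup failure is logged and ignored; when the body succeeded, a cleanup failure is a genuine error and is wrapped and rethrown as an IOException. A minimal sketch of that rethrow-or-suppress decision; readData() and releaseAll() are hypothetical stand-ins for the read loop and the buffer-release calls:

  import java.io.IOException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  class FinallyCleanupSketch {
    private static final Logger LOG = LoggerFactory.getLogger(FinallyCleanupSketch.class);

    void readData() throws IOException { /* main work */ }
    void releaseAll() { /* cleanup that must always run */ }

    void readWithCleanup() throws IOException {
      boolean hasError = true;
      try {
        readData();
        hasError = false;
      } finally {
        try {
          releaseAll();
        } catch (Throwable t) {
          // Success path: a cleanup failure is the only error, so propagate it.
          if (!hasError) throw new IOException(t);
          // Failure path: the original exception is already in flight; log only.
          LOG.error("Error during the cleanup after another error; ignoring", t);
        }
      }
    }
  }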
@@ -623,6 +633,18 @@ class EncodedReaderImpl implements EncodedReader {
     while (current != null) {
       DiskRangeList toFree = current;
       current = current.next;
+      if (toFree instanceof ProcCacheChunk) {
+        ProcCacheChunk pcc = (ProcCacheChunk)toFree;
+        if (pcc.originalData != null) {
+          // This can only happen in case of failure - we read some data, but didn't decompress
+          // it. Deallocate the buffer directly, do not decref.
+          if (pcc.getBuffer() != null) {
+            cacheWrapper.getAllocator().deallocate(pcc.getBuffer());
+          }
+          continue;
+        }
+
+      }
       if (!(toFree instanceof CacheChunk)) continue;
       CacheChunk cc = (CacheChunk)toFree;
       if (cc.getBuffer() == null) continue;
@@ -841,11 +863,16 @@ class EncodedReaderImpl implements EncodedReader {
         copyUncompressedChunk(chunk.originalData, dest);
       }

-      chunk.originalData = null;
       if (isTracingEnabled) {
         LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
       }
-      cacheWrapper.reuseBuffer(chunk.getBuffer());
+      // After we set originalData to null, we incref the buffer and the cleanup would decref it.
+      // Note that this assumes the failure during incref means incref didn't occur.
+      try {
+        cacheWrapper.reuseBuffer(chunk.getBuffer());
+      } finally {
+        chunk.originalData = null;
+      }
     }

     // 5. Release the copies we made directly to the cleaner.
@@ -1847,60 +1874,69 @@ class EncodedReaderImpl implements EncodedReader {
     DiskRangeList iter = preReadUncompressedStreams(stripeOffset, colCtxs, toRead, toRelease);

     // 4. Decompress the data.
-    for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
-      ReadContext ctx = colCtxs[colIx];
-      if (ctx == null) continue; // This column is not included.
-      for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-        StreamContext sctx = ctx.streams[streamIx];
-        try {
-          if (isTracingEnabled) {
-            LOG.trace("Getting index stream " + sctx.kind + " for column " + ctx.colIx
-                + " at " + sctx.offset + ", " + sctx.length);
-          }
-          ColumnStreamData csd = POOLS.csdPool.take();
-          long endCOffset = sctx.offset + sctx.length;
-          DiskRangeList lastCached = readEncodedStream(stripeOffset, iter, sctx.offset,
-              endCOffset, csd, endCOffset, sctx.offset, toRelease);
-          if (lastCached != null) {
-            iter = lastCached;
-          }
-          CodedInputStream cis = CodedInputStream.newInstance(
-              new IndexStream(csd.getCacheBuffers(), sctx.length));
-          cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
-          switch (sctx.kind) {
-            case ROW_INDEX:
-              index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis);
-              break;
-            case BLOOM_FILTER:
-            case BLOOM_FILTER_UTF8:
-              index.getBloomFilterIndex()[colIx] = OrcProto.BloomFilterIndex.parseFrom(cis);
-              break;
-            default:
-              throw new AssertionError("Unexpected index stream type " + sctx.kind);
-          }
-          // We are done with the buffers; unlike data blocks, we are also the consumer. Release.
-          for (MemoryBuffer buf : csd.getCacheBuffers()) {
-            if (buf == null) continue;
-            cacheWrapper.releaseBuffer(buf);
+    boolean hasError = true;
+    try {
+      for (int colIx = 0; colIx < colCtxs.length; ++colIx) {
+        ReadContext ctx = colCtxs[colIx];
+        if (ctx == null) continue; // This column is not included.
+        for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
+          StreamContext sctx = ctx.streams[streamIx];
+          try {
+            if (isTracingEnabled) {
+              LOG.trace("Getting index stream " + sctx.kind + " for column " + ctx.colIx
+                  + " at " + sctx.offset + ", " + sctx.length);
+            }
+            ColumnStreamData csd = POOLS.csdPool.take();
+            long endCOffset = sctx.offset + sctx.length;
+            DiskRangeList lastCached = readEncodedStream(stripeOffset, iter, sctx.offset,
+                endCOffset, csd, endCOffset, sctx.offset, toRelease);
+            if (lastCached != null) {
+              iter = lastCached;
+            }
+            CodedInputStream cis = CodedInputStream.newInstance(
+                new IndexStream(csd.getCacheBuffers(), sctx.length));
+            cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
+            switch (sctx.kind) {
+              case ROW_INDEX:
+                index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis);
+                break;
+              case BLOOM_FILTER:
+              case BLOOM_FILTER_UTF8:
+                index.getBloomFilterIndex()[colIx] = OrcProto.BloomFilterIndex.parseFrom(cis);
+                break;
+              default:
+                throw new AssertionError("Unexpected index stream type " + sctx.kind);
+            }
+            // We are done with the buffers; unlike data blocks, we are also the consumer. Release.
+            for (MemoryBuffer buf : csd.getCacheBuffers()) {
+              if (buf == null) continue;
+              cacheWrapper.releaseBuffer(buf);
+            }
+          } catch (Exception ex) {
+            DiskRangeList drl = toRead == null ? null : toRead.next;
+            LOG.error("Error getting stream " + sctx.kind + " for column " + ctx.colIx
+                + " at " + sctx.offset + ", " + sctx.length + "; toRead "
+                + RecordReaderUtils.stringifyDiskRanges(drl), ex);
+            throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
          }
-        } catch (Exception ex) {
-          DiskRangeList drl = toRead == null ? null : toRead.next;
-          LOG.error("Error getting stream " + sctx.kind + " for column " + ctx.colIx
-              + " at " + sctx.offset + ", " + sctx.length + "; toRead "
-              + RecordReaderUtils.stringifyDiskRanges(drl), ex);
-          throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
        }
      }
-    }
-    if (isTracingEnabled) {
-      LOG.trace("Disk ranges after preparing all the data "
-          + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+      if (isTracingEnabled) {
+        LOG.trace("Disk ranges after preparing all the data "
+            + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+      }
+      hasError = false;
+    } finally {
+      // Release the unreleased buffers. See class comment about refcounts.
+      try {
+        releaseInitialRefcounts(toRead.next);
+        releaseBuffers(toRelease.keySet(), true);
+      } catch (Throwable t) {
+        if (!hasError) throw new IOException(t);
+        LOG.error("Error during the cleanup after another error; ignoring", t);
+      }
    }
-
-    // Release the unreleased buffers. See class comment about refcounts.
-    releaseInitialRefcounts(toRead.next);
-    releaseBuffers(toRelease.keySet(), true);
    releaseCacheChunksIntoObjectPool(toRead.next);
  }

@@ -1939,10 +1975,14 @@ class EncodedReaderImpl implements EncodedReader {
     // At this point, everything in the list is going to have a refcount of one. Unless it
     // failed between the allocation and the incref for a single item, we should be ok.
     if (hasError) {
-      releaseInitialRefcounts(toRead.next);
-      if (toRelease != null) {
-        releaseBuffers(toRelease.keySet(), true);
-        toRelease.clear();
+      try {
+        releaseInitialRefcounts(toRead.next);
+        if (toRelease != null) {
+          releaseBuffers(toRelease.keySet(), true);
+          toRelease.clear();
+        }
+      } catch (Throwable t) {
+        LOG.error("Error during the cleanup after another error; ignoring", t);
       }
     }
   }
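For reference, the failure mode this commit defends against is plain Java semantics: an exception thrown from a finally block (or from cleanup on an error path) supersedes the exception already in flight, so the root cause never reaches the caller. A self-contained demo, related to the Hive code only in shape:

  public class MaskingDemo {
    static void readData() throws Exception {
      try {
        throw new Exception("original failure");
      } finally {
        // Unguarded cleanup: this RuntimeException silently replaces
        // the "original failure" thrown above.
        throw new RuntimeException("cleanup failed");
      }
    }

    public static void main(String[] args) {
      try {
        readData();
      } catch (Throwable t) {
        System.out.println(t.getMessage()); // prints: cleanup failed
      }
    }
  }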