From: lei@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)
Date: Tue, 17 Oct 2017 22:56:55 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 81a86860b -> 6959db9c2


HDFS-12612. DFSStripedOutputStream.close will throw if called a second time
with a failed streamer. (Lei (Eddy) Xu)
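For context, this patch makes close() on a striped output stream idempotent:
the first close() may surface a write failure, but repeated calls become
no-ops instead of rethrowing. A minimal sketch of the intended client-side
behavior, assuming a DistributedFileSystem handle "dfs" and the usual
org.apache.hadoop.fs imports (the path and setup are illustrative, not part
of this commit):

    FSDataOutputStream out = dfs.create(new Path("/tmp/ecfile"), true);
    out.write("idempotent close".getBytes());
    try {
      out.close();  // may throw if enough striped streamers failed
    } catch (IOException e) {
      // handle or log the write failure
    }
    out.close();    // with this patch: a no-op, no second exception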
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6959db9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6959db9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6959db9c

Branch: refs/heads/branch-3.0
Commit: 6959db9c20217f6adb12e9f3140f5db9a26c38c4
Parents: 81a8686
Author: Lei Xu
Authored: Tue Oct 17 15:52:09 2017 -0700
Committer: Lei Xu
Committed: Tue Oct 17 15:53:07 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 40 +++++++----
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 31 ++------
 .../apache/hadoop/hdfs/ExceptionLastSeen.java   | 75 +++++++++++++++++++
 .../TestDFSStripedOutputStreamWithFailure.java  | 76 ++++++++++++++++++++
 4 files changed, 184 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 1b83959..39717ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream
     implements StreamCapabilities {
   private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
 
+  /**
+   * OutputStream-level last exception; used to record a fatal failure of
+   * this stream as a whole, e.g., the stream being aborted.
+   */
+  private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen();
+
   static class MultipleBlockingQueue<T> {
     private final List<BlockingQueue<T>> queues;
 
@@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream
     if (isClosed()) {
       return;
     }
-    for (StripedDataStreamer streamer : streamers) {
-      streamer.getLastException().set(
-          new IOException("Lease timeout of "
-              + (dfsClient.getConf().getHdfsTimeout() / 1000)
-              + " seconds expired."));
-    }
+    exceptionLastSeen.set(new IOException("Lease timeout of "
+        + (dfsClient.getConf().getHdfsTimeout() / 1000)
+        + " seconds expired."));
 
     try {
       closeThreads(true);
@@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends DFSOutputStream
   @Override
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
+      exceptionLastSeen.check(true);
+
+      // Writing to at least {dataUnits} streamers can be considered a
+      // success, since the rest of the data can be recovered.
+      final int minReplication = ecPolicy.getNumDataUnits();
+      int goodStreamers = 0;
       final MultipleIOException.Builder b = new MultipleIOException.Builder();
-      for(int i = 0; i < streamers.size(); i++) {
-        final StripedDataStreamer si = getStripedDataStreamer(i);
+      for (final StripedDataStreamer si : streamers) {
         try {
           si.getLastException().check(true);
+          goodStreamers++;
         } catch (IOException e) {
           b.add(e);
         }
       }
-      final IOException ioe = b.build();
-      if (ioe != null) {
-        throw ioe;
+      if (goodStreamers < minReplication) {
+        final IOException ioe = b.build();
+        if (ioe != null) {
+          throw ioe;
+        }
       }
       return;
     }
@@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream
       }
     } finally {
       // Failures may happen when flushing data/parity data out. Exceptions
-      // may be thrown if more than 3 streamers fail, or updatePipeline RPC
-      // fails. Streamers may keep waiting for the new block/GS information.
-      // Thus need to force closing these threads.
+      // may be thrown if the number of failed streamers is more than the
+      // number of parity blocks, or the updatePipeline RPC fails. Streamers
+      // may keep waiting for the new block/GS information, so these threads
+      // must be forcibly closed.
       closeThreads(true);
     }
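The new check in closeImpl() tolerates as many failed streamers as there are
parity units: close() only throws when fewer than numDataUnits streamers are
healthy. A small illustration, assuming the default RS(6,3) erasure coding
policy (the concrete numbers are an assumption for the example, not from this
patch):

    // RS(6,3): 6 data units + 3 parity units = 9 streamers in total.
    int numDataUnits = 6, numParityUnits = 3;
    int failed = 3;                                                // injected failures
    int goodStreamers = (numDataUnits + numParityUnits) - failed;  // 6
    boolean closeThrows = goodStreamers < numDataUnits;            // false
    // With failed = 4, goodStreamers drops to 5 and close() throws,
    // matching the numParityUnits + 1 case in the new test below.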
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 99fa5f3..c1473dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -285,39 +285,22 @@ class DataStreamer extends Daemon {
     packets.clear();
   }
 
-  class LastExceptionInStreamer {
-    private IOException thrown;
-
-    synchronized void set(Throwable t) {
-      assert t != null;
-      this.thrown = t instanceof IOException ?
-          (IOException) t : new IOException(t);
-    }
-
-    synchronized void clear() {
-      thrown = null;
-    }
-
-    /** Check if there already is an exception. */
+  class LastExceptionInStreamer extends ExceptionLastSeen {
+    /**
+     * Check if there already is an exception.
+     */
+    @Override
     synchronized void check(boolean resetToNull) throws IOException {
+      final IOException thrown = get();
       if (thrown != null) {
         if (LOG.isTraceEnabled()) {
           // wrap and print the exception to know when the check is called
           LOG.trace("Got Exception while checking, " + DataStreamer.this,
              new Throwable(thrown));
         }
-        final IOException e = thrown;
-        if (resetToNull) {
-          thrown = null;
-        }
-        throw e;
+        super.check(resetToNull);
       }
     }
-
-    synchronized void throwException4Close() throws IOException {
-      check(false);
-      throw new ClosedChannelException();
-    }
   }
 
   enum ErrorType {
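A note on the trace statement kept in the override: wrapping the stored
exception in a fresh Throwable records the stack of the thread performing
the check, with the original failure attached as the cause, so one log line
shows both where the error was set and where it was noticed. A generic
sketch of the idiom (names are illustrative, not from this patch):

    IOException stored = new IOException("failure recorded earlier");
    // new Throwable(stored) snapshots the current call site; the original
    // exception rides along as the cause.
    LOG.trace("checking stored exception at this call site",
        new Throwable(stored));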
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java
new file mode 100644
index 0000000..06bc5d2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The exception last seen by the {@link DataStreamer} or
+ * {@link DFSOutputStream}.
+ */
+@InterfaceAudience.Private
+class ExceptionLastSeen {
+  private IOException thrown;
+
+  /** Get the last seen exception. */
+  protected synchronized IOException get() {
+    return thrown;
+  }
+
+  /**
+   * Set the last seen exception.
+   * @param t the exception.
+   */
+  synchronized void set(Throwable t) {
+    assert t != null;
+    this.thrown = t instanceof IOException ?
+        (IOException) t : new IOException(t);
+  }
+
+  /** Clear the last seen exception. */
+  synchronized void clear() {
+    thrown = null;
+  }
+
+  /**
+   * Check if there already is an exception; if so, throw it.
+   *
+   * @param resetToNull set to true to reset the exception to null after
+   *                    calling this function.
+   * @throws IOException the existing IOException.
+   */
+  synchronized void check(boolean resetToNull) throws IOException {
+    if (thrown != null) {
+      final IOException e = thrown;
+      if (resetToNull) {
+        thrown = null;
+      }
+      throw e;
+    }
+  }
+
+  synchronized void throwException4Close() throws IOException {
+    check(false);
+    throw new ClosedChannelException();
+  }
+}
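ExceptionLastSeen factors out a pattern that the streamer and the output
stream now share: a background thread records a fatal error, and a foreground
call rethrows it later, at most once when the caller asks for a reset. A
hedged usage sketch (the worker/close split is illustrative, not code from
this patch):

    final ExceptionLastSeen lastSeen = new ExceptionLastSeen();

    // Background streamer thread: remember the failure for the caller.
    try {
      // ... stream packets to datanodes ...
    } catch (Throwable t) {
      lastSeen.set(t);  // non-IOExceptions get wrapped in an IOException
    }

    // Foreground close() path: surface the recorded failure exactly once.
    lastSeen.check(true);  // throws the stored exception, then clears it
    lastSeen.check(true);  // second call: nothing stored, returns quietly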
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 57da439..e7fa278 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -319,6 +319,82 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
+  private void testCloseWithExceptionsInStreamer(
+      int numFailures, boolean shouldFail) throws Exception {
+    assertTrue(numFailures <=
+        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+    final Path dirFile = new Path(dir, "ecfile-" + numFailures);
+    try (FSDataOutputStream out = dfs.create(dirFile, true)) {
+      out.write("idempotent close".getBytes());
+
+      // Expect an IOE on the first close() call, but any following
+      // close() should be a no-op.
+      LambdaTestUtils.intercept(IOException.class,
+          out::close);
+
+      assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
+      DFSStripedOutputStream stripedOut =
+          (DFSStripedOutputStream) out.getWrappedStream();
+      for (int i = 0; i < numFailures; i++) {
+        // Inject a failure into the i-th streamer.
+        stripedOut.getStripedDataStreamer(i).getLastException().set(
+            new IOException("injected failure")
+        );
+      }
+      if (shouldFail) {
+        LambdaTestUtils.intercept(IOException.class, out::close);
+      }
+
+      // Close multiple times. All the following close() calls should have
+      // no side effect.
+      out.close();
+    }
+  }
+
+  // HDFS-12612
+  @Test
+  public void testIdempotentCloseWithFailedStreams() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    try {
+      setup(conf);
+      // Shut down datanodes until fewer remain than the number of data
+      // blocks, so a write cannot get a sufficient number of datanodes.
+      while (cluster.getDataNodes().size() >= dataBlocks) {
+        cluster.stopDataNode(0);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+
+      testCloseWithExceptionsInStreamer(1, false);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits(), false);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits() + 1,
+          true);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumDataUnits(), true);
+    } finally {
+      tearDown();
+    }
+  }
+
+  @Test
+  public void testCloseAfterAbort() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    try {
+      setup(conf);
+
+      final Path dirFile = new Path(dir, "ecfile");
+      FSDataOutputStream out = dfs.create(dirFile, true);
+      assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
+      DFSStripedOutputStream stripedOut =
+          (DFSStripedOutputStream) out.getWrappedStream();
+      stripedOut.abort();
+      LambdaTestUtils.intercept(IOException.class,
+          "Lease timeout", stripedOut::close);
+    } finally {
+      tearDown();
+    }
+  }
+
   @Test(timeout = 90000)
   public void testAddBlockWhenNoSufficientParityNumOfNodes()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org