From: asuresh@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 17 Apr 2017 15:15:28 -0000
Message-Id: <9c69673a6c104c468f142a7acd01af57@git.apache.org>
In-Reply-To: <688528025789441996988543d43ba921@git.apache.org>
References: <688528025789441996988543d43ba921@git.apache.org>
Subject: [03/50] hadoop git commit: HDFS-11608. HDFS write crashed with block size greater than 2 GB. Contributed by Xiaobing Zhou.

HDFS-11608. HDFS write crashed with block size greater than 2 GB. Contributed by Xiaobing Zhou.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0eacd4c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0eacd4c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0eacd4c1

Branch: refs/heads/trunk
Commit: 0eacd4c13be9bad0fbed9421a6539c64bbda4df1
Parents: a49fac5
Author: Xiaoyu Yao
Authored: Thu Apr 6 16:11:55 2017 -0700
Committer: Xiaoyu Yao
Committed: Thu Apr 6 16:11:55 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  39 +++++-
 .../protocol/datatransfer/PacketReceiver.java   |   2 +-
 .../apache/hadoop/hdfs/TestDFSOutputStream.java | 126 +++++++++++++++++++
 3 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eacd4c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index e4929e1..9a52fbe 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -122,6 +123,7 @@ public class DFSOutputStream extends FSOutputSummer
   private final EnumSet<AddBlockFlag> addBlockFlags;
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
+  private int writePacketSize;

   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -202,6 +204,8 @@ public class DFSOutputStream extends FSOutputSummer
           +"{}", src);
     }

+    initWritePacketSize();
+
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     if (bytesPerChecksum <= 0) {
       throw new HadoopIllegalArgumentException(
@@ -216,6 +220,21 @@ public class DFSOutputStream extends FSOutputSummer
     this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
   }

+  /**
+   * Ensures the configured writePacketSize never exceeds
+   * PacketReceiver.MAX_PACKET_SIZE.
+   */
+  private void initWritePacketSize() {
+    writePacketSize = dfsClient.getConf().getWritePacketSize();
+    if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) {
+      LOG.warn(
+          "Configured write packet exceeds {} bytes as max,"
+              + " using {} bytes.",
+          PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE);
+      writePacketSize = PacketReceiver.MAX_PACKET_SIZE;
+    }
+  }
+
   /** Construct a new output stream for creating a file.
    */
   protected DFSOutputStream(DFSClient dfsClient, String src,
       HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
@@ -489,13 +508,29 @@ public class DFSOutputStream extends FSOutputSummer
     }

     if (!getStreamer().getAppendChunk()) {
-      int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
-          dfsClient.getConf().getWritePacketSize());
+      final int psize = (int) Math
+          .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
       computePacketChunkSize(psize, bytesPerChecksum);
     }
   }

   /**
+   * Used in test only.
+   */
+  @VisibleForTesting
+  void setAppendChunk(final boolean appendChunk) {
+    getStreamer().setAppendChunk(appendChunk);
+  }
+
+  /**
+   * Used in test only.
+   */
+  @VisibleForTesting
+  void setBytesCurBlock(final long bytesCurBlock) {
+    getStreamer().setBytesCurBlock(bytesCurBlock);
+  }
+
+  /**
    * if encountering a block boundary, send an empty packet to
    * indicate the end of block and reset bytesCurBlock.
    *
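The crash named in the subject comes from the old adjustChunkBoundary code above: it narrowed (blockSize - bytesCurBlock) to int before clamping it against the write packet size, so with a block size above 2 GB the remaining space can exceed Integer.MAX_VALUE and wrap negative, producing a negative packet size. The patch clamps in long arithmetic and, via the new initWritePacketSize(), caps writePacketSize at PacketReceiver.MAX_PACKET_SIZE, so the narrowed result always fits in an int. The stand-alone sketch below is illustrative only and not part of the commit; the class name is hypothetical, the constants mirror the new test further down, and 64 * 1024 is the default dfs.client-write-packet-size.

    public class Hdfs11608OverflowSketch {
      public static void main(String[] args) {
        final long blockSize = 3221225500L;      // ~3 GB block, as in the test below
        final long bytesCurBlock = 1073741824L;  // 1 GiB already written into the block
        final int writePacketSize = 64 * 1024;   // default dfs.client-write-packet-size

        // Old code path: the subtraction is narrowed to int before Math.min,
        // so 2147483676 wraps around to a negative value.
        int oldPsize = Math.min((int) (blockSize - bytesCurBlock), writePacketSize);
        System.out.println("old psize = " + oldPsize);   // -2147483620

        // Patched code path: compare in long arithmetic, then narrow the small result.
        int newPsize = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);
        System.out.println("new psize = " + newPsize);   // 65536
      }
    }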
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eacd4c1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
index 93e9217..6b717ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
@@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable {
    * The max size of any single packet. This prevents OOMEs when
    * invalid data is sent.
    */
-  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+  public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;

   static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eacd4c1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 9ec01b6..52e3bb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs;

 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -41,10 +43,13 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
 import org.apache.htrace.core.SpanId;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -64,6 +69,9 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;

+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+
 public class TestDFSOutputStream {
   static MiniDFSCluster cluster;

@@ -133,6 +141,124 @@ public class TestDFSOutputStream {
     Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
   }

+  /**
+   * This tests preventing overflows of package size and bodySize.
+   *
+   * See also https://issues.apache.org/jira/browse/HDFS-11608.
+   *
+   * @throws IOException
+   * @throws SecurityException
+   * @throws NoSuchFieldException
+   * @throws InvocationTargetException
+   * @throws IllegalArgumentException
+   * @throws IllegalAccessException
+   * @throws NoSuchMethodException
+   */
+  @Test(timeout=60000)
+  public void testPreventOverflow() throws IOException, NoSuchFieldException,
+      SecurityException, IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
+
+    final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+    int configuredWritePacketSize = defaultWritePacketSize;
+    int finalWritePacketSize = defaultWritePacketSize;
+
+    /* test default WritePacketSize, e.g. 64*1024 */
+    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
+
+    /* test large WritePacketSize, e.g. 1G */
+    configuredWritePacketSize = 1000 * 1024 * 1024;
+    finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
+    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
+  }
+
+  /**
+   * @configuredWritePacketSize the configured WritePacketSize.
+   * @finalWritePacketSize the final WritePacketSize picked by
+   *          {@link DFSOutputStream#adjustChunkBoundary}
+   */
+  private void runAdjustChunkBoundary(
+      final int configuredWritePacketSize,
+      final int finalWritePacketSize) throws IOException, NoSuchFieldException,
+      SecurityException, IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
+
+    final boolean appendChunk = false;
+    final long blockSize = 3221225500L;
+    final long bytesCurBlock = 1073741824L;
+    final int bytesPerChecksum = 512;
+    final int checksumSize = 4;
+    final int chunkSize = bytesPerChecksum + checksumSize;
+    final int packateMaxHeaderLength = 33;
+
+    MiniDFSCluster dfsCluster = null;
+    final File baseDir = new File(PathUtils.getTestDir(getClass()),
+        GenericTestUtils.getMethodName());
+
+    try {
+      final Configuration dfsConf = new Configuration();
+      dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+          baseDir.getAbsolutePath());
+      dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          configuredWritePacketSize);
+      dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
+      dfsCluster.waitActive();
+
+      final FSDataOutputStream os = dfsCluster.getFileSystem()
+          .create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow"));
+      final DFSOutputStream dos = (DFSOutputStream) Whitebox
+          .getInternalState(os, "wrappedStream");
+
+      /* set appendChunk */
+      final Method setAppendChunkMethod = dos.getClass()
+          .getDeclaredMethod("setAppendChunk", boolean.class);
+      setAppendChunkMethod.setAccessible(true);
+      setAppendChunkMethod.invoke(dos, appendChunk);
+
+      /* set bytesCurBlock */
+      final Method setBytesCurBlockMethod = dos.getClass()
+          .getDeclaredMethod("setBytesCurBlock", long.class);
+      setBytesCurBlockMethod.setAccessible(true);
+      setBytesCurBlockMethod.invoke(dos, bytesCurBlock);
+
+      /* set blockSize */
+      final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
+      blockSizeField.setAccessible(true);
+      blockSizeField.setLong(dos, blockSize);
+
+      /* call adjustChunkBoundary */
+      final Method method = dos.getClass()
+          .getDeclaredMethod("adjustChunkBoundary");
+      method.setAccessible(true);
+      method.invoke(dos);
+
+      /* get and verify writePacketSize */
+      final Field writePacketSizeField = dos.getClass()
+          .getDeclaredField("writePacketSize");
+      writePacketSizeField.setAccessible(true);
+      Assert.assertEquals(writePacketSizeField.getInt(dos),
+          finalWritePacketSize);
+
chunksPerPacket */ + final Field chunksPerPacketField = dos.getClass() + .getDeclaredField("chunksPerPacket"); + chunksPerPacketField.setAccessible(true); + Assert.assertEquals(chunksPerPacketField.getInt(dos), + (finalWritePacketSize - packateMaxHeaderLength) / chunkSize); + + /* get and verify packetSize */ + final Field packetSizeField = dos.getClass() + .getDeclaredField("packetSize"); + packetSizeField.setAccessible(true); + Assert.assertEquals(packetSizeField.getInt(dos), + chunksPerPacketField.getInt(dos) * chunkSize); + } finally { + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + } + } + @Test public void testCongestionBackoff() throws IOException { DfsClientConf dfsClientConf = mock(DfsClientConf.class); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org