hadoop-common-commits mailing list archives

From: a..@apache.org
Subject: [1/2] hadoop git commit: Revert "HDFS-11551. Handle SlowDiskReport from DataNode at the NameNode. Contributed by Xiaobing Zhou."
Date: Tue, 11 Apr 2017 18:46:13 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 ac76dc10d -> 9f4585c95


Revert "HDFS-11551. Handle SlowDiskReport from DataNode at the NameNode. Contributed by Xiaobing
Zhou."

This reverts commit ac76dc10dde45ed00885bb96e89ed78015435790.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f2d1d3d1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f2d1d3d1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f2d1d3d1

Branch: refs/heads/branch-2.7
Commit: f2d1d3d159c81056a93facad750b3738914f24c3
Parents: ac76dc1
Author: Arpit Agarwal <arp@apache.org>
Authored: Tue Apr 11 11:42:57 2017 -0700
Committer: Arpit Agarwal <arp@apache.org>
Committed: Tue Apr 11 11:42:57 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  71 ++---------
 .../protocol/datatransfer/PacketReceiver.java   |   2 +-
 .../apache/hadoop/hdfs/TestDFSOutputStream.java | 126 -------------------
 3 files changed, 13 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d1d3d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 5de5460..ef8aa5a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -102,8 +101,6 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceInfo;
 import org.apache.htrace.TraceScope;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -140,8 +137,6 @@ import com.google.common.cache.RemovalNotification;
 public class DFSOutputStream extends FSOutputSummer
     implements Syncable, CanSetDropBehind {
   private final long dfsclientSlowLogThresholdMs;
-  static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
-
   /**
    * Number of times to retry creating a file when there are transient 
    * errors (typically related to encryption zones and KeyProvider operations).
@@ -191,7 +186,6 @@ public class DFSOutputStream extends FSOutputSummer
   private FileEncryptionInfo fileEncryptionInfo;
   private static final BlockStoragePolicySuite blockStoragePolicySuite =
       BlockStoragePolicySuite.createDefaultSuite();
-  private int writePacketSize;
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
@@ -1675,9 +1669,7 @@ public class DFSOutputStream extends FSOutputSummer
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
     }
-
-    initWritePacketSize();
-
+    
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     if (bytesPerChecksum <= 0) {
       throw new HadoopIllegalArgumentException(
@@ -1695,21 +1687,6 @@ public class DFSOutputStream extends FSOutputSummer
     this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
   }
 
-  /**
-   * Ensures the configured writePacketSize never exceeds
-   * PacketReceiver.MAX_PACKET_SIZE.
-   */
-  private void initWritePacketSize() {
-    writePacketSize = dfsClient.getConf().writePacketSize;
-    if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) {
-      LOG.warn(
-          "Configured write packet exceeds {} bytes as max,"
-              + " using {} bytes.",
-          PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE);
-      writePacketSize = PacketReceiver.MAX_PACKET_SIZE;
-    }
-  }
-
   /** Construct a new output stream for creating a file. */
   private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
@@ -1959,8 +1936,18 @@ public class DFSOutputStream extends FSOutputSummer
       }
       waitAndQueueCurrentPacket();
 
-      adjustChunkBoundary();
+      // If the reopened file did not end at chunk boundary and the above
+      // write filled up its partial chunk. Tell the summer to generate full 
+      // crc chunks from now on.
+      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
+        appendChunk = false;
+        resetChecksumBufSize();
+      }
 
+      if (!appendChunk) {
+        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
+        computePacketChunkSize(psize, bytesPerChecksum);
+      }
       //
       // if encountering a block boundary, send an empty packet to 
       // indicate the end of block and reset bytesCurBlock.
@@ -1975,40 +1962,6 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
-  /**
-   * If the reopened file did not end at chunk boundary and the above
-   * write filled up its partial chunk. Tell the summer to generate full
-   * crc chunks from now on.
-   */
-  protected void adjustChunkBoundary() {
-    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
-      appendChunk = false;
-      resetChecksumBufSize();
-    }
-
-    if (!appendChunk) {
-      final int psize = (int) Math.min(blockSize - bytesCurBlock,
-          writePacketSize);
-      computePacketChunkSize(psize, bytesPerChecksum);
-    }
-  }
-
-  /**
-   * Used in test only.
-   */
-  @VisibleForTesting
-  void setAppendChunk(final boolean appendChunk) {
-    this.appendChunk = appendChunk;
-  }
-
-  /**
-   * Used in test only.
-   */
-  @VisibleForTesting
-  void setBytesCurBlock(final long bytesCurBlock) {
-    this.bytesCurBlock = bytesCurBlock;
-  }
-
   @Deprecated
   public void sync() throws IOException {
     hflush();

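----------------------------------------------------------------------
Note on the hunk above: the removed adjustChunkBoundary() took the minimum of
(blockSize - bytesCurBlock) and writePacketSize in long arithmetic before
casting to int, while the line restored by this revert casts to int first.
With a block size over 2 GB, that cast can wrap negative. A minimal
standalone sketch of the difference (not Hadoop code; the block-size
constants are copied from the removed test further below):

    public class ChunkBoundaryOverflowSketch {
      public static void main(String[] args) {
        final long blockSize = 3221225500L;      // ~3 GB, as in the removed test
        final long bytesCurBlock = 1073741824L;  // 1 GiB, as in the removed test
        final int writePacketSize = 64 * 1024;   // default client write packet size

        // Arithmetic restored by this revert: cast to int first, then min.
        // blockSize - bytesCurBlock = 2147483676 exceeds Integer.MAX_VALUE,
        // so the cast wraps to -2147483620 and psize goes negative.
        int psizeRestored = Math.min((int) (blockSize - bytesCurBlock), writePacketSize);

        // Arithmetic removed by this revert: min in long space, then cast.
        int psizeRemoved = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);

        System.out.println(psizeRestored);  // -2147483620
        System.out.println(psizeRemoved);   // 65536
      }
    }

The removed initWritePacketSize() additionally clamped writePacketSize itself
at PacketReceiver.MAX_PACKET_SIZE, which is why the import and the public
visibility of that constant (next hunk) are also being reverted.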
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d1d3d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
index 784c305..3045a13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
@@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable {
    * The max size of any single packet. This prevents OOMEs when
    * invalid data is sent.
    */
-  public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
 
   static final Log LOG = LogFactory.getLog(PacketReceiver.class);
   

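----------------------------------------------------------------------
With the clamp in DFSOutputStream gone, MAX_PACKET_SIZE returns to private
visibility. Per the javadoc in the hunk above, the constant exists so the
receiver can reject a corrupt length prefix before allocating a buffer for
it. A minimal sketch of that kind of guard, assuming a length-prefixed
packet format (illustrative shape only, not PacketReceiver's actual code):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    class PacketGuardSketch {
      private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;

      static ByteBuffer readPacketBody(DataInputStream in) throws IOException {
        int payloadLen = in.readInt();
        // Validate before allocating, so bad input can't force a huge buffer (OOME).
        if (payloadLen < 0 || payloadLen > MAX_PACKET_SIZE) {
          throw new IOException("Incorrect packet payload size: " + payloadLen);
        }
        ByteBuffer buf = ByteBuffer.allocate(payloadLen);
        in.readFully(buf.array(), 0, payloadLen);
        return buf;
      }
    }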
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d1d3d1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 8c46564..7269e39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -17,28 +17,20 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.PathUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
-
 public class TestDFSOutputStream {
   static MiniDFSCluster cluster;
 
@@ -105,124 +97,6 @@ public class TestDFSOutputStream {
     Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
   }
 
-  /**
-   * This tests preventing overflows of package size and bodySize.
-   * <p>
-   * See also https://issues.apache.org/jira/browse/HDFS-11608.
-   * </p>
-   * @throws IOException
-   * @throws SecurityException
-   * @throws NoSuchFieldException
-   * @throws InvocationTargetException
-   * @throws IllegalArgumentException
-   * @throws IllegalAccessException
-   * @throws NoSuchMethodException
-   */
-  @Test(timeout=60000)
-  public void testPreventOverflow() throws IOException, NoSuchFieldException,
-      SecurityException, IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException {
-
-    final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
-    int configuredWritePacketSize = defaultWritePacketSize;
-    int finalWritePacketSize = defaultWritePacketSize;
-
-    /* test default WritePacketSize, e.g. 64*1024 */
-    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
-
-    /* test large WritePacketSize, e.g. 1G */
-    configuredWritePacketSize = 1000 * 1024 * 1024;
-    finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
-    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
-  }
-
-  /**
-   * @configuredWritePacketSize the configured WritePacketSize.
-   * @finalWritePacketSize the final WritePacketSize picked by
-   *                       {@link DFSOutputStream#adjustChunkBoundary}
-   */
-  private void runAdjustChunkBoundary(
-      final int configuredWritePacketSize,
-      final int finalWritePacketSize) throws IOException, NoSuchFieldException,
-      SecurityException, IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException {
-
-    final boolean appendChunk = false;
-    final long blockSize = 3221225500L;
-    final long bytesCurBlock = 1073741824L;
-    final int bytesPerChecksum = 512;
-    final int checksumSize = 4;
-    final int chunkSize = bytesPerChecksum + checksumSize;
-    final int packateMaxHeaderLength = 33;
-
-    MiniDFSCluster dfsCluster = null;
-    final File baseDir = new File(PathUtils.getTestDir(getClass()),
-        GenericTestUtils.getMethodName());
-
-    try {
-      final Configuration dfsConf = new Configuration();
-      dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
-          baseDir.getAbsolutePath());
-      dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
-          configuredWritePacketSize);
-      dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
-      dfsCluster.waitActive();
-
-      final FSDataOutputStream os = dfsCluster.getFileSystem()
-          .create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow"));
-      final DFSOutputStream dos = (DFSOutputStream) Whitebox
-          .getInternalState(os, "wrappedStream");
-
-      /* set appendChunk */
-      final Method setAppendChunkMethod = dos.getClass()
-          .getDeclaredMethod("setAppendChunk", boolean.class);
-      setAppendChunkMethod.setAccessible(true);
-      setAppendChunkMethod.invoke(dos, appendChunk);
-
-      /* set bytesCurBlock */
-      final Method setBytesCurBlockMethod = dos.getClass()
-          .getDeclaredMethod("setBytesCurBlock", long.class);
-      setBytesCurBlockMethod.setAccessible(true);
-      setBytesCurBlockMethod.invoke(dos, bytesCurBlock);
-
-      /* set blockSize */
-      final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
-      blockSizeField.setAccessible(true);
-      blockSizeField.setLong(dos, blockSize);
-
-      /* call adjustChunkBoundary */
-      final Method method = dos.getClass()
-          .getDeclaredMethod("adjustChunkBoundary");
-      method.setAccessible(true);
-      method.invoke(dos);
-
-      /* get and verify writePacketSize */
-      final Field writePacketSizeField = dos.getClass()
-          .getDeclaredField("writePacketSize");
-      writePacketSizeField.setAccessible(true);
-      Assert.assertEquals(writePacketSizeField.getInt(dos),
-          finalWritePacketSize);
-
-      /* get and verify chunksPerPacket */
-      final Field chunksPerPacketField = dos.getClass()
-          .getDeclaredField("chunksPerPacket");
-      chunksPerPacketField.setAccessible(true);
-      Assert.assertEquals(chunksPerPacketField.getInt(dos),
-          (finalWritePacketSize - packateMaxHeaderLength) / chunkSize);
-
-      /* get and verify packetSize */
-      final Field packetSizeField = dos.getClass()
-          .getDeclaredField("packetSize");
-      packetSizeField.setAccessible(true);
-      Assert.assertEquals(packetSizeField.getInt(dos),
-          chunksPerPacketField.getInt(dos) * chunkSize);
-    } finally {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-      }
-    }
-  }
-
   @AfterClass
   public static void tearDown() {
     cluster.shutdown();

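----------------------------------------------------------------------
For reference, the assertions in the removed testPreventOverflow reduce to
simple packet arithmetic: chunkSize = bytesPerChecksum + checksumSize = 516,
chunksPerPacket = (writePacketSize - headerLen) / chunkSize, and
packetSize = chunksPerPacket * chunkSize. A quick sketch using the constants
from the test body (the 33-byte header length is the test's own
"packateMaxHeaderLength" value):

    public class PacketArithmeticSketch {
      public static void main(String[] args) {
        final int chunkSize = 512 + 4;   // bytesPerChecksum + checksumSize
        final int headerLen = 33;        // the test's "packateMaxHeaderLength"

        // The two write-packet sizes the removed test exercised: the 64 KB
        // default, and the 16 MB cap that a 1 GB setting was clamped to.
        for (int writePacketSize : new int[] {64 * 1024, 16 * 1024 * 1024}) {
          int chunksPerPacket = (writePacketSize - headerLen) / chunkSize;
          int packetSize = chunksPerPacket * chunkSize;
          System.out.println(writePacketSize + " -> chunksPerPacket="
              + chunksPerPacket + ", packetSize=" + packetSize);
        }
        // Prints: 65536 -> chunksPerPacket=126, packetSize=65016
        //         16777216 -> chunksPerPacket=32513, packetSize=16776708
      }
    }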


