hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cnaur...@apache.org
Subject [2/3] hadoop git commit: HDFS-10312. Large block reports may fail to decode at NameNode due to 64 MB protobuf maximum length restriction. Contributed by Chris Nauroth.
Date Wed, 20 Apr 2016 21:02:04 GMT
HDFS-10312. Large block reports may fail to decode at NameNode due to 64 MB protobuf maximum
length restriction. Contributed by Chris Nauroth.

(cherry picked from commit 63ac2db59af2b50e74dc892cae1dbc4d2e061423)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc56bed6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc56bed6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc56bed6

Branch: refs/heads/branch-2
Commit: cc56bed6867f844810f5fda0cba3cbc9a9367269
Parents: 76d963a
Author: Chris Nauroth <cnauroth@apache.org>
Authored: Wed Apr 20 13:55:03 2016 -0700
Committer: Chris Nauroth <cnauroth@apache.org>
Committed: Wed Apr 20 13:55:03 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/BlockListAsLongs.java  |  93 +++++++++---
 .../DatanodeProtocolServerSideTranslatorPB.java |  11 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  10 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   7 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   8 +-
 .../server/datanode/TestLargeBlockReport.java   | 142 +++++++++++++++++++
 .../fsdataset/impl/FsDatasetImplTestUtils.java  |  11 ++
 7 files changed, 257 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc56bed6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 834546b..26c7ffb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
@@ -63,26 +66,42 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
   };
 
   /**
-   * Prepare an instance to in-place decode the given ByteString buffer
+   * Prepare an instance to in-place decode the given ByteString buffer.
    * @param numBlocks - blocks in the buffer
    * @param blocksBuf - ByteString encoded varints
+   * @param maxDataLength - maximum allowable data size in protobuf message
    * @return BlockListAsLongs
    */
   public static BlockListAsLongs decodeBuffer(final int numBlocks,
-      final ByteString blocksBuf) {
-    return new BufferDecoder(numBlocks, blocksBuf);
+      final ByteString blocksBuf, final int maxDataLength) {
+    return new BufferDecoder(numBlocks, blocksBuf, maxDataLength);
   }
 
   /**
-   * Prepare an instance to in-place decode the given ByteString buffers
+   * Prepare an instance to in-place decode the given ByteString buffers.
    * @param numBlocks - blocks in the buffers
    * @param blocksBufs - list of ByteString encoded varints
    * @return BlockListAsLongs
    */
+  @VisibleForTesting
   public static BlockListAsLongs decodeBuffers(final int numBlocks,
       final List<ByteString> blocksBufs) {
+    return decodeBuffers(numBlocks, blocksBufs,
+        IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+  }
+
+  /**
+   * Prepare an instance to in-place decode the given ByteString buffers.
+   * @param numBlocks - blocks in the buffers
+   * @param blocksBufs - list of ByteString encoded varints
+   * @param maxDataLength - maximum allowable data size in protobuf message
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs decodeBuffers(final int numBlocks,
+      final List<ByteString> blocksBufs, final int maxDataLength) {
     // this doesn't actually copy the data
-    return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs));
+    return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs),
+        maxDataLength);
   }
 
   /**
@@ -93,7 +112,21 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
    * @return BlockListAsLongs
    */
   public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
-    return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList);
+    return decodeLongs(blocksList, IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+  }
+
+  /**
+   * Prepare an instance to in-place decode the given list of Longs.  Note
+   * it's much more efficient to decode ByteString buffers and only exists
+   * for compatibility.
+   * @param blocksList - list of longs
+   * @param maxDataLength - maximum allowable data size in protobuf message
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs decodeLongs(List<Long> blocksList,
+      int maxDataLength) {
+    return blocksList.isEmpty() ? EMPTY :
+        new LongsDecoder(blocksList, maxDataLength);
   }
 
   /**
@@ -102,17 +135,22 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
    * @param replicas - replicas to encode
    * @return BlockListAsLongs
    */
+  @VisibleForTesting
   public static BlockListAsLongs encode(
       final Collection<? extends Replica> replicas) {
-    BlockListAsLongs.Builder builder = builder();
+    BlockListAsLongs.Builder builder = builder(IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     for (Replica replica : replicas) {
       builder.add(replica);
     }
     return builder.build();
   }
 
-  public static BlockListAsLongs readFrom(InputStream is) throws IOException {
+  public static BlockListAsLongs readFrom(InputStream is, int maxDataLength)
+      throws IOException {
     CodedInputStream cis = CodedInputStream.newInstance(is);
+    if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
+      cis.setSizeLimit(maxDataLength);
+    }
     int numBlocks = -1;
     ByteString blocksBuf = null;
     while (!cis.isAtEnd()) {
@@ -133,7 +171,7 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
       }
     }
     if (numBlocks != -1 && blocksBuf != null) {
-      return decodeBuffer(numBlocks, blocksBuf);
+      return decodeBuffer(numBlocks, blocksBuf, maxDataLength);
     }
     return null;
   }
@@ -144,9 +182,14 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     cos.writeBytes(2, getBlocksBuffer());
     cos.flush();
   }
-  
+
+  @VisibleForTesting
   public static Builder builder() {
-    return new BlockListAsLongs.Builder();
+    return builder(IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+  }
+
+  public static Builder builder(int maxDataLength) {
+    return new BlockListAsLongs.Builder(maxDataLength);
   }
 
   /**
@@ -221,10 +264,12 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     private final CodedOutputStream cos;
     private int numBlocks = 0;
     private int numFinalized = 0;
+    private final int maxDataLength;
 
-    Builder() {
+    Builder(int maxDataLength) {
       out = ByteString.newOutput(64*1024);
       cos = CodedOutputStream.newInstance(out);
+      this.maxDataLength = maxDataLength;
     }
 
     public void add(Replica replica) {
@@ -258,7 +303,8 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
         // shouldn't happen, ByteString.Output doesn't throw IOE
         throw new IllegalStateException(ioe);
       }
-      return new BufferDecoder(numBlocks, numFinalized, out.toByteString());
+      return new BufferDecoder(numBlocks, numFinalized, out.toByteString(),
+          maxDataLength);
     }
   }
 
@@ -273,16 +319,19 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     private final ByteString buffer;
     private final int numBlocks;
     private int numFinalized;
+    private final int maxDataLength;
 
-    BufferDecoder(final int numBlocks, final ByteString buf) {
-      this(numBlocks, -1, buf);
+    BufferDecoder(final int numBlocks, final ByteString buf,
+        final int maxDataLength) {
+      this(numBlocks, -1, buf, maxDataLength);
     }
 
     BufferDecoder(final int numBlocks, final int numFinalized,
-        final ByteString buf) {
+        final ByteString buf, final int maxDataLength) {
       this.numBlocks = numBlocks;
       this.numFinalized = numFinalized;
       this.buffer = buf;
+      this.maxDataLength = maxDataLength;
     }
 
     @Override
@@ -349,6 +398,12 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
         final CodedInputStream cis = buffer.newCodedInput();
         private int currentBlockIndex = 0;
 
+        {
+          if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
+            cis.setSizeLimit(maxDataLength);
+          }
+        }
+
         @Override
         public boolean hasNext() {
           return currentBlockIndex < numBlocks;
@@ -384,12 +439,14 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     private final List<Long> values;
     private final int finalizedBlocks;
     private final int numBlocks;
+    private final int maxDataLength;
 
     // set the header
-    LongsDecoder(List<Long> values) {
+    LongsDecoder(List<Long> values, int maxDataLength) {
       this.values = values.subList(2, values.size());
       this.finalizedBlocks = values.get(0).intValue();
       this.numBlocks = finalizedBlocks + values.get(1).intValue();
+      this.maxDataLength = maxDataLength;
     }
 
     @Override
@@ -399,7 +456,7 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
 
     @Override
     public ByteString getBlocksBuffer() {
-      Builder builder = builder();
+      Builder builder = builder(maxDataLength);
       for (Replica replica : this) {
         builder.add(replica);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc56bed6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 4f8f44f..8791136 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -68,6 +68,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
     DatanodeProtocolPB {
 
   private final DatanodeProtocol impl;
+  private final int maxDataLength;
+
   private static final ErrorReportResponseProto
       VOID_ERROR_REPORT_RESPONSE_PROTO = 
           ErrorReportResponseProto.newBuilder().build();
@@ -81,8 +83,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
       VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
           CommitBlockSynchronizationResponseProto.newBuilder().build();
 
-  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
+  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl,
+      int maxDataLength) {
     this.impl = impl;
+    this.maxDataLength = maxDataLength;
   }
 
   @Override
@@ -162,9 +166,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
         int num = (int)s.getNumberOfBlocks();
         Preconditions.checkState(s.getBlocksCount() == 0,
             "cannot send both blocks list and buffers");
-        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
+        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList(),
+            maxDataLength);
       } else {
-        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
+        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList(), maxDataLength);
       }
       report[index++] = new StorageBlockReport(PBHelperClient.convert(s.getStorage()),
           blocks);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc56bed6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 57804d1..3324db9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CachingGetSpaceUsed;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -91,6 +92,7 @@ class BlockPoolSlice {
   private AtomicLong numOfBlocks = new AtomicLong();
   private final long cachedDfsUsedCheckTime;
   private final Timer timer;
+  private final int maxDataLength;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;
@@ -128,6 +130,11 @@ class BlockPoolSlice {
         conf.getLong(
             DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
             DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
+
+    this.maxDataLength = conf.getInt(
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+
     this.timer = timer;
 
     // Files that were being written when the datanode was last shutdown
@@ -766,7 +773,8 @@ class BlockPoolSlice {
     FileInputStream inputStream = null;
     try {
       inputStream = new FileInputStream(replicaFile);
-      BlockListAsLongs blocksList =  BlockListAsLongs.readFrom(inputStream);
+      BlockListAsLongs blocksList =
+          BlockListAsLongs.readFrom(inputStream, maxDataLength);
       Iterator<BlockReportReplica> iterator = blocksList.iterator();
       while (iterator.hasNext()) {
         BlockReportReplica replica = iterator.next();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc56bed6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 9b4a074..c692a58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -52,6 +52,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -260,6 +261,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   final LocalFileSystem localFS;
 
   private boolean blockPinningEnabled;
+  private final int maxDataLength;
   
   /**
    * An FSDataset has a directory where it loads its data files.
@@ -338,6 +340,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     blockPinningEnabled = conf.getBoolean(
       DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
       DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
+    maxDataLength = conf.getInt(
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
   }
 
   /**
@@ -1733,7 +1738,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     List<FsVolumeImpl> curVolumes = volumes.getVolumes();
     for (FsVolumeSpi v : curVolumes) {
-      builders.put(v.getStorageID(), BlockListAsLongs.builder());
+      builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
     }
 
     synchronized(this) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc56bed6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 5fad160..4b7cecb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
@@ -253,9 +255,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
          new ClientNamenodeProtocolServerSideTranslatorPB(this);
      BlockingService clientNNPbService = ClientNamenodeProtocol.
          newReflectiveBlockingService(clientProtocolServerTranslator);
-    
+
+    int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
+        IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = 
-        new DatanodeProtocolServerSideTranslatorPB(this);
+        new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength);
     BlockingService dnProtoPbService = DatanodeProtocolService
         .newReflectiveBlockingService(dnProtoPbTranslator);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc56bed6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
new file mode 100644
index 0000000..0adc7c5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.log4j.Level;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that very large block reports can pass through the RPC server and
+ * deserialization layers successfully if configured.
+ */
+public class TestLargeBlockReport {
+
+  private final HdfsConfiguration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private DataNode dn;
+  private BPOfferService bpos;
+  private DatanodeProtocolClientSideTranslatorPB nnProxy;
+  private DatanodeRegistration bpRegistration;
+  private String bpId;
+  private DatanodeStorage dnStorage;
+  private final long reportId = 1;
+  private final long fullBrLeaseId = 0;
+
+  @BeforeClass
+  public static void init() {
+    DFSTestUtil.setNameNodeLogLevel(Level.WARN);
+    FsDatasetImplTestUtils.setFsDatasetImplLogLevel(Level.WARN);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBlockReportExceedsLengthLimit() throws Exception {
+    initCluster();
+    // Create a large enough report that we expect it will go beyond the RPC
+    // server's length validation, and also protobuf length validation.
+    StorageBlockReport[] reports = createReports(6000000);
+    try {
+      nnProxy.blockReport(bpRegistration, bpId, reports,
+          new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+      fail("Should have failed because of the too long RPC data length");
+    } catch (Exception e) {
+      // Expected.  We can't reliably assert anything about the exception type
+      // or the message.  The NameNode just disconnects, and the details are
+      // buried in the NameNode log.
+    }
+  }
+
+  @Test
+  public void testBlockReportSucceedsWithLargerLengthLimit() throws Exception {
+    conf.setInt(IPC_MAXIMUM_DATA_LENGTH, 128 * 1024 * 1024); // 128 MB
+    initCluster();
+    StorageBlockReport[] reports = createReports(6000000);
+    nnProxy.blockReport(bpRegistration, bpId, reports,
+        new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+  }
+
+  /**
+   * Creates storage block reports, consisting of a single report with the
+   * requested number of blocks.  The block data is fake, because the tests just
+   * need to validate that the messages can pass correctly.  This intentionally
+   * uses the old-style decoding method as a helper.  The test needs to cover
+   * the new-style encoding technique.  Passing through that code path here
+   * would trigger an exception before the test is ready to deal with it.
+   *
+   * @param numBlocks requested number of blocks
+   * @return storage block reports
+   */
+  private StorageBlockReport[] createReports(int numBlocks) {
+    int longsPerBlock = 3;
+    int blockListSize = 2 + numBlocks * longsPerBlock;
+    List<Long> longs = new ArrayList<Long>(blockListSize);
+    longs.add(Long.valueOf(numBlocks));
+    longs.add(0L);
+    for (int i = 0; i < blockListSize; ++i) {
+      longs.add(Long.valueOf(i));
+    }
+    BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
+    StorageBlockReport[] reports = new StorageBlockReport[] {
+        new StorageBlockReport(dnStorage, blockList) };
+    return reports;
+  }
+
+  /**
+   * Start a mini-cluster, and set up everything the tests need to use it.
+   *
+   * @throws Exception if initialization fails
+   */
+  private void initCluster() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    dn = cluster.getDataNodes().get(0);
+    bpos = dn.getAllBpOs().get(0);
+    nnProxy = bpos.getActiveNN();
+    bpRegistration = bpos.bpRegistration;
+    bpId = bpos.getBlockPoolId();
+    dnStorage = dn.getFSDataset().getBlockReports(bpId).keySet().iterator()
+        .next();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc56bed6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index f780a14..d2f3db0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -465,4 +467,13 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
           String.format("Block pool directory %s exists", bpDir));
     }
   }
+
+  /**
+   * Change the log level used by FsDatasetImpl.
+   *
+   * @param level the level to set
+   */
+  public static void setFsDatasetImplLogLevel(Level level) {
+    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level);
+  }
 }


Mime
View raw message