hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [04/50] hadoop git commit: HDFS-10312. Large block reports may fail to decode at NameNode due to 64 MB protobuf maximum length restriction. Contributed by Chris Nauroth.
Date Tue, 03 May 2016 22:05:51 GMT
HDFS-10312. Large block reports may fail to decode at NameNode due to 64 MB protobuf maximum
length restriction. Contributed by Chris Nauroth.

(cherry picked from commit 63ac2db59af2b50e74dc892cae1dbc4d2e061423)
(cherry picked from commit cc56bed6867f844810f5fda0cba3cbc9a9367269)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7fb1cfc5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7fb1cfc5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7fb1cfc5

Branch: refs/heads/branch-2.8
Commit: 7fb1cfc591aac772d83ef84fab4bd257d9893276
Parents: e2968f5
Author: Chris Nauroth <cnauroth@apache.org>
Authored: Wed Apr 20 13:55:03 2016 -0700
Committer: Chris Nauroth <cnauroth@apache.org>
Committed: Wed Apr 20 14:00:06 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/BlockListAsLongs.java  |  93 +++++++++---
 .../DatanodeProtocolServerSideTranslatorPB.java |  11 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   9 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   7 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   8 +-
 .../server/datanode/TestLargeBlockReport.java   | 142 +++++++++++++++++++
 .../fsdataset/impl/FsDatasetImplTestUtils.java  |  11 ++
 7 files changed, 256 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fb1cfc5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 834546b..26c7ffb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
@@ -63,26 +66,42 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
   };
 
   /**
-   * Prepare an instance to in-place decode the given ByteString buffer
+   * Prepare an instance to in-place decode the given ByteString buffer.
    * @param numBlocks - blocks in the buffer
    * @param blocksBuf - ByteString encoded varints
+   * @param maxDataLength - maximum allowable data size in protobuf message
    * @return BlockListAsLongs
    */
   public static BlockListAsLongs decodeBuffer(final int numBlocks,
-      final ByteString blocksBuf) {
-    return new BufferDecoder(numBlocks, blocksBuf);
+      final ByteString blocksBuf, final int maxDataLength) {
+    return new BufferDecoder(numBlocks, blocksBuf, maxDataLength);
   }
 
   /**
-   * Prepare an instance to in-place decode the given ByteString buffers
+   * Prepare an instance to in-place decode the given ByteString buffers.
    * @param numBlocks - blocks in the buffers
    * @param blocksBufs - list of ByteString encoded varints
    * @return BlockListAsLongs
    */
+  @VisibleForTesting
   public static BlockListAsLongs decodeBuffers(final int numBlocks,
       final List<ByteString> blocksBufs) {
+    return decodeBuffers(numBlocks, blocksBufs,
+        IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+  }
+
+  /**
+   * Prepare an instance to in-place decode the given ByteString buffers.
+   * @param numBlocks - blocks in the buffers
+   * @param blocksBufs - list of ByteString encoded varints
+   * @param maxDataLength - maximum allowable data size in protobuf message
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs decodeBuffers(final int numBlocks,
+      final List<ByteString> blocksBufs, final int maxDataLength) {
     // this doesn't actually copy the data
-    return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs));
+    return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs),
+        maxDataLength);
   }
 
   /**
@@ -93,7 +112,21 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
    * @return BlockListAsLongs
    */
   public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
-    return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList);
+    return decodeLongs(blocksList, IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+  }
+
+  /**
+   * Prepare an instance to in-place decode the given list of Longs.  Note
+   * it's much more efficient to decode ByteString buffers and only exists
+   * for compatibility.
+   * @param blocksList - list of longs
+   * @param maxDataLength - maximum allowable data size in protobuf message
+   * @return BlockListAsLongs
+   */
+  public static BlockListAsLongs decodeLongs(List<Long> blocksList,
+      int maxDataLength) {
+    return blocksList.isEmpty() ? EMPTY :
+        new LongsDecoder(blocksList, maxDataLength);
   }
 
   /**
@@ -102,17 +135,22 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
    * @param replicas - replicas to encode
    * @return BlockListAsLongs
    */
+  @VisibleForTesting
   public static BlockListAsLongs encode(
       final Collection<? extends Replica> replicas) {
-    BlockListAsLongs.Builder builder = builder();
+    BlockListAsLongs.Builder builder = builder(IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     for (Replica replica : replicas) {
       builder.add(replica);
     }
     return builder.build();
   }
 
-  public static BlockListAsLongs readFrom(InputStream is) throws IOException {
+  public static BlockListAsLongs readFrom(InputStream is, int maxDataLength)
+      throws IOException {
     CodedInputStream cis = CodedInputStream.newInstance(is);
+    if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
+      cis.setSizeLimit(maxDataLength);
+    }
     int numBlocks = -1;
     ByteString blocksBuf = null;
     while (!cis.isAtEnd()) {
@@ -133,7 +171,7 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
       }
     }
     if (numBlocks != -1 && blocksBuf != null) {
-      return decodeBuffer(numBlocks, blocksBuf);
+      return decodeBuffer(numBlocks, blocksBuf, maxDataLength);
     }
     return null;
   }
@@ -144,9 +182,14 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
     cos.writeBytes(2, getBlocksBuffer());
     cos.flush();
   }
-  
+
+  @VisibleForTesting
   public static Builder builder() {
-    return new BlockListAsLongs.Builder();
+    return builder(IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+  }
+
+  public static Builder builder(int maxDataLength) {
+    return new BlockListAsLongs.Builder(maxDataLength);
   }
 
   /**
@@ -221,10 +264,12 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
     private final CodedOutputStream cos;
     private int numBlocks = 0;
     private int numFinalized = 0;
+    private final int maxDataLength;
 
-    Builder() {
+    Builder(int maxDataLength) {
       out = ByteString.newOutput(64*1024);
       cos = CodedOutputStream.newInstance(out);
+      this.maxDataLength = maxDataLength;
     }
 
     public void add(Replica replica) {
@@ -258,7 +303,8 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
         // shouldn't happen, ByteString.Output doesn't throw IOE
         throw new IllegalStateException(ioe);
       }
-      return new BufferDecoder(numBlocks, numFinalized, out.toByteString());
+      return new BufferDecoder(numBlocks, numFinalized, out.toByteString(),
+          maxDataLength);
     }
   }
 
@@ -273,16 +319,19 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
     private final ByteString buffer;
     private final int numBlocks;
     private int numFinalized;
+    private final int maxDataLength;
 
-    BufferDecoder(final int numBlocks, final ByteString buf) {
-      this(numBlocks, -1, buf);
+    BufferDecoder(final int numBlocks, final ByteString buf,
+        final int maxDataLength) {
+      this(numBlocks, -1, buf, maxDataLength);
     }
 
     BufferDecoder(final int numBlocks, final int numFinalized,
-        final ByteString buf) {
+        final ByteString buf, final int maxDataLength) {
       this.numBlocks = numBlocks;
       this.numFinalized = numFinalized;
       this.buffer = buf;
+      this.maxDataLength = maxDataLength;
     }
 
     @Override
@@ -349,6 +398,12 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
         final CodedInputStream cis = buffer.newCodedInput();
         private int currentBlockIndex = 0;
 
+        {
+          if (maxDataLength != IPC_MAXIMUM_DATA_LENGTH_DEFAULT) {
+            cis.setSizeLimit(maxDataLength);
+          }
+        }
+
         @Override
         public boolean hasNext() {
           return currentBlockIndex < numBlocks;
@@ -384,12 +439,14 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
     private final List<Long> values;
     private final int finalizedBlocks;
     private final int numBlocks;
+    private final int maxDataLength;
 
     // set the header
-    LongsDecoder(List<Long> values) {
+    LongsDecoder(List<Long> values, int maxDataLength) {
       this.values = values.subList(2, values.size());
       this.finalizedBlocks = values.get(0).intValue();
       this.numBlocks = finalizedBlocks + values.get(1).intValue();
+      this.maxDataLength = maxDataLength;
     }
 
     @Override
@@ -399,7 +456,7 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica>
{
 
     @Override
     public ByteString getBlocksBuffer() {
-      Builder builder = builder();
+      Builder builder = builder(maxDataLength);
       for (Replica replica : this) {
         builder.add(replica);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fb1cfc5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 4f8f44f..8791136 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -68,6 +68,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
     DatanodeProtocolPB {
 
   private final DatanodeProtocol impl;
+  private final int maxDataLength;
+
   private static final ErrorReportResponseProto
       VOID_ERROR_REPORT_RESPONSE_PROTO = 
           ErrorReportResponseProto.newBuilder().build();
@@ -81,8 +83,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
       VOID_COMMIT_BLOCK_SYNCHRONIZATION_RESPONSE_PROTO =
           CommitBlockSynchronizationResponseProto.newBuilder().build();
 
-  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl) {
+  public DatanodeProtocolServerSideTranslatorPB(DatanodeProtocol impl,
+      int maxDataLength) {
     this.impl = impl;
+    this.maxDataLength = maxDataLength;
   }
 
   @Override
@@ -162,9 +166,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
         int num = (int)s.getNumberOfBlocks();
         Preconditions.checkState(s.getBlocksCount() == 0,
             "cannot send both blocks list and buffers");
-        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
+        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList(),
+            maxDataLength);
       } else {
-        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
+        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList(), maxDataLength);
       }
       report[index++] = new StorageBlockReport(PBHelperClient.convert(s.getStorage()),
           blocks);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fb1cfc5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 6c15eec..f6ad035 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CachingGetSpaceUsed;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -88,6 +89,7 @@ class BlockPoolSlice {
   private static final String REPLICA_CACHE_FILE = "replicas";
   private final long replicaCacheExpiry = 5*60*1000;
   private AtomicLong numOfBlocks = new AtomicLong();
+  private final int maxDataLength;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;
@@ -120,6 +122,10 @@ class BlockPoolSlice {
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
 
+    this.maxDataLength = conf.getInt(
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
     // in the future, we might want to do some sort of datanode-local
@@ -756,7 +762,8 @@ class BlockPoolSlice {
     FileInputStream inputStream = null;
     try {
       inputStream = new FileInputStream(replicaFile);
-      BlockListAsLongs blocksList =  BlockListAsLongs.readFrom(inputStream);
+      BlockListAsLongs blocksList =
+          BlockListAsLongs.readFrom(inputStream, maxDataLength);
       Iterator<BlockReportReplica> iterator = blocksList.iterator();
       while (iterator.hasNext()) {
         BlockReportReplica replica = iterator.next();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fb1cfc5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 64c1f0b..0693520 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -52,6 +52,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -258,6 +259,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   final LocalFileSystem localFS;
 
   private boolean blockPinningEnabled;
+  private final int maxDataLength;
   
   /**
    * An FSDataset has a directory where it loads its data files.
@@ -336,6 +338,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     blockPinningEnabled = conf.getBoolean(
       DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
       DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
+    maxDataLength = conf.getInt(
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
   }
 
   /**
@@ -1731,7 +1736,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     List<FsVolumeImpl> curVolumes = volumes.getVolumes();
     for (FsVolumeSpi v : curVolumes) {
-      builders.put(v.getStorageID(), BlockListAsLongs.builder());
+      builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
     }
 
     synchronized(this) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fb1cfc5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 5f82492..ed34817 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
@@ -249,9 +251,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
          new ClientNamenodeProtocolServerSideTranslatorPB(this);
      BlockingService clientNNPbService = ClientNamenodeProtocol.
          newReflectiveBlockingService(clientProtocolServerTranslator);
-    
+
+    int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
+        IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = 
-        new DatanodeProtocolServerSideTranslatorPB(this);
+        new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength);
     BlockingService dnProtoPbService = DatanodeProtocolService
         .newReflectiveBlockingService(dnProtoPbTranslator);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fb1cfc5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
new file mode 100644
index 0000000..0adc7c5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestLargeBlockReport.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.datanode.BPOfferService;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.log4j.Level;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that very large block reports can pass through the RPC server and
+ * deserialization layers successfully if configured.
+ */
+public class TestLargeBlockReport {
+
+  private final HdfsConfiguration conf = new HdfsConfiguration();
+  private MiniDFSCluster cluster;
+  private DataNode dn;
+  private BPOfferService bpos;
+  private DatanodeProtocolClientSideTranslatorPB nnProxy;
+  private DatanodeRegistration bpRegistration;
+  private String bpId;
+  private DatanodeStorage dnStorage;
+  private final long reportId = 1;
+  private final long fullBrLeaseId = 0;
+
+  @BeforeClass
+  public static void init() {
+    DFSTestUtil.setNameNodeLogLevel(Level.WARN);
+    FsDatasetImplTestUtils.setFsDatasetImplLogLevel(Level.WARN);
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testBlockReportExceedsLengthLimit() throws Exception {
+    initCluster();
+    // Create a large enough report that we expect it will go beyond the RPC
+    // server's length validation, and also protobuf length validation.
+    StorageBlockReport[] reports = createReports(6000000);
+    try {
+      nnProxy.blockReport(bpRegistration, bpId, reports,
+          new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+      fail("Should have failed because of the too long RPC data length");
+    } catch (Exception e) {
+      // Expected.  We can't reliably assert anything about the exception type
+      // or the message.  The NameNode just disconnects, and the details are
+      // buried in the NameNode log.
+    }
+  }
+
+  @Test
+  public void testBlockReportSucceedsWithLargerLengthLimit() throws Exception {
+    conf.setInt(IPC_MAXIMUM_DATA_LENGTH, 128 * 1024 * 1024); // 128 MB
+    initCluster();
+    StorageBlockReport[] reports = createReports(6000000);
+    nnProxy.blockReport(bpRegistration, bpId, reports,
+        new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+  }
+
+  /**
+   * Creates storage block reports, consisting of a single report with the
+   * requested number of blocks.  The block data is fake, because the tests just
+   * need to validate that the messages can pass correctly.  This intentionally
+   * uses the old-style decoding method as a helper.  The test needs to cover
+   * the new-style encoding technique.  Passing through that code path here
+   * would trigger an exception before the test is ready to deal with it.
+   *
+   * @param numBlocks requested number of blocks
+   * @return storage block reports
+   */
+  private StorageBlockReport[] createReports(int numBlocks) {
+    int longsPerBlock = 3;
+    int blockListSize = 2 + numBlocks * longsPerBlock;
+    List<Long> longs = new ArrayList<Long>(blockListSize);
+    longs.add(Long.valueOf(numBlocks));
+    longs.add(0L);
+    for (int i = 0; i < blockListSize; ++i) {
+      longs.add(Long.valueOf(i));
+    }
+    BlockListAsLongs blockList = BlockListAsLongs.decodeLongs(longs);
+    StorageBlockReport[] reports = new StorageBlockReport[] {
+        new StorageBlockReport(dnStorage, blockList) };
+    return reports;
+  }
+
+  /**
+   * Start a mini-cluster, and set up everything the tests need to use it.
+   *
+   * @throws Exception if initialization fails
+   */
+  private void initCluster() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    dn = cluster.getDataNodes().get(0);
+    bpos = dn.getAllBpOs().get(0);
+    nnProxy = bpos.getActiveNN();
+    bpRegistration = bpos.bpRegistration;
+    bpId = bpos.getBlockPoolId();
+    dnStorage = dn.getFSDataset().getBlockReports(bpId).keySet().iterator()
+        .next();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7fb1cfc5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index 320ae9f..f16ca95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -377,4 +379,13 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     Files.move(metaFile.toPath(), newMetaFile.toPath(),
         StandardCopyOption.ATOMIC_MOVE);
   }
+
+  /**
+   * Change the log level used by FsDatasetImpl.
+   *
+   * @param level the level to set
+   */
+  public static void setFsDatasetImplLogLevel(Level level) {
+    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message