Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8279C1802D for ; Fri, 5 Feb 2016 15:22:09 +0000 (UTC) Received: (qmail 13993 invoked by uid 500); 5 Feb 2016 15:21:56 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 13926 invoked by uid 500); 5 Feb 2016 15:21:56 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 13917 invoked by uid 99); 5 Feb 2016 15:21:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Feb 2016 15:21:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 49F1AE0063; Fri, 5 Feb 2016 15:21:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: szetszwo@apache.org To: common-commits@hadoop.apache.org Message-Id: <865082e6c3a34c1290dbe11b39d8f393@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HDFS-9726. Refactor IBR code to a new class. Date: Fri, 5 Feb 2016 15:21:56 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 e62ca1485 -> 117397b89 HDFS-9726. Refactor IBR code to a new class. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/117397b8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/117397b8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/117397b8 Branch: refs/heads/branch-2 Commit: 117397b89f3f8f9a8c67a4958ad5b313e1f7aa37 Parents: e62ca14 Author: Tsz-Wo Nicholas Sze Authored: Fri Feb 5 23:17:59 2016 +0800 Committer: Tsz-Wo Nicholas Sze Committed: Fri Feb 5 23:20:16 2016 +0800 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hdfs/server/datanode/BPOfferService.java | 45 ++- .../hdfs/server/datanode/BPServiceActor.java | 271 ++----------------- .../datanode/IncrementalBlockReportManager.java | 224 +++++++++++++++ .../datanode/TestIncrementalBlockReports.java | 11 +- .../server/datanode/TestTriggerBlockReport.java | 11 +- 6 files changed, 285 insertions(+), 279 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/117397b8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f1bbf08..e7563f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1038,6 +1038,8 @@ Release 2.8.0 - UNRELEASED HDFS-9715. Check storage ID uniqueness on datanode startup (Lei (Eddy) Xu via vinayakumarb) + HDFS-9726. Refactor IBR code to a new class. (szetszwo) + BUG FIXES HDFS-8091: ACLStatus and XAttributes should be presented to http://git-wip-us.apache.org/repos/asf/hadoop/blob/117397b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index f93ac88..37a10c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -233,14 +233,27 @@ class BPOfferService { */ void notifyNamenodeReceivedBlock( ExtendedBlock block, String delHint, String storageUuid) { + notifyNamenodeBlock(block, BlockStatus.RECEIVED_BLOCK, delHint, + storageUuid); + } + + void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) { + notifyNamenodeBlock(block, BlockStatus.RECEIVING_BLOCK, null, storageUuid); + } + + void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) { + notifyNamenodeBlock(block, BlockStatus.DELETED_BLOCK, null, storageUuid); + } + + private void notifyNamenodeBlock(ExtendedBlock block, BlockStatus status, + String delHint, String storageUuid) { checkBlock(block); - ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( - block.getLocalBlock(), - ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, - delHint); + final ReceivedDeletedBlockInfo info = new ReceivedDeletedBlockInfo( + block.getLocalBlock(), status, delHint); + final DatanodeStorage storage = dn.getFSDataset().getStorage(storageUuid); for (BPServiceActor actor : bpServices) { - actor.notifyNamenodeBlock(bInfo, storageUuid, true); + actor.getIbrManager().notifyNamenodeBlock(info, storage); } } @@ -251,26 +264,6 @@ class BPOfferService { "block belongs to BP %s instead of BP %s", block.getBlockPoolId(), getBlockPoolId()); } - - void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) { - checkBlock(block); - ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( - block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null); - - for (BPServiceActor actor : bpServices) { - actor.notifyNamenodeDeletedBlock(bInfo, storageUuid); - } - } - - void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) { - checkBlock(block); - ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( - block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null); - - for (BPServiceActor actor : bpServices) { - actor.notifyNamenodeBlock(bInfo, storageUuid, false); - } - } //This must be called only by blockPoolManager void start() { @@ -577,7 +570,7 @@ class BPOfferService { @VisibleForTesting void triggerDeletionReportForTests() throws IOException { for (BPServiceActor actor : bpServices) { - actor.triggerDeletionReportForTests(); + actor.getIbrManager().triggerDeletionReportForTests(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/117397b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 1b72961..336b943 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.base.Joiner; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.client.BlockReportOptions; @@ -51,9 +50,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; -import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.io.IOUtils; @@ -61,10 +58,10 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; +import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import org.slf4j.Logger; +import com.google.common.base.Joiner; /** * A thread per active or standby namenode to perform: @@ -95,25 +92,14 @@ class BPServiceActor implements Runnable { } private volatile RunningState runningState = RunningState.CONNECTING; - - /** - * Between block reports (which happen on the order of once an hour) the - * DN reports smaller incremental changes to its block list. This map, - * keyed by block ID, contains the pending changes which have yet to be - * reported to the NN. Access should be synchronized on this object. - */ - private final Map - pendingIncrementalBRperStorage = Maps.newHashMap(); - - // IBR = Incremental Block Report. If this flag is set then an IBR will be - // sent immediately by the actor thread without waiting for the IBR timer - // to elapse. - private volatile boolean sendImmediateIBR = false; private volatile boolean shouldServiceRun = true; private final DataNode dn; private final DNConf dnConf; private long prevBlockReportId; + private final IncrementalBlockReportManager ibrManager + = new IncrementalBlockReportManager(); + private DatanodeRegistration bpRegistration; final LinkedList bpThreadQueue = new LinkedList(); @@ -131,6 +117,10 @@ class BPServiceActor implements Runnable { return bpRegistration; } + IncrementalBlockReportManager getIbrManager() { + return ibrManager; + } + boolean isAlive() { if (!shouldServiceRun || !bpThread.isAlive()) { return false; @@ -231,141 +221,20 @@ class BPServiceActor implements Runnable { register(nsInfo); } - /** - * Report received blocks and delete hints to the Namenode for each - * storage. - * - * @throws IOException - */ - private void reportReceivedDeletedBlocks() throws IOException { - - // Generate a list of the pending reports for each storage under the lock - ArrayList reports = - new ArrayList(pendingIncrementalBRperStorage.size()); - synchronized (pendingIncrementalBRperStorage) { - for (Map.Entry entry : - pendingIncrementalBRperStorage.entrySet()) { - final DatanodeStorage storage = entry.getKey(); - final PerStoragePendingIncrementalBR perStorageMap = entry.getValue(); - - if (perStorageMap.getBlockInfoCount() > 0) { - // Send newly-received and deleted blockids to namenode - ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos(); - reports.add(new StorageReceivedDeletedBlocks(storage, rdbi)); - } - } - sendImmediateIBR = false; - } - - if (reports.size() == 0) { - // Nothing new to report. - return; - } - - // Send incremental block reports to the Namenode outside the lock - boolean success = false; - final long startTime = monotonicNow(); - try { - bpNamenode.blockReceivedAndDeleted(bpRegistration, - bpos.getBlockPoolId(), - reports.toArray(new StorageReceivedDeletedBlocks[reports.size()])); - success = true; - } finally { - dn.getMetrics().addIncrementalBlockReport(monotonicNow() - startTime); - if (!success) { - synchronized (pendingIncrementalBRperStorage) { - for (StorageReceivedDeletedBlocks report : reports) { - // If we didn't succeed in sending the report, put all of the - // blocks back onto our queue, but only in the case where we - // didn't put something newer in the meantime. - PerStoragePendingIncrementalBR perStorageMap = - pendingIncrementalBRperStorage.get(report.getStorage()); - perStorageMap.putMissingBlockInfos(report.getBlocks()); - sendImmediateIBR = true; - } - } - } - } - } - - /** - * @return pending incremental block report for given {@code storage} - */ - private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage( - DatanodeStorage storage) { - PerStoragePendingIncrementalBR mapForStorage = - pendingIncrementalBRperStorage.get(storage); - - if (mapForStorage == null) { - // This is the first time we are adding incremental BR state for - // this storage so create a new map. This is required once per - // storage, per service actor. - mapForStorage = new PerStoragePendingIncrementalBR(); - pendingIncrementalBRperStorage.put(storage, mapForStorage); - } - - return mapForStorage; - } - - /** - * Add a blockInfo for notification to NameNode. If another entry - * exists for the same block it is removed. - * - * Caller must synchronize access using pendingIncrementalBRperStorage. - */ - void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo, - DatanodeStorage storage) { - // Make sure another entry for the same block is first removed. - // There may only be one such entry. - for (Map.Entry entry : - pendingIncrementalBRperStorage.entrySet()) { - if (entry.getValue().removeBlockInfo(bInfo)) { - break; - } - } - getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo); - } - - /* - * Informing the name node could take a long long time! Should we wait - * till namenode is informed before responding with success to the - * client? For now we don't. - */ - void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo, - String storageUuid, boolean now) { - synchronized (pendingIncrementalBRperStorage) { - addPendingReplicationBlockInfo( - bInfo, dn.getFSDataset().getStorage(storageUuid)); - sendImmediateIBR = true; - // If now is true, the report is sent right away. - // Otherwise, it will be sent out in the next heartbeat. - if (now) { - pendingIncrementalBRperStorage.notifyAll(); - } - } - } - - void notifyNamenodeDeletedBlock( - ReceivedDeletedBlockInfo bInfo, String storageUuid) { - synchronized (pendingIncrementalBRperStorage) { - addPendingReplicationBlockInfo( - bInfo, dn.getFSDataset().getStorage(storageUuid)); - } - } /** * Run an immediate block report on this thread. Used by tests. */ @VisibleForTesting void triggerBlockReportForTests() { - synchronized (pendingIncrementalBRperStorage) { + synchronized (ibrManager) { scheduler.scheduleHeartbeat(); long oldBlockReportTime = scheduler.nextBlockReportTime; scheduler.forceFullBlockReportNow(); - pendingIncrementalBRperStorage.notifyAll(); + ibrManager.notifyAll(); while (oldBlockReportTime == scheduler.nextBlockReportTime) { try { - pendingIncrementalBRperStorage.wait(100); + ibrManager.wait(100); } catch (InterruptedException e) { return; } @@ -375,28 +244,12 @@ class BPServiceActor implements Runnable { @VisibleForTesting void triggerHeartbeatForTests() { - synchronized (pendingIncrementalBRperStorage) { + synchronized (ibrManager) { final long nextHeartbeatTime = scheduler.scheduleHeartbeat(); - pendingIncrementalBRperStorage.notifyAll(); + ibrManager.notifyAll(); while (nextHeartbeatTime - scheduler.nextHeartbeatTime >= 0) { try { - pendingIncrementalBRperStorage.wait(100); - } catch (InterruptedException e) { - return; - } - } - } - } - - @VisibleForTesting - void triggerDeletionReportForTests() { - synchronized (pendingIncrementalBRperStorage) { - sendImmediateIBR = true; - pendingIncrementalBRperStorage.notifyAll(); - - while (sendImmediateIBR) { - try { - pendingIncrementalBRperStorage.wait(100); + ibrManager.wait(100); } catch (InterruptedException e) { return; } @@ -404,11 +257,6 @@ class BPServiceActor implements Runnable { } } - @VisibleForTesting - boolean hasPendingIBR() { - return sendImmediateIBR; - } - private long generateUniqueBlockReportId() { // Initialize the block report ID the first time through. // Note that 0 is used on the NN to indicate "uninitialized", so we should @@ -432,7 +280,8 @@ class BPServiceActor implements Runnable { // we have a chance that we will miss the delHint information // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. - reportReceivedDeletedBlocks(); + ibrManager.sendIBRs(bpNamenode, bpRegistration, + bpos.getBlockPoolId(), dn.getMetrics()); long brCreateStartTime = monotonicNow(); Map perVolumeBlockLists = @@ -697,8 +546,9 @@ class BPServiceActor implements Runnable { } } } - if (sendImmediateIBR || sendHeartbeat) { - reportReceivedDeletedBlocks(); + if (ibrManager.sendImmediately() || sendHeartbeat) { + ibrManager.sendIBRs(bpNamenode, bpRegistration, + bpos.getBlockPoolId(), dn.getMetrics()); } List cmds = null; @@ -723,10 +573,10 @@ class BPServiceActor implements Runnable { // or work arrives, and then iterate again. // long waitTime = scheduler.getHeartbeatWaitTime(); - synchronized(pendingIncrementalBRperStorage) { - if (waitTime > 0 && !sendImmediateIBR) { + synchronized(ibrManager) { + if (waitTime > 0 && !ibrManager.sendImmediately()) { try { - pendingIncrementalBRperStorage.wait(waitTime); + ibrManager.wait(waitTime); } catch (InterruptedException ie) { LOG.warn("BPOfferService for " + this + " interrupted"); } @@ -915,82 +765,15 @@ class BPServiceActor implements Runnable { } } - private static class PerStoragePendingIncrementalBR { - private final Map pendingIncrementalBR = - Maps.newHashMap(); - - /** - * Return the number of blocks on this storage that have pending - * incremental block reports. - * @return - */ - int getBlockInfoCount() { - return pendingIncrementalBR.size(); - } - - /** - * Dequeue and return all pending incremental block report state. - * @return - */ - ReceivedDeletedBlockInfo[] dequeueBlockInfos() { - ReceivedDeletedBlockInfo[] blockInfos = - pendingIncrementalBR.values().toArray( - new ReceivedDeletedBlockInfo[getBlockInfoCount()]); - - pendingIncrementalBR.clear(); - return blockInfos; - } - - /** - * Add blocks from blockArray to pendingIncrementalBR, unless the - * block already exists in pendingIncrementalBR. - * @param blockArray list of blocks to add. - * @return the number of missing blocks that we added. - */ - int putMissingBlockInfos(ReceivedDeletedBlockInfo[] blockArray) { - int blocksPut = 0; - for (ReceivedDeletedBlockInfo rdbi : blockArray) { - if (!pendingIncrementalBR.containsKey(rdbi.getBlock().getBlockId())) { - pendingIncrementalBR.put(rdbi.getBlock().getBlockId(), rdbi); - ++blocksPut; - } - } - return blocksPut; - } - - /** - * Add pending incremental block report for a single block. - * @param blockInfo - */ - void putBlockInfo(ReceivedDeletedBlockInfo blockInfo) { - pendingIncrementalBR.put(blockInfo.getBlock().getBlockId(), blockInfo); - } - - /** - * Remove pending incremental block report for a single block if it - * exists. - * - * @param blockInfo - * @return true if a report was removed, false if no report existed for - * the given block. - */ - boolean removeBlockInfo(ReceivedDeletedBlockInfo blockInfo) { - return (pendingIncrementalBR.remove(blockInfo.getBlock().getBlockId()) != null); - } - } - - void triggerBlockReport(BlockReportOptions options) throws IOException { + void triggerBlockReport(BlockReportOptions options) { if (options.isIncremental()) { LOG.info(bpos.toString() + ": scheduling an incremental block report."); - synchronized(pendingIncrementalBRperStorage) { - sendImmediateIBR = true; - pendingIncrementalBRperStorage.notifyAll(); - } + ibrManager.triggerIBR(); } else { LOG.info(bpos.toString() + ": scheduling a full block report."); - synchronized(pendingIncrementalBRperStorage) { + synchronized(ibrManager) { scheduler.forceFullBlockReportNow(); - pendingIncrementalBRperStorage.notifyAll(); + ibrManager.notifyAll(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/117397b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java new file mode 100644 index 0000000..4462f94 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.util.Time.monotonicNow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; + +/** + * Manage Incremental Block Reports (IBRs). + */ +@InterfaceAudience.Private +class IncrementalBlockReportManager { + private static class PerStorageIBR { + /** The blocks in this IBR. */ + final Map blocks = Maps.newHashMap(); + + /** + * Remove the given block from this IBR + * @return true if the block was removed; otherwise, return false. + */ + ReceivedDeletedBlockInfo remove(Block block) { + return blocks.remove(block); + } + + /** @return all the blocks removed from this IBR. */ + ReceivedDeletedBlockInfo[] removeAll() { + final int size = blocks.size(); + if (size == 0) { + return null; + } + + final ReceivedDeletedBlockInfo[] rdbis = blocks.values().toArray( + new ReceivedDeletedBlockInfo[size]); + blocks.clear(); + return rdbis; + } + + /** Put the block to this IBR. */ + void put(ReceivedDeletedBlockInfo rdbi) { + blocks.put(rdbi.getBlock(), rdbi); + } + + /** + * Put the all blocks to this IBR unless the block already exists. + * @param rdbis list of blocks to add. + * @return the number of missing blocks added. + */ + int putMissing(ReceivedDeletedBlockInfo[] rdbis) { + int count = 0; + for (ReceivedDeletedBlockInfo rdbi : rdbis) { + if (!blocks.containsKey(rdbi.getBlock())) { + put(rdbi); + count++; + } + } + return count; + } + } + + /** + * Between block reports (which happen on the order of once an hour) the + * DN reports smaller incremental changes to its block list for each storage. + * This map contains the pending changes not yet to be reported to the NN. + */ + private final Map pendingIBRs + = Maps.newHashMap(); + + /** + * If this flag is set then an IBR will be sent immediately by the actor + * thread without waiting for the IBR timer to elapse. + */ + private volatile boolean readyToSend = false; + + boolean sendImmediately() { + return readyToSend; + } + + private synchronized StorageReceivedDeletedBlocks[] generateIBRs() { + final List reports + = new ArrayList<>(pendingIBRs.size()); + for (Map.Entry entry + : pendingIBRs.entrySet()) { + final PerStorageIBR perStorage = entry.getValue(); + + // Send newly-received and deleted blockids to namenode + final ReceivedDeletedBlockInfo[] rdbi = perStorage.removeAll(); + if (rdbi != null) { + reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi)); + } + } + readyToSend = false; + return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]); + } + + private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) { + for (StorageReceivedDeletedBlocks r : reports) { + pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks()); + } + if (reports.length > 0) { + readyToSend = true; + } + } + + /** Send IBRs to namenode. */ + void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration, + String bpid, DataNodeMetrics metrics) throws IOException { + // Generate a list of the pending reports for each storage under the lock + final StorageReceivedDeletedBlocks[] reports = generateIBRs(); + if (reports.length == 0) { + // Nothing new to report. + return; + } + + // Send incremental block reports to the Namenode outside the lock + boolean success = false; + final long startTime = monotonicNow(); + try { + namenode.blockReceivedAndDeleted(registration, bpid, reports); + success = true; + } finally { + metrics.addIncrementalBlockReport(monotonicNow() - startTime); + if (!success) { + // If we didn't succeed in sending the report, put all of the + // blocks back onto our queue, but only in the case where we + // didn't put something newer in the meantime. + putMissing(reports); + } + } + } + + /** @return the pending IBR for the given {@code storage} */ + private PerStorageIBR getPerStorageIBR(DatanodeStorage storage) { + PerStorageIBR perStorage = pendingIBRs.get(storage); + if (perStorage == null) { + // This is the first time we are adding incremental BR state for + // this storage so create a new map. This is required once per + // storage, per service actor. + perStorage = new PerStorageIBR(); + pendingIBRs.put(storage, perStorage); + } + return perStorage; + } + + /** + * Add a block for notification to NameNode. + * If another entry exists for the same block it is removed. + */ + @VisibleForTesting + synchronized void addRDBI(ReceivedDeletedBlockInfo rdbi, + DatanodeStorage storage) { + // Make sure another entry for the same block is first removed. + // There may only be one such entry. + for (PerStorageIBR perStorage : pendingIBRs.values()) { + if (perStorage.remove(rdbi.getBlock()) != null) { + break; + } + } + getPerStorageIBR(storage).put(rdbi); + } + + synchronized void notifyNamenodeBlock(ReceivedDeletedBlockInfo rdbi, + DatanodeStorage storage) { + addRDBI(rdbi, storage); + + final BlockStatus status = rdbi.getStatus(); + if (status == BlockStatus.RECEIVING_BLOCK) { + // the report will be sent out in the next heartbeat. + readyToSend = true; + } else if (status == BlockStatus.RECEIVED_BLOCK) { + // the report is sent right away. + triggerIBR(); + } + } + + synchronized void triggerIBR() { + readyToSend = true; + notifyAll(); + } + + @VisibleForTesting + synchronized void triggerDeletionReportForTests() { + triggerIBR(); + + while (sendImmediately()) { + try { + wait(100); + } catch (InterruptedException e) { + return; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/117397b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java index de73dcf..676d855 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; @@ -55,7 +56,6 @@ public class TestIncrementalBlockReports { private static final long DUMMY_BLOCK_GENSTAMP = 1000; private MiniDFSCluster cluster = null; - private DistributedFileSystem fs; private Configuration conf; private NameNode singletonNn; private DataNode singletonDn; @@ -67,7 +67,6 @@ public class TestIncrementalBlockReports { public void startCluster() throws IOException { conf = new HdfsConfiguration(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_COUNT).build(); - fs = cluster.getFileSystem(); singletonNn = cluster.getNameNode(); singletonDn = cluster.getDataNodes().get(0); bpos = singletonDn.getAllBpOs().get(0); @@ -88,7 +87,8 @@ public class TestIncrementalBlockReports { private void injectBlockReceived() { ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( getDummyBlock(), BlockStatus.RECEIVED_BLOCK, null); - actor.notifyNamenodeBlock(rdbi, storageUuid, true); + DatanodeStorage s = singletonDn.getFSDataset().getStorage(storageUuid); + actor.getIbrManager().notifyNamenodeBlock(rdbi, s); } /** @@ -97,7 +97,8 @@ public class TestIncrementalBlockReports { private void injectBlockDeleted() { ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo( getDummyBlock(), BlockStatus.DELETED_BLOCK, null); - actor.notifyNamenodeDeletedBlock(rdbi, storageUuid); + actor.getIbrManager().addRDBI(rdbi, + singletonDn.getFSDataset().getStorage(storageUuid)); } /** @@ -206,7 +207,7 @@ public class TestIncrementalBlockReports { any(StorageReceivedDeletedBlocks[].class)); // Ensure that no more IBRs are pending. - assertFalse(actor.hasPendingIBR()); + assertFalse(actor.getIbrManager().sendImmediately()); } finally { cluster.shutdown(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/117397b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java index 00c0f22..f04a196 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; @@ -93,12 +94,14 @@ public final class TestTriggerBlockReport { DataNode datanode = cluster.getDataNodes().get(0); BPServiceActor actor = datanode.getAllBpOs().get(0).getBPServiceActors().get(0); - String storageUuid; + final FsDatasetSpi dataset = datanode.getFSDataset(); + final DatanodeStorage storage; try (FsDatasetSpi.FsVolumeReferences volumes = - datanode.getFSDataset().getFsVolumeReferences()) { - storageUuid = volumes.get(0).getStorageID(); + dataset.getFsVolumeReferences()) { + storage = dataset.getStorage(volumes.get(0).getStorageID()); } - actor.notifyNamenodeDeletedBlock(rdbi, storageUuid); + + actor.getIbrManager().addRDBI(rdbi, storage); // Manually trigger a block report. datanode.triggerBlockReport(