From: inigoiri@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 02 Oct 2018 21:40:34 -0000
Message-Id: <40c1a5b3cdef457cbccf4a82790f343b@git.apache.org>
In-Reply-To: <59900959bf4a4ae3ba17e37b1c5b5ff5@git.apache.org>
References: <59900959bf4a4ae3ba17e37b1c5b5ff5@git.apache.org>
Subject: [34/50] [abbrv] hadoop git commit: HDDS-325. Add event watcher for delete blocks command. Contributed by Lokesh Jain.

HDDS-325. Add event watcher for delete blocks command. Contributed by Lokesh Jain.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f7ff8c05
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f7ff8c05
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f7ff8c05

Branch: refs/heads/HDFS-13532
Commit: f7ff8c051e4d4b5cd0eee1884e2a546de1f57793
Parents: fd6be58
Author: Nanda kumar
Authored: Mon Oct 1 13:49:55 2018 +0530
Committer: Nanda kumar
Committed: Mon Oct 1 13:50:17 2018 +0530
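The core of this change: delete-block ACKs no longer travel over a dedicated sendContainerBlocksDeletionACK RPC. They now ride inside the command status reports carried by the regular datanode heartbeat, and SCM resends delete-blocks commands until a matching status arrives. The retry is driven by the new RetriableDatanodeEventWatcher (listed in the diffstat below; its body falls outside this excerpt). The following is only a sketch of what such a watcher could look like, assuming the onTimeout/onFinished hooks of the hdds EventWatcher base class — the shapes here are inferred, not taken from this patch:

  // Sketch only: the real RetriableDatanodeEventWatcher.java is added by this
  // commit but is not shown in this excerpt; the base-class hook names and the
  // LeaseManager parameter are assumptions based on the hdds events framework.
  public class RetriableDatanodeEventWatcher<T extends CommandStatusEvent>
      extends EventWatcher<CommandForDatanode, T> {

    public RetriableDatanodeEventWatcher(Event<CommandForDatanode> startEvent,
        Event<T> completionEvent, LeaseManager<Long> leaseManager) {
      super(startEvent, completionEvent, leaseManager);
    }

    @Override
    protected void onTimeout(EventPublisher publisher,
        CommandForDatanode payload) {
      // No matching status event arrived before the lease expired:
      // re-fire the command so it is re-dispatched and watched again.
      publisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND, payload);
    }

    @Override
    protected void onFinished(EventPublisher publisher,
        CommandForDatanode payload) {
      // A status event with the same command id completed the watch.
    }
  }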
----------------------------------------------------------------------
 .../scm/container/common/helpers/Pipeline.java  |  4 +
 .../report/CommandStatusReportPublisher.java    |  2 +-
 .../common/statemachine/StateContext.java       | 58 +++++++-----
 .../CloseContainerCommandHandler.java           |  3 +-
 .../commandhandler/CommandHandler.java          |  7 +-
 .../DeleteBlocksCommandHandler.java             | 43 +++++++----
 .../ReplicateContainerCommandHandler.java       |  3 +-
 .../states/endpoint/HeartbeatEndpointTask.java  | 33 ++++++-
 .../container/ozoneimpl/ContainerReader.java    | 20 +++++
 .../StorageContainerDatanodeProtocol.java       | 13 ---
 .../protocol/commands/CommandForDatanode.java   |  8 +-
 .../ozone/protocol/commands/CommandStatus.java  | 41 +++++++--
 .../commands/DeleteBlockCommandStatus.java      | 92 ++++++++++++++++++++
 ...rDatanodeProtocolClientSideTranslatorPB.java | 17 ----
 ...rDatanodeProtocolServerSideTranslatorPB.java | 16 ----
 .../StorageContainerDatanodeProtocol.proto      | 11 +--
 .../ozone/container/common/ScmTestMock.java     | 24 ++---
 .../common/report/TestReportPublisher.java      |  4 +-
 .../endpoint/TestHeartbeatEndpointTask.java     | 12 +--
 .../block/DatanodeDeletedBlockTransactions.java | 20 ++---
 .../hdds/scm/block/DeletedBlockLogImpl.java     | 24 ++++-
 .../hdds/scm/block/SCMBlockDeletingService.java |  2 +-
 .../scm/command/CommandStatusReportHandler.java | 10 ++-
 .../hdds/scm/container/ContainerMapping.java    | 19 ++--
 .../hadoop/hdds/scm/events/SCMEvents.java       | 19 ++--
 .../server/SCMDatanodeHeartbeatDispatcher.java  | 11 ++-
 .../scm/server/SCMDatanodeProtocolServer.java   | 22 -----
 .../scm/server/StorageContainerManager.java     | 12 +++
 .../commands/RetriableDatanodeEventWatcher.java | 58 ++++++++++++
 .../ozone/protocol/commands/package-info.java   | 18 ++++
 .../command/TestCommandStatusReportHandler.java | 12 +--
 .../TestSCMDatanodeHeartbeatDispatcher.java     |  2 +-
 .../ozone/TestStorageContainerManager.java      |  3 +
 .../commandhandler/TestBlockDeletion.java       | 83 ++++++++++++++--
 34 files changed, 503 insertions(+), 223 deletions(-)
----------------------------------------------------------------------

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 777efa7..c36ca1f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -171,6 +171,10 @@ public class Pipeline {
   public Map getDatanodes() {
     return datanodes;
   }
+
+  public boolean isEmpty() {
+    return datanodes.isEmpty();
+  }
 
   /**
    * Returns the leader host.
    *
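Pipeline#isEmpty() lets callers detect a pipeline with no datanodes explicitly, instead of relying on a NO_REPLICA_FOUND exception; the ContainerMapping and DatanodeDeletedBlockTransactions hunks later in this patch use it exactly this way. A hypothetical helper to illustrate the guard (canDispatch itself is not part of the patch):

  // Illustrative only: skip a delete-blocks transaction when the container is
  // still open or currently has no replicas to send the transaction to.
  private boolean canDispatch(ContainerWithPipeline containerWithPipeline) {
    return !containerWithPipeline.getContainerInfo().isContainerOpen()
        && !containerWithPipeline.getPipeline().isEmpty();
  }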
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
index 4cf6321..4736857 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java
@@ -82,6 +82,6 @@ public class CommandStatusReportPublisher extends
         map.remove(key);
       }
     });
-    return builder.build();
+    return builder.getCmdStatusCount() > 0 ?
+        builder.build() : null;
   }
 }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 47c2492..12c196b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -35,8 +37,11 @@ import org.apache.hadoop.ozone.container.common.states.datanode
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus
     .CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands
+    .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
+import static java.lang.Math.min;
 import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +57,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
@@ -67,7 +73,7 @@ public class StateContext {
   private final DatanodeStateMachine parent;
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
-  private final Queue reports;
+  private final List reports;
   private final Queue containerActions;
   private final Queue pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
@@ -174,19 +180,23 @@ public class StateContext {
    * @param report report to be added
    */
  public void addReport(GeneratedMessage report) {
-    synchronized (reports) {
-      reports.add(report);
+    if (report != null) {
+      synchronized (reports) {
+        reports.add(report);
+      }
     }
   }
 
   /**
-   * Returns the next report, or null if the report queue is empty.
+   * Adds the reports which could not be sent by heartbeat back to the
+   * reports list.
    *
-   * @return report
+   * @param reportsToPutBack list of reports which failed to be sent by
+   *                         heartbeat.
    */
-  public GeneratedMessage getNextReport() {
+  public void putBackReports(List reportsToPutBack) {
     synchronized (reports) {
-      return reports.poll();
+      reports.addAll(0, reportsToPutBack);
     }
   }
 
@@ -207,19 +217,14 @@ public class StateContext {
    * @return List
    */
  public List getReports(int maxLimit) {
-    List reportList = new ArrayList<>();
+    List reportsToReturn = new LinkedList<>();
     synchronized (reports) {
-      if (!reports.isEmpty()) {
-        int size = reports.size();
-        int limit = size > maxLimit ?
-            maxLimit : size;
-        for (int count = 0; count < limit; count++) {
-          GeneratedMessage report = reports.poll();
-          Preconditions.checkNotNull(report);
-          reportList.add(report);
-        }
-      }
-      return reportList;
+      List tempList = reports.subList(
+          0, min(reports.size(), maxLimit));
+      reportsToReturn.addAll(tempList);
+      tempList.clear();
     }
+    return reportsToReturn;
   }
@@ -442,9 +447,14 @@ public class StateContext {
    * @param cmd - {@link SCMCommand}.
    */
  public void addCmdStatus(SCMCommand cmd) {
+    CommandStatusBuilder statusBuilder;
+    if (cmd.getType() == Type.deleteBlocksCommand) {
+      statusBuilder = new DeleteBlockCommandStatusBuilder();
+    } else {
+      statusBuilder = CommandStatusBuilder.newBuilder();
+    }
     this.addCmdStatus(cmd.getId(),
-        CommandStatusBuilder.newBuilder()
-            .setCmdId(cmd.getId())
+        statusBuilder.setCmdId(cmd.getId())
             .setStatus(Status.PENDING)
             .setType(cmd.getType())
             .build());
@@ -469,13 +479,13 @@ public class StateContext {
   /**
    * Updates status of a pending status command.
    * @param cmdId command id
-   * @param cmdExecuted SCMCommand
+   * @param cmdStatusUpdater Consumer to update command status.
    * @return true if command status updated successfully else false.
    */
-  public boolean updateCommandStatus(Long cmdId, boolean cmdExecuted) {
+  public boolean updateCommandStatus(Long cmdId,
+      Consumer cmdStatusUpdater) {
     if(cmdStatusMap.containsKey(cmdId)) {
-      cmdStatusMap.get(cmdId)
-          .setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+      cmdStatusUpdater.accept(cmdStatusMap.get(cmdId));
       return true;
     }
     return false;
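Two things change in StateContext: updateCommandStatus now takes a Consumer<CommandStatus> (the mail archive strips generics, so the parameter reads as a bare Consumer above), letting handlers set type-specific fields such as the deletion ACK, and the report queue becomes a list so that unsent reports can be re-queued at the front. A standalone sketch of the new list semantics, using plain strings in place of GeneratedMessage:

  import java.util.Arrays;
  import java.util.LinkedList;
  import java.util.List;

  // Self-contained sketch (not patch code): getReports drains at most
  // maxLimit entries from the front; putBackReports re-inserts failed
  // entries at the front, so a retried heartbeat sends them first.
  public class ReportQueueSketch {
    private final List<String> reports = new LinkedList<>();

    synchronized List<String> getReports(int maxLimit) {
      List<String> batch = new LinkedList<>(
          reports.subList(0, Math.min(reports.size(), maxLimit)));
      reports.subList(0, batch.size()).clear();  // removes drained prefix
      return batch;
    }

    synchronized void putBackReports(List<String> failed) {
      reports.addAll(0, failed);                 // front of the list
    }

    public static void main(String[] args) {
      ReportQueueSketch q = new ReportQueueSketch();
      q.reports.addAll(Arrays.asList("nodeReport", "containerReport", "cmdStatus"));
      List<String> batch = q.getReports(2);      // [nodeReport, containerReport]
      q.putBackReports(batch);                   // heartbeat failed: re-queue
      System.out.println(q.getReports(3));       // original order preserved
    }
  }

The subList(...).clear() call removes the drained prefix from the backing list, and addAll(0, ...) keeps retried reports ahead of newer ones, so report order is preserved across a failed heartbeat.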
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 0af591b..2c3db61 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -117,7 +117,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
         cmdExecuted = false;
       }
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(cmdExecuted), LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 71c25b5..1ea0ea8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -23,9 +23,12 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 
+import java.util.function.Consumer;
+
 /**
  * Generic interface for handlers.
  */
@@ -63,8 +66,8 @@ public interface CommandHandler {
    * Default implementation for updating command status.
    */
  default void updateCommandStatus(StateContext context, SCMCommand command,
-      boolean cmdExecuted, Logger log) {
-    if (!context.updateCommandStatus(command.getId(), cmdExecuted)) {
+      Consumer cmdStatusUpdater, Logger log) {
+    if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
       log.debug("{} with Id:{} not found.", command.getType(),
           command.getId());
     }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 430b0ef..aa63fb4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -39,11 +39,11 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlockCommandStatus;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_NOT_FOUND;
@@ -63,7 +64,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
  */
 public class DeleteBlocksCommandHandler implements CommandHandler {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
 
   private final ContainerSet containerSet;
@@ -83,6 +84,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
       StateContext context, SCMConnectionManager connectionManager) {
     cmdExecuted = false;
     long startTime = Time.monotonicNow();
+    ContainerBlocksDeletionACKProto blockDeletionACK = null;
     try {
       if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
         LOG.warn("Skipping handling command, expected command "
@@ -144,31 +146,28 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
             .setDnId(context.getParent().getDatanodeDetails()
                 .getUuid().toString());
       });
-      ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+      blockDeletionACK = resultBuilder.build();
 
       // Send ACK back to SCM as long as meta updated
       // TODO Or we should wait until the blocks are actually deleted?
       if (!containerBlocks.isEmpty()) {
-        for (EndpointStateMachine endPoint : connectionManager.getValues()) {
-          try {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Sending following block deletion ACK to SCM");
-              for (DeleteBlockTransactionResult result :
-                  blockDeletionACK.getResultsList()) {
-                LOG.debug(result.getTxID() + " : " + result.getSuccess());
-              }
-            }
-            endPoint.getEndPoint()
-                .sendContainerBlocksDeletionACK(blockDeletionACK);
-          } catch (IOException e) {
-            LOG.error("Unable to send block deletion ACK to SCM {}",
-                endPoint.getAddress().toString(), e);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Sending following block deletion ACK to SCM");
+          for (DeleteBlockTransactionResult result : blockDeletionACK
+              .getResultsList()) {
+            LOG.debug(result.getTxID() + " : " + result.getSuccess());
           }
         }
       }
       cmdExecuted = true;
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      final ContainerBlocksDeletionACKProto deleteAck =
+          blockDeletionACK;
+      Consumer statusUpdater = (cmdStatus) -> {
+        cmdStatus.setStatus(cmdExecuted);
+        ((DeleteBlockCommandStatus) cmdStatus).setBlocksDeletionAck(deleteAck);
+      };
+      updateCommandStatus(context, command, statusUpdater, LOG);
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;
     }
@@ -238,9 +237,9 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
       }
     }
 
-    containerDB.put(DFSUtil.string2Bytes(
-        OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
-        Longs.toByteArray(delTX.getTxID()));
+    containerDB
+        .put(DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX),
+            Longs.toByteArray(delTX.getTxID()));
     containerData
         .updateDeleteTransactionId(delTX.getTxID());
     // update pending deletion blocks count in in-memory container status
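One subtle change above: the delete-transaction id is now stored under the bare DELETE_TRANSACTION_KEY_PREFIX key instead of prefix + containerID (each container has its own metadata DB, so one key suffices). ContainerReader, in a later hunk, reads the same key back on datanode restart. A sketch of the write/read pair, using names from this patch's hunks:

  // Write side (DeleteBlocksCommandHandler): record the latest delete
  // transaction id in this container's DB under the fixed key.
  byte[] key = DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX);
  containerDB.put(key, Longs.toByteArray(delTX.getTxID()));

  // Read side (ContainerReader, on restart): restore the id if present.
  byte[] stored = containerDB.get(key);
  if (stored != null) {
    kvContainerData.updateDeleteTransactionId(Longs.fromByteArray(stored));
  }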
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index 09c379f..81d162d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -77,7 +77,8 @@ public class ReplicateContainerCommandHandler implements CommandHandler {
       supervisor.addTask(replicationTask);
     } finally {
-      updateCommandStatus(context, command, true, LOG);
+      updateCommandStatus(context, command,
+          (cmdStatus) -> cmdStatus.setStatus(true), LOG);
     }
   }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 5769e6d..4fd72ec 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 
@@ -124,12 +125,12 @@ public class HeartbeatEndpointTask
   @Override
  public EndpointStateMachine.EndPointStates call() throws Exception {
     rpcEndpoint.lock();
+    SCMHeartbeatRequestProto.Builder requestBuilder = null;
     try {
       Preconditions.checkState(this.datanodeDetailsProto != null);
 
-      SCMHeartbeatRequestProto.Builder requestBuilder =
-          SCMHeartbeatRequestProto.newBuilder()
-              .setDatanodeDetails(datanodeDetailsProto);
+      requestBuilder = SCMHeartbeatRequestProto.newBuilder()
+          .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
       addContainerActions(requestBuilder);
       addPipelineActions(requestBuilder);
@@ -139,6 +140,8 @@ public class HeartbeatEndpointTask
       rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
       rpcEndpoint.zeroMissedCount();
     } catch (IOException ex) {
+      // put back the reports which failed to be sent
+      putBackReports(requestBuilder);
       rpcEndpoint.logIfNeeded(ex);
     } finally {
       rpcEndpoint.unlock();
@@ -146,6 +149,24 @@ public class HeartbeatEndpointTask
     return rpcEndpoint.getState();
   }
 
+  // TODO: Make it generic.
+  private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List reports = new LinkedList<>();
+    if (requestBuilder.hasContainerReport()) {
+      reports.add(requestBuilder.getContainerReport());
+    }
+    if (requestBuilder.hasNodeReport()) {
+      reports.add(requestBuilder.getNodeReport());
+    }
+    if (requestBuilder.getCommandStatusReportsCount() != 0) {
+      for (GeneratedMessage msg : requestBuilder
+          .getCommandStatusReportsList()) {
+        reports.add(msg);
+      }
+    }
+    context.putBackReports(reports);
+  }
+
   /**
    * Adds all the available reports to heartbeat.
    *
@@ -158,7 +179,11 @@ public class HeartbeatEndpointTask
         SCMHeartbeatRequestProto.getDescriptor().getFields()) {
       String heartbeatFieldName = descriptor.getMessageType().getFullName();
       if (heartbeatFieldName.equals(reportName)) {
-        requestBuilder.setField(descriptor, report);
+        if (descriptor.isRepeated()) {
+          requestBuilder.addRepeatedField(descriptor, report);
+        } else {
+          requestBuilder.setField(descriptor, report);
+        }
       }
     }
   }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
index 7c986f0..c3a4126 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
@@ -32,7 +35,10 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -171,6 +177,20 @@ public class ContainerReader implements Runnable {
           KeyValueContainerUtil.parseKVContainerData(kvContainerData, config);
           KeyValueContainer kvContainer = new KeyValueContainer(
               kvContainerData, config);
+          MetadataStore containerDB = BlockUtils.getDB(kvContainerData, config);
+          MetadataKeyFilters.KeyPrefixFilter filter =
+              new MetadataKeyFilters.KeyPrefixFilter()
+                  .addFilter(OzoneConsts.DELETING_KEY_PREFIX);
+          int numPendingDeletionBlocks =
+              containerDB.getSequentialRangeKVs(null, Integer.MAX_VALUE, filter)
+                  .size();
+          kvContainerData.incrPendingDeletionBlocks(numPendingDeletionBlocks);
+          byte[] delTxnId = containerDB.get(
+              DFSUtil.string2Bytes(OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX));
+          if (delTxnId != null) {
+            kvContainerData
+                .updateDeleteTransactionId(Longs.fromByteArray(delTxnId));
+          }
           containerSet.addContainer(kvContainer);
         } else {
           throw new StorageContainerException("Container File is corrupted. " +

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 9296524..e3b5370 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -23,11 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -77,12 +72,4 @@ public interface StorageContainerDatanodeProtocol {
       ContainerReportsProto containerReportsRequestProto,
       PipelineReportsProto pipelineReports) throws IOException;
 
-  /**
-   * Used by datanode to send block deletion ACK to SCM.
-   * @param request block deletion transactions.
-   * @return block deletion transaction response.
-   * @throws IOException
-   */
-  ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto request) throws IOException;
 }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
index 0c4964a..69337fb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.ozone.protocol.commands;
 
 import java.util.UUID;
 
 import com.google.protobuf.GeneratedMessage;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 
 /**
  * Command for the datanode with the destination address.
  */
-public class CommandForDatanode {
+public class CommandForDatanode implements
+    IdentifiableEventPayload {
 
   private final UUID datanodeId;
 
@@ -42,4 +44,8 @@ public class CommandForDatanode {
   public SCMCommand getCommand() {
     return command;
   }
+
+  public long getId() {
+    return command.getId();
+  }
 }
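CommandForDatanode implementing IdentifiableEventPayload is what makes the watcher pattern work: the hdds EventWatcher correlates a start event with its completion event by payload id. Here that id is the wrapped SCMCommand's id, so a delete-blocks command fired on RETRIABLE_DATANODE_COMMAND can be matched to the DeleteBlockStatus event that later carries the same id. A short illustration (datanodeUuid and txs are hypothetical locals, not from this patch):

  // The command and its eventual status report share one id, which the
  // watcher uses as the correlation key.
  CommandForDatanode commandEvent =
      new CommandForDatanode<>(datanodeUuid, new DeleteBlocksCommand(txs));
  long watchKey = commandEvent.getId();  // == the wrapped SCMCommand's id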
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
index 32cf7c2..4b3ce84 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java
@@ -35,6 +35,13 @@ public class CommandStatus {
   private Status status;
   private String msg;
 
+  CommandStatus(Type type, Long cmdId, Status status, String msg) {
+    this.type = type;
+    this.cmdId = cmdId;
+    this.status = status;
+    this.msg = msg;
+  }
+
   public Type getType() {
     return type;
   }
@@ -60,6 +67,10 @@ public class CommandStatus {
     this.status = status;
   }
 
+  public void setStatus(boolean cmdExecuted) {
+    setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED);
+  }
+
   /**
    * Returns a CommandStatus from the protocol buffers.
    *
@@ -72,7 +83,8 @@ public class CommandStatus {
         .setCmdId(cmdStatusProto.getCmdId())
         .setStatus(cmdStatusProto.getStatus())
         .setType(cmdStatusProto.getType())
-        .setMsg(cmdStatusProto.getMsg()).build();
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
   }
   /**
    * Returns a CommandStatus from the protocol buffers.
@@ -95,20 +107,36 @@ public class CommandStatus {
 
   /**
    * Builder class for CommandStatus.
    */
-  public static final class CommandStatusBuilder {
+  public static class CommandStatusBuilder {
     private SCMCommandProto.Type type;
     private Long cmdId;
     private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status;
     private String msg;
 
-    private CommandStatusBuilder() {
+    CommandStatusBuilder() {
     }
 
     public static CommandStatusBuilder newBuilder() {
       return new CommandStatusBuilder();
     }
 
+    public Type getType() {
+      return type;
+    }
+
+    public Long getCmdId() {
+      return cmdId;
+    }
+
+    public Status getStatus() {
+      return status;
+    }
+
+    public String getMsg() {
+      return msg;
+    }
+
     public CommandStatusBuilder setType(Type commandType) {
       this.type = commandType;
       return this;
@@ -130,12 +158,7 @@ public class CommandStatus {
     }
 
     public CommandStatus build() {
-      CommandStatus commandStatus = new CommandStatus();
-      commandStatus.type = this.type;
-      commandStatus.msg = this.msg;
-      commandStatus.status = this.status;
-      commandStatus.cmdId = this.cmdId;
-      return commandStatus;
+      return new CommandStatus(type, cmdId, status, msg);
     }
   }
 }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
new file mode 100644
index 0000000..2659ab3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+
+public class DeleteBlockCommandStatus extends CommandStatus {
+
+  private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
+
+  public DeleteBlockCommandStatus(Type type, Long cmdId,
+      StorageContainerDatanodeProtocolProtos.CommandStatus.Status status,
+      String msg, ContainerBlocksDeletionACKProto blocksDeletionAck) {
+    super(type, cmdId, status, msg);
+    this.blocksDeletionAck = blocksDeletionAck;
+  }
+
+  public void setBlocksDeletionAck(
+      ContainerBlocksDeletionACKProto deletionAck) {
+    blocksDeletionAck = deletionAck;
+  }
+
+  @Override
+  public CommandStatus getFromProtoBuf(
+      StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
+    return DeleteBlockCommandStatusBuilder.newBuilder()
+        .setBlockDeletionAck(cmdStatusProto.getBlockDeletionAck())
+        .setCmdId(cmdStatusProto.getCmdId())
+        .setStatus(cmdStatusProto.getStatus())
+        .setType(cmdStatusProto.getType())
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
+  }
+
+  @Override
+  public StorageContainerDatanodeProtocolProtos.CommandStatus getProtoBufMessage() {
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
+        StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
+            .setCmdId(this.getCmdId())
+            .setStatus(this.getStatus())
+            .setType(this.getType());
+    if (blocksDeletionAck != null) {
+      builder.setBlockDeletionAck(blocksDeletionAck);
+    }
+    if (this.getMsg() != null) {
+      builder.setMsg(this.getMsg());
+    }
+    return builder.build();
+  }
+
+  public static final class DeleteBlockCommandStatusBuilder
+      extends CommandStatusBuilder {
+    private ContainerBlocksDeletionACKProto blocksDeletionAck = null;
+
+    public static DeleteBlockCommandStatusBuilder newBuilder() {
+      return new DeleteBlockCommandStatusBuilder();
+    }
+
+    public DeleteBlockCommandStatusBuilder setBlockDeletionAck(
+        ContainerBlocksDeletionACKProto deletionAck) {
+      this.blocksDeletionAck = deletionAck;
+      return this;
+    }
+
+    @Override
+    public CommandStatus build() {
+      return new DeleteBlockCommandStatus(getType(), getCmdId(), getStatus(),
+          getMsg(), blocksDeletionAck);
+    }
+
+  }
+}
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index b9cf6f9..4e1e27e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -23,11 +23,6 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 
@@ -169,16 +164,4 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     return response;
   }
 
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
-    final ContainerBlocksDeletionACKResponseProto resp;
-    try {
-      resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
-          deletedBlocks);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return resp;
-  }
 }

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index ed01822..8622332 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -33,11 +33,6 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
@@ -96,15 +91,4 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
     }
   }
 
-
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      RpcController controller, ContainerBlocksDeletionACKProto request)
-      throws ServiceException {
-    try {
-      return impl.sendContainerBlocksDeletionACK(request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
 }
\ No newline at end of file

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 78758cb..982029c 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -80,7 +80,7 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional CommandStatusReportsProto commandStatusReport = 4;
+  repeated CommandStatusReportsProto commandStatusReports = 4;
   optional ContainerActionsProto containerActions = 5;
   optional PipelineActionsProto pipelineActions = 6;
   optional PipelineReportsProto pipelineReports = 7;
@@ -145,6 +145,7 @@ message CommandStatus {
   required Status status = 2 [default = PENDING];
   required SCMCommandProto.Type type = 3;
   optional string msg = 4;
+  optional ContainerBlocksDeletionACKProto blockDeletionAck = 5;
 }
 
 message ContainerActionsProto {
@@ -272,10 +273,6 @@ message ContainerBlocksDeletionACKProto {
   required string dnId = 2;
 }
 
-// SendACK response returned by datanode to SCM, currently empty.
-message ContainerBlocksDeletionACKResponseProto {
-}
-
 /**
 This command asks the datanode to close a specific container.
 */
@@ -386,8 +383,4 @@ service StorageContainerDatanodeProtocolService {
   */
   rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
 
-  /**
-   * Sends the block deletion ACK to SCM.
-   */
-  rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
 }
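The heartbeat field keeps tag number 4, and in proto2 changing an optional message field to a repeated one with the same tag is wire-compatible: an old single commandStatusReport parses on new readers as a one-element list. On the datanode side the generated builder exposes repeated-field adders; a small sketch (firstReport/secondReport stand in for CommandStatusReportsProto instances):

  // Sketch: building a heartbeat with several status reports; the
  // addCommandStatusReports adder is generated from the repeated field.
  SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder()
      .setDatanodeDetails(datanodeDetailsProto)
      .addCommandStatusReports(firstReport)
      .addCommandStatusReports(secondReport)
      .build();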
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 27b6272..3e45596 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.VersionInfo;
@@ -30,11 +32,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos
-    .ContainerBlocksDeletionACKResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -196,10 +193,12 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
       sendHeartbeat(SCMHeartbeatRequestProto heartbeat) throws IOException {
     rpcCount.incrementAndGet();
     heartbeatCount.incrementAndGet();
-    if(heartbeat.hasCommandStatusReport()){
-      cmdStatusList.addAll(heartbeat.getCommandStatusReport()
-          .getCmdStatusList());
-      commandStatusReport.incrementAndGet();
+    if (heartbeat.getCommandStatusReportsCount() != 0) {
+      for (CommandStatusReportsProto statusReport : heartbeat
+          .getCommandStatusReportsList()) {
+        cmdStatusList.addAll(statusReport.getCmdStatusList());
+        commandStatusReport.incrementAndGet();
+      }
     }
     sleepIfNeeded();
     return SCMHeartbeatResponseProto.newBuilder().addAllCommands(
@@ -305,13 +304,6 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
     return 0;
   }
 
-  @Override
-  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
-      ContainerBlocksDeletionACKProto request) throws IOException {
-    return ContainerBlocksDeletionACKResponseProto
-        .newBuilder().getDefaultInstanceForType();
-  }
-
   /**
    * Reset the mock Scm for test to get a fresh start without rebuild MockScm.
    */

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 811599f..1e82326 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -138,9 +138,7 @@ public class TestReportPublisher {
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("Unit test ReportManager Thread - %d").build());
     publisher.init(dummyContext, executorService);
-    Assert.assertEquals(0,
-        ((CommandStatusReportPublisher) publisher).getReport()
-            .getCmdStatusCount());
+    Assert.assertNull(((CommandStatusReportPublisher) publisher).getReport());
 
     // Insert to status object to state context map and then get the report.
     CommandStatus obj1 = CommandStatus.CommandStatusBuilder.newBuilder()

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 69a6a33..606940b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -77,7 +77,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -108,7 +108,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertTrue(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -139,7 +139,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -170,7 +170,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
     Assert.assertFalse(heartbeat.hasContainerActions());
   }
 
@@ -201,7 +201,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertFalse(heartbeat.hasNodeReport());
     Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() == 0);
     Assert.assertTrue(heartbeat.hasContainerActions());
   }
 
@@ -235,7 +235,7 @@ public class TestHeartbeatEndpointTask {
     Assert.assertTrue(heartbeat.hasDatanodeDetails());
     Assert.assertTrue(heartbeat.hasNodeReport());
     Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.getCommandStatusReportsCount() != 0);
     Assert.assertTrue(heartbeat.hasContainerActions());
   }
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 25420fe..8702a42 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -61,7 +61,8 @@ public class DatanodeDeletedBlockTransactions {
     try {
       ContainerWithPipeline containerWithPipeline =
           mappingService.getContainerWithPipeline(tx.getContainerID());
-      if (containerWithPipeline.getContainerInfo().isContainerOpen()) {
+      if (containerWithPipeline.getContainerInfo().isContainerOpen()
+          || containerWithPipeline.getPipeline().isEmpty()) {
         return false;
       }
       pipeline = containerWithPipeline.getPipeline();
@@ -70,25 +71,19 @@ public class DatanodeDeletedBlockTransactions {
       return false;
     }
 
-    if (pipeline == null) {
-      SCMBlockDeletingService.LOG.warn(
-          "Container {} not found, continue to process next",
-          tx.getContainerID());
-      return false;
-    }
-
+    boolean success = false;
     for (DatanodeDetails dd : pipeline.getMachines()) {
       UUID dnID = dd.getUuid();
       if (dnsWithTransactionCommitted == null
           || !dnsWithTransactionCommitted.contains(dnID)) {
         // Transaction need not be sent to dns which have already committed it
-        addTransactionToDN(dnID, tx);
+        success = addTransactionToDN(dnID, tx);
       }
     }
-    return true;
+    return success;
   }
 
-  private void addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
+  private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
     if (transactions.containsKey(dnID)) {
       List txs = transactions.get(dnID);
       if (txs != null && txs.size() < maximumAllowedTXNum) {
@@ -103,14 +98,17 @@ public class DatanodeDeletedBlockTransactions {
         if (!hasContained) {
           txs.add(tx);
           currentTXNum++;
+          return true;
         }
       }
     } else {
       currentTXNum++;
       transactions.put(dnID, tx);
+      return true;
     }
     SCMBlockDeletingService.LOG
         .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
+    return false;
   }
 
   Set getDatanodeIDs() {

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 49af65c..68435d1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -23,10 +23,16 @@ import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
     .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.scm.command
+    .CommandStatusReportHandler.DeleteBlockStatus;
 import org.apache.hadoop.hdds.scm.container.Mapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@@ -75,9 +81,10 @@ import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
  * equally same chance to be retrieved which only depends on the nature
  * order of the transaction ID.
  */
-public class DeletedBlockLogImpl implements DeletedBlockLog {
+public class DeletedBlockLogImpl
+    implements DeletedBlockLog, EventHandler {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DeletedBlockLogImpl.class);
 
   private static final byte[] LATEST_TXID =
@@ -123,7 +130,7 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   }
 
   @VisibleForTesting
-  MetadataStore getDeletedStore() {
+  public MetadataStore getDeletedStore() {
     return deletedStore;
   }
 
@@ -269,6 +276,8 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
           deletedStore.delete(Longs.toByteArray(txID));
         }
       }
+      LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
+          txID, containerId, dnID);
     } catch (IOException e) {
       LOG.warn("Could not commit delete block transaction: " +
           transactionResult.getTxID(), e);
@@ -407,4 +416,13 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
       lock.unlock();
     }
   }
+
+  @Override
+  public void onMessage(DeleteBlockStatus deleteBlockStatus,
+      EventPublisher publisher) {
+    ContainerBlocksDeletionACKProto ackProto =
+        deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
+    commitTransactions(ackProto.getResultsList(),
+        UUID.fromString(ackProto.getDnId()));
+  }
 }
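With onMessage in place, DeletedBlockLogImpl closes the loop the removed RPC used to provide: an EXECUTED delete-blocks status carries the ACK, and committing it prunes the log. The subscription itself happens in StorageContainerManager, whose hunk is outside this excerpt; presumably it is wired roughly like this (eventQueue and scmBlockManager are assumed names):

  // Assumed wiring sketch: route DELETE_BLOCK_STATUS events to the
  // deleted-block log so acknowledged transactions get committed.
  eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
      (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());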
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index de3fe26..b85d77f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -165,7 +165,7 @@ public class SCMBlockDeletingService extends BackgroundService {
           // We should stop caching new commands if num of un-processed
           // command is bigger than a limit, e.g 50. In case datanode goes
           // offline for sometime, the cached commands be flooded.
-          eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+          eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
              new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs)));
           LOG.debug(
               "Added delete block command for datanode {} in the queue,"

----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 054665a..c0de382 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -63,8 +63,10 @@ public class CommandStatusReportHandler implements
           CloseContainerStatus(cmdStatus));
       break;
     case deleteBlocksCommand:
-      publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new
-          DeleteBlockCommandStatus(cmdStatus));
+      if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
+        publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
+            new DeleteBlockStatus(cmdStatus));
+      }
       break;
     default:
       LOGGER.debug("CommandStatus of type:{} not handled in " +
@@ -120,8 +122,8 @@ public class CommandStatusReportHandler implements
   /**
    * Wrapper event for DeleteBlock Command.
    */
-  public static class DeleteBlockCommandStatus extends CommandStatusEvent {
-    public DeleteBlockCommandStatus(CommandStatus cmdStatus) {
+  public static class DeleteBlockStatus extends CommandStatusEvent {
+    public DeleteBlockStatus(CommandStatus cmdStatus) {
       super(cmdStatus);
     }
   }
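RETRIABLE_DATANODE_COMMAND and DELETE_BLOCK_STATUS (defined in the SCMEvents hunk below) form the start/completion pair for the watcher. Firing DELETE_BLOCK_STATUS only for EXECUTED statuses means a FAILED attempt produces no completion event, so the watcher times out and the command is resent. The setup lives in StorageContainerManager, outside this excerpt, but presumably looks something like this (watcher constructor and start method are assumed from the hdds events framework):

  // Assumed setup sketch: watch each retriable datanode command until a
  // matching delete-block status arrives; on lease timeout it is re-fired.
  RetriableDatanodeEventWatcher watcher = new RetriableDatanodeEventWatcher<>(
      SCMEvents.RETRIABLE_DATANODE_COMMAND,
      SCMEvents.DELETE_BLOCK_STATUS,
      commandWatcherLeaseManager);
  watcher.start(eventQueue);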
@@ -200,6 +202,7 @@ public class ContainerMapping implements Mapping { contInfo = ContainerInfo.fromProtobuf(temp); Pipeline pipeline; + String leaderId = ""; if (contInfo.isContainerOpen()) { // If a pipeline with the given pipeline Id already exists, return it pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID()); @@ -207,14 +210,12 @@ // For closed containers, create the pipeline from the datanodes with replicas Set<DatanodeDetails> dnWithReplicas = containerStateManager .getContainerReplicas(contInfo.containerID()); - if (dnWithReplicas.size() == 0) { - throw new SCMException("Can't create a pipeline for container with " - + "no replica.", ResultCodes.NO_REPLICA_FOUND); + if (!dnWithReplicas.isEmpty()) { + leaderId = dnWithReplicas.iterator().next().getUuidString(); } - pipeline = - new Pipeline(dnWithReplicas.iterator().next().getUuidString(), - contInfo.getState(), ReplicationType.STAND_ALONE, - contInfo.getReplicationFactor(), PipelineID.randomId()); + pipeline = new Pipeline(leaderId, contInfo.getState(), + ReplicationType.STAND_ALONE, contInfo.getReplicationFactor(), + PipelineID.randomId()); dnWithReplicas.forEach(pipeline::addMember); } return new ContainerWithPipeline(contInfo, pipeline); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 745e052..77b8713 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -23,8 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .CloseContainerStatus; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .DeleteBlockCommandStatus; +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler .ReplicationStatus; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler @@ -128,6 +127,10 @@ public final class SCMEvents { public static final Event<CommandForDatanode> DATANODE_COMMAND = new TypedEvent<>(CommandForDatanode.class, "Datanode_Command"); + public static final TypedEvent<CommandForDatanode> + RETRIABLE_DATANODE_COMMAND = + new TypedEvent<>(CommandForDatanode.class, "Retriable_Datanode_Command"); + /** * A Close Container Event can be triggered under many conditions. Some of them * are: 1. A Container is full, then we stop writing further information to @@ -179,7 +182,7 @@ public final class SCMEvents { * status for Replication SCMCommand is received. */ public static final Event<ReplicationStatus> REPLICATION_STATUS = new - TypedEvent<>(ReplicationStatus.class, "ReplicateCommandStatus"); + TypedEvent<>(ReplicationStatus.class, "Replicate_Command_Status"); /** * This event will be triggered by CommandStatusReportHandler whenever a * status for CloseContainer SCMCommand is received.
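RETRIABLE_DATANODE_COMMAND above is declared exactly like every other event in this file; what makes it "retriable" is only that the watcher registered later in this patch tracks it. For readers new to the hdds event framework, the declare/subscribe/fire cycle looks roughly like the sketch below. It uses only the EventQueue, TypedEvent and EventHandler usage visible in this diff; the demo event name and payload are invented, and EventQueue#processAll is assumed here (from its use in the hdds test suite) as a helper that drains the asynchronous queue:

import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;

public final class EventQueueSketch {

  // A typed, named channel. The name is what shows up in log lines such
  // as "firing event of type Delete_Block_Status".
  private static final TypedEvent<String> DEMO_EVENT =
      new TypedEvent<>(String.class, "Demo_Event");

  public static void main(String[] args) {
    EventQueue queue = new EventQueue();
    // EventHandler#onMessage receives the payload plus an EventPublisher,
    // which is how a handler such as DeletedBlockLogImpl#onMessage gets
    // both the data and a way to fire follow-up events.
    queue.addHandler(DEMO_EVENT,
        (payload, publisher) -> System.out.println("handled: " + payload));
    queue.fireEvent(DEMO_EVENT, "hello");
    queue.processAll(1000); // assumed drain helper; dispatch is asynchronous
  }
}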
@@ -187,15 +190,15 @@ public final class SCMEvents { public static final Event<CloseContainerStatus> CLOSE_CONTAINER_STATUS = new TypedEvent<>(CloseContainerStatus.class, - "CloseContainerCommandStatus"); + "Close_Container_Command_Status"); /** * This event will be triggered by CommandStatusReportHandler whenever a * status for DeleteBlock SCMCommand is received. */ - public static final Event<DeleteBlockCommandStatus> + public static final TypedEvent<CommandStatusReportHandler.DeleteBlockStatus> DELETE_BLOCK_STATUS = - new TypedEvent<>(DeleteBlockCommandStatus.class, - "DeleteBlockCommandStatus"); + new TypedEvent<>(CommandStatusReportHandler.DeleteBlockStatus.class, + "Delete_Block_Status"); /** * This event will be triggered while processing container reports from DN @@ -203,7 +206,7 @@ * deleteTransactionID on SCM. */ public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS = - new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus"); + new TypedEvent<>(PendingDeleteStatusList.class, "Pending_Delete_Status"); /** * This is the command for ReplicationManager to handle under/over http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index e65de8b..d9a0875 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -121,10 +121,13 @@ public final class SCMDatanodeHeartbeatDispatcher { heartbeat.getPipelineActions())); } - if (heartbeat.hasCommandStatusReport()) { - eventPublisher.fireEvent(CMD_STATUS_REPORT, - new CommandStatusReportFromDatanode(datanodeDetails, - heartbeat.getCommandStatusReport())); + if (heartbeat.getCommandStatusReportsCount() != 0) { + for (CommandStatusReportsProto commandStatusReport : heartbeat + .getCommandStatusReportsList()) { + eventPublisher.fireEvent(CMD_STATUS_REPORT, + new CommandStatusReportFromDatanode(datanodeDetails, + commandStatusReport)); + } } return commands; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index 4a0d3e5..9c6fa88 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -49,15 +49,6 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos - .ContainerBlocksDeletionACKResponseProto; -import
org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos - .ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; - import static org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto @@ -100,7 +91,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.LinkedList; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY; @@ -242,18 +232,6 @@ public class SCMDatanodeProtocolServer implements .addAllCommands(cmdResponses).build(); } - @Override - public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( - ContainerBlocksDeletionACKProto acks) throws IOException { - if (acks.getResultsCount() > 0) { - List<DeleteBlockTransactionResult> resultList = acks.getResultsList(); - scm.getScmBlockManager().getDeletedBlockLog() - .commitTransactions(resultList, UUID.fromString(acks.getDnId())); - } - return ContainerBlocksDeletionACKResponseProto.newBuilder() - .getDefaultInstanceForType(); - } - /** * Returns an SCMCommandResponse from the SCM Command. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 52af62d..bb72075 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; +import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; @@ -68,6 +69,7 @@ import org.apache.hadoop.hdds.scm.pipelines.PipelineReportHandler; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; @@ -253,6 +255,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher", watcherTimeout); + RetriableDatanodeEventWatcher retriableDatanodeEventWatcher = + new RetriableDatanodeEventWatcher<>( + SCMEvents.RETRIABLE_DATANODE_COMMAND, + SCMEvents.DELETE_BLOCK_STATUS, + commandWatcherLeaseManager); + retriableDatanodeEventWatcher.start(eventQueue); + //TODO: support configurable containerPlacement policy ContainerPlacementPolicy containerPlacementPolicy = new SCMContainerPlacementCapacity(scmNodeManager, conf); @@ -282,6 +291,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl httpServer = new
StorageContainerManagerHttpServer(conf); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); + eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler); @@ -296,6 +306,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl replicationStatus.getChillModeStatusListener()); eventQueue .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); + eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, + (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionEventHandler); eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java new file mode 100644 index 0000000..2a50bca --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/RetriableDatanodeEventWatcher.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.CommandStatusEvent; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventWatcher; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EventWatcher that tracks a retriable datanode command (the start event) + * and the corresponding command status (the completion event); on lease + * timeout the command is fired again. + */ +public class RetriableDatanodeEventWatcher + extends EventWatcher { + + public static final Logger LOG = + LoggerFactory.getLogger(RetriableDatanodeEventWatcher.class); + + public RetriableDatanodeEventWatcher(Event startEvent, + Event completionEvent, LeaseManager leaseManager) { + super(startEvent, completionEvent, leaseManager); + } + + @Override + protected void onTimeout(EventPublisher publisher, + CommandForDatanode payload) { + LOG.info("RetriableDatanodeCommand type={} with id={} timed out. Retrying.", + payload.getCommand().getType(), payload.getId()); + //put back to the original queue + publisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND, payload); + } + + @Override + protected void onFinished(EventPublisher publisher, + CommandForDatanode payload) { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java new file mode 100644 index 0000000..b1d2838 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/protocol/commands/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.ozone.protocol.commands; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java index 65a2e29..afa25e2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java @@ -67,20 +67,20 @@ public class TestCommandStatusReportHandler implements EventPublisher { CommandStatusReportFromDatanode report = this.getStatusReport(Collections .emptyList()); cmdStatusReportHandler.onMessage(report, this); - assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus")); + assertFalse(logCapturer.getOutput().contains("Delete_Block_Status")); assertFalse(logCapturer.getOutput().contains( - "CloseContainerCommandStatus")); - assertFalse(logCapturer.getOutput().contains("ReplicateCommandStatus")); + "Close_Container_Command_Status")); + assertFalse(logCapturer.getOutput().contains("Replicate_Command_Status")); report = this.getStatusReport(this.getCommandStatusList()); cmdStatusReportHandler.onMessage(report, this); assertTrue(logCapturer.getOutput().contains("firing event of type " + - "DeleteBlockCommandStatus")); + "Delete_Block_Status")); assertTrue(logCapturer.getOutput().contains("firing event of type " + - "CloseContainerCommandStatus")); + "Close_Container_Command_Status")); assertTrue(logCapturer.getOutput().contains("firing event of type " + - "ReplicateCommandStatus")); + "Replicate_Command_Status")); assertTrue(logCapturer.getOutput().contains("type: " + "closeContainerCommand")); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java index 6a0b909..f3cd4ea 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java @@ -127,7 +127,7 @@ public class TestSCMDatanodeHeartbeatDispatcher { SCMHeartbeatRequestProto.newBuilder() .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) .setContainerReport(containerReport) - .setCommandStatusReport(commandStatusReport) + .addCommandStatusReports(commandStatusReport) .build(); dispatcher.dispatch(heartbeat); Assert.assertEquals(2, eventReceived.get()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7ff8c05/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index fe53bcc..ac3ad5d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.junit.Assert.fail; @@ -188,6 +189,8 @@ public class TestStorageContainerManager { OzoneConfiguration conf = new OzoneConfiguration(); conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 100, TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 100, + TimeUnit.MILLISECONDS); conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 3000, TimeUnit.MILLISECONDS);
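Taken together, the pieces in this patch close the loop for delete-block commands: SCMBlockDeletingService fires RETRIABLE_DATANODE_COMMAND; RetriableDatanodeEventWatcher takes a lease on the command while the node manager ships it to the datanode; the datanode's heartbeat carries CommandStatusReports back; CommandStatusReportHandler converts an EXECUTED status into DELETE_BLOCK_STATUS, which both completes the watcher and lets DeletedBlockLogImpl commit the transactions; and if no status arrives before the lease expires, onTimeout re-fires the original command. The toy sketch below restates that timeout/complete mechanic in plain java.util.concurrent; it is not the hdds EventWatcher or LeaseManager API, only the control flow:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Toy restatement of the start/completion/timeout cycle (not the real API). */
public final class RetryWatcherSketch {

  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();
  private final Map<Long, ScheduledFuture<?>> pending =
      new ConcurrentHashMap<>();

  /** "Start event": arm a lease; if no completion arrives in time, retry. */
  public void onStart(long commandId, Runnable retry, long timeoutMs) {
    pending.put(commandId, timer.schedule(() -> {
      pending.remove(commandId);
      retry.run(); // e.g. re-fire RETRIABLE_DATANODE_COMMAND with the payload
    }, timeoutMs, TimeUnit.MILLISECONDS));
  }

  /** "Completion event": an EXECUTED status arrived; cancel the retry. */
  public void onCompletion(long commandId) {
    ScheduledFuture<?> lease = pending.remove(commandId);
    if (lease != null) {
      lease.cancel(false);
    }
  }
}

In the real watcher the per-command lease comes from the LeaseManager constructed just before it in StorageContainerManager, and start and completion events are correlated by the CommandForDatanode id; firing DELETE_BLOCK_STATUS only for EXECUTED statuses ensures a PENDING report cannot prematurely complete the watcher.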