From common-commits-return-84277-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Thu Jun 14 18:50:08 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 02B5618078A for ; Thu, 14 Jun 2018 18:50:06 +0200 (CEST) Received: (qmail 85182 invoked by uid 500); 14 Jun 2018 16:50:01 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 82558 invoked by uid 99); 14 Jun 2018 16:49:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Jun 2018 16:49:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C021AE11A4; Thu, 14 Jun 2018 16:49:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: arp@apache.org To: common-commits@hadoop.apache.org Date: Thu, 14 Jun 2018 16:50:43 -0000 Message-Id: <7b5fcf29368b4367a893e52d92418092@git.apache.org> In-Reply-To: <55dfe12c360540528ca28555598121ad@git.apache.org> References: <55dfe12c360540528ca28555598121ad@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/50] [abbrv] hadoop git commit: HDDS-161. Add functionality to queue ContainerClose command from SCM Heartbeat Response to Ratis. Contributed by Shashikant Banerjee. HDDS-161. Add functionality to queue ContainerClose command from SCM Heartbeat Response to Ratis. Contributed by Shashikant Banerjee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7547740e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7547740e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7547740e Branch: refs/heads/HDDS-48 Commit: 7547740e5c65edaa6c6f8aa1c8debabbdfb0945e Parents: 2299488 Author: Anu Engineer Authored: Wed Jun 13 17:50:42 2018 -0700 Committer: Anu Engineer Committed: Wed Jun 13 18:48:59 2018 -0700 ---------------------------------------------------------------------- .../statemachine/DatanodeStateMachine.java | 9 + .../CloseContainerCommandHandler.java | 21 +- .../commandhandler/CommandDispatcher.java | 4 + .../common/transport/server/XceiverServer.java | 7 + .../transport/server/XceiverServerGrpc.java | 9 + .../transport/server/XceiverServerSpi.java | 7 + .../server/ratis/XceiverServerRatis.java | 56 ++++- .../container/ozoneimpl/OzoneContainer.java | 62 +++++- .../commands/CloseContainerCommand.java | 12 +- .../StorageContainerDatanodeProtocol.proto | 1 + .../container/CloseContainerEventHandler.java | 3 +- .../scm/container/closer/ContainerCloser.java | 7 +- .../TestCloseContainerByPipeline.java | 221 +++++++++++++++++++ .../TestCloseContainerHandler.java | 7 +- 14 files changed, 412 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index cb4319d..dc4e673 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -403,4 +403,13 @@ public class DatanodeStateMachine implements Closeable { public long getCommandHandled() { return commandsHandled; } + + /** + * returns the Command Dispatcher. + * @return CommandDispatcher + */ + @VisibleForTesting + public CommandDispatcher getCommandDispatcher() { + return commandDispatcher; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index e8c602d..45f2bbd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -16,6 +16,8 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto @@ -29,6 +31,8 @@ import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.UUID; + /** * Handler for close container command received from SCM. */ @@ -67,8 +71,23 @@ public class CloseContainerCommandHandler implements CommandHandler { CloseContainerCommandProto .parseFrom(command.getProtoBufMessage()); containerID = closeContainerProto.getContainerID(); + HddsProtos.ReplicationType replicationType = + closeContainerProto.getReplicationType(); + + ContainerProtos.CloseContainerRequestProto.Builder closeRequest = + ContainerProtos.CloseContainerRequestProto.newBuilder(); + closeRequest.setContainerID(containerID); - container.getContainerManager().closeContainer(containerID); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CloseContainer); + request.setCloseContainer(closeRequest); + request.setTraceID(UUID.randomUUID().toString()); + request.setDatanodeUuid( + context.getParent().getDatanodeDetails().getUuidString()); + // submit the close container request for the XceiverServer to handle + container.submitContainerRequest( + request.build(), replicationType); } catch (Exception e) { LOG.error("Can't close container " + containerID, e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index aedd78f..5163d98 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -77,6 +77,10 @@ public final class CommandDispatcher { } } + public CommandHandler getCloseContainerHandler() { + return handlerMap.get(Type.closeContainerCommand); + } + /** * Dispatch the command to the correct handler. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java index 455df49..3a469de 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.transport.server; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap; import org.apache.ratis.shaded.io.netty.channel.Channel; import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup; @@ -129,4 +130,10 @@ public final class XceiverServer implements XceiverServerSpi { channel.close().awaitUninterruptibly(); } } + + @Override + public void submitRequest( + ContainerProtos.ContainerCommandRequestProto request) throws IOException { + storageContainer.dispatch(request); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 550fe41..0a9e1db 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -44,6 +45,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { LOG = LoggerFactory.getLogger(XceiverServerGrpc.class); private int port; private Server server; + private final ContainerDispatcher storageContainer; /** * Constructs a Grpc server class. @@ -77,6 +79,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) .addService(new GrpcXceiverService(dispatcher)) .build(); + storageContainer = dispatcher; } @Override @@ -103,4 +106,10 @@ public final class XceiverServerGrpc implements XceiverServerSpi { public void stop() { server.shutdown(); } + + @Override + public void submitRequest( + ContainerProtos.ContainerCommandRequestProto request) throws IOException { + storageContainer.dispatch(request); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index dad9e9f..49579f2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.transport.server; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.IOException; @@ -40,4 +41,10 @@ public interface XceiverServerSpi { */ HddsProtos.ReplicationType getServerType(); + /** + * submits a containerRequest to be performed by the replication pipeline. + * @param request ContainerCommandRequest + */ + void submitRequest(ContainerProtos.ContainerCommandRequestProto request) + throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 33c25ea..b9c7cae 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -18,10 +18,12 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -33,10 +35,12 @@ import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.protocol.*; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -49,8 +53,10 @@ import java.net.ServerSocket; import java.net.SocketAddress; import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * Creates a ratis server endpoint that acts as the communication layer for @@ -58,6 +64,12 @@ import java.util.concurrent.TimeUnit; */ public final class XceiverServerRatis implements XceiverServerSpi { static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); + private static final AtomicLong callIdCounter = new AtomicLong(); + + private static long nextCallId() { + return callIdCounter.getAndIncrement() & Long.MAX_VALUE; + } + private final int port; private final RaftServer server; private ThreadPoolExecutor writeChunkExecutor; @@ -241,4 +253,46 @@ public final class XceiverServerRatis implements XceiverServerSpi { public HddsProtos.ReplicationType getServerType() { return HddsProtos.ReplicationType.RATIS; } -} + + @VisibleForTesting + public RaftServer getServer() { + return server; + } + + private void processReply(RaftClientReply reply) { + + // NotLeader exception is thrown only when the raft server to which the + // request is submitted is not the leader. The request will be rejected + // and will eventually be executed once the request comnes via the leader + // node. + NotLeaderException notLeaderException = reply.getNotLeaderException(); + if (notLeaderException != null) { + LOG.info(reply.getNotLeaderException().getLocalizedMessage()); + } + StateMachineException stateMachineException = + reply.getStateMachineException(); + if (stateMachineException != null) { + // In case the request could not be completed, StateMachine Exception + // will be thrown. For now, Just log the message. + // If the container could not be closed, SCM will come to know + // via containerReports. CloseContainer should be re tried via SCM. + LOG.error(stateMachineException.getLocalizedMessage()); + } + } + + @Override + public void submitRequest( + ContainerProtos.ContainerCommandRequestProto request) throws IOException { + ClientId clientId = ClientId.randomId(); + RaftClientRequest raftClientRequest = + new RaftClientRequest(clientId, server.getId(), + RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0, + Message.valueOf(request.toByteString()), RaftClientRequest + // ReplicationLevel.ALL ensures the transactions corresponding to + // the request here are applied on all the raft servers. + .writeRequestType(RaftProtos.ReplicationLevel.ALL)); + CompletableFuture reply = + server.submitClientRequestAsync(raftClientRequest); + reply.thenAccept(this::processReply); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index b357fef..4156f5a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -18,7 +18,9 @@ package org.apache.hadoop.ozone.container.ozoneimpl; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -72,7 +74,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; * layer. */ public class OzoneContainer { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(OzoneContainer.class); private final Configuration ozoneConfig; @@ -269,9 +271,65 @@ public class OzoneContainer { return this.manager.getClosedContainerReports(); } + private XceiverServerSpi getRatisSerer() { + for (XceiverServerSpi serverInstance : server) { + if (serverInstance instanceof XceiverServerRatis) { + return serverInstance; + } + } + return null; + } + + private XceiverServerSpi getStandaAloneSerer() { + for (XceiverServerSpi serverInstance : server) { + if (!(serverInstance instanceof XceiverServerRatis)) { + return serverInstance; + } + } + return null; + } + @VisibleForTesting public ContainerManager getContainerManager() { return this.manager; } -} + public void submitContainerRequest( + ContainerProtos.ContainerCommandRequestProto request, + HddsProtos.ReplicationType replicationType) throws IOException { + XceiverServerSpi serverInstance; + long containerId = getContainerIdForCmd(request); + if (replicationType == HddsProtos.ReplicationType.RATIS) { + serverInstance = getRatisSerer(); + Preconditions.checkNotNull(serverInstance); + serverInstance.submitRequest(request); + LOG.info("submitting {} request over RATIS server for container {}", + request.getCmdType(), containerId); + } else { + serverInstance = getStandaAloneSerer(); + Preconditions.checkNotNull(serverInstance); + getStandaAloneSerer().submitRequest(request); + LOG.info( + "submitting {} request over STAND_ALONE server for container {}", + request.getCmdType(), containerId); + } + + } + + private long getContainerIdForCmd( + ContainerProtos.ContainerCommandRequestProto request) + throws IllegalArgumentException { + ContainerProtos.Type type = request.getCmdType(); + switch (type) { + case CloseContainer: + return request.getCloseContainer().getContainerID(); + // Right now, we handle only closeContainer via queuing it over the + // over the XceiVerServer. For all other commands we throw Illegal + // argument exception here. Will need to extend the switch cases + // in case we want add another commands here. + default: + throw new IllegalArgumentException("Cmd " + request.getCmdType() + + " not supported over HearBeat Response"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index 4f4f82b..c7d8df5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; import org.apache.hadoop.hdds.protocol.proto @@ -31,9 +32,12 @@ public class CloseContainerCommand extends SCMCommand { private long containerID; + private HddsProtos.ReplicationType replicationType; - public CloseContainerCommand(long containerID) { + public CloseContainerCommand(long containerID, + HddsProtos.ReplicationType replicationType) { this.containerID = containerID; + this.replicationType = replicationType; } /** @@ -58,13 +62,15 @@ public class CloseContainerCommand public CloseContainerCommandProto getProto() { return CloseContainerCommandProto.newBuilder() - .setContainerID(containerID).build(); + .setContainerID(containerID) + .setReplicationType(replicationType).build(); } public static CloseContainerCommand getFromProtobuf( CloseContainerCommandProto closeContainerProto) { Preconditions.checkNotNull(closeContainerProto); - return new CloseContainerCommand(closeContainerProto.getContainerID()); + return new CloseContainerCommand(closeContainerProto.getContainerID(), + closeContainerProto.getReplicationType()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 95e210e..f6aba05 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -223,6 +223,7 @@ This command asks the datanode to close a specific container. */ message CloseContainerCommandProto { required int64 containerID = 1; + required hadoop.hdds.ReplicationType replicationType = 2; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index bc95b55..16e84a3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -63,7 +63,8 @@ public class CloseContainerEventHandler implements EventHandler { if (info.getState() == HddsProtos.LifeCycleState.OPEN) { for (DatanodeDetails datanode : info.getPipeline().getMachines()) { containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(), - new CloseContainerCommand(containerID.getId())); + new CloseContainerCommand(containerID.getId(), + info.getPipeline().getType())); } try { // Finalize event will make sure the state of the container transitions http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java index 75ec8e1..937076c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java @@ -127,11 +127,12 @@ public class ContainerCloser { // to SCM. In that case also, data node will ignore this command. HddsProtos.Pipeline pipeline = info.getPipeline(); - for (HddsProtos.DatanodeDetailsProto datanodeDetails : - pipeline.getPipelineChannel().getMembersList()) { + for (HddsProtos.DatanodeDetailsProto datanodeDetails : pipeline + .getPipelineChannel().getMembersList()) { nodeManager.addDatanodeCommand( DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(), - new CloseContainerCommand(info.getContainerID())); + new CloseContainerCommand(info.getContainerID(), + pipeline.getPipelineChannel().getType())); } if (!commandIssued.containsKey(info.getContainerID())) { commandIssued.put(info.getContainerID(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java new file mode 100644 index 0000000..9e8cb46 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeoutException; + +public class TestCloseContainerByPipeline { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static OzoneClient client; + private static ObjectStore objectStore; + + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true and + * OZONE_HANDLER_TYPE_KEY = "distributed" + * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, + OzoneConsts.OZONE_HANDLER_DISTRIBUTED); + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(3).build(); + cluster.waitForClusterToBeReady(); + //the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + objectStore.createVolume("test"); + objectStore.getVolume("test").createBucket("test"); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testCloseContainerViaStandaAlone() + throws IOException, TimeoutException, InterruptedException { + + OzoneOutputStream key = objectStore.getVolume("test").getBucket("test") + .createKey("standalone", 1024, ReplicationType.STAND_ALONE, + ReplicationFactor.ONE); + key.write("standalone".getBytes()); + key.close(); + + //get the name of a valid container + KsmKeyArgs keyArgs = + new KsmKeyArgs.Builder().setVolumeName("test").setBucketName("test") + .setType(HddsProtos.ReplicationType.STAND_ALONE) + .setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024) + .setKeyName("standalone").build(); + + KsmKeyLocationInfo ksmKeyLocationInfo = + cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions() + .get(0).getBlocksLatestVersionOnly().get(0); + + long containerID = ksmKeyLocationInfo.getContainerID(); + List datanodes = + cluster.getStorageContainerManager().getContainerInfo(containerID) + .getPipeline().getMachines(); + Assert.assertTrue(datanodes.size() == 1); + + DatanodeDetails datanodeDetails = datanodes.get(0); + Assert + .assertFalse(isContainerClosed(cluster, containerID, datanodeDetails)); + + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG); + //send the order to close the container + cluster.getStorageContainerManager().getScmNodeManager() + .addDatanodeCommand(datanodeDetails.getUuid(), + new CloseContainerCommand(containerID, + HddsProtos.ReplicationType.STAND_ALONE)); + + GenericTestUtils + .waitFor(() -> isContainerClosed(cluster, containerID, datanodeDetails), + 500, 5 * 1000); + + //double check if it's really closed (waitFor also throws an exception) + Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails)); + Assert.assertTrue(logCapturer.getOutput().contains( + "submitting CloseContainer request over STAND_ALONE server for" + + " container " + containerID)); + // Make sure it was really closed via StandAlone not Ratis server + Assert.assertFalse((logCapturer.getOutput().contains( + "submitting CloseContainer request over RATIS server for container " + + containerID))); + } + + @Test + public void testCloseContainerViaRatis() throws IOException, + TimeoutException, InterruptedException { + + OzoneOutputStream key = objectStore.getVolume("test").getBucket("test") + .createKey("ratis", 1024, ReplicationType.RATIS, + ReplicationFactor.THREE); + key.write("ratis".getBytes()); + key.close(); + + //get the name of a valid container + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder().setVolumeName("test"). + setBucketName("test").setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setDataSize(1024) + .setKeyName("ratis").build(); + + KsmKeyLocationInfo ksmKeyLocationInfo = + cluster.getKeySpaceManager().lookupKey(keyArgs).getKeyLocationVersions() + .get(0).getBlocksLatestVersionOnly().get(0); + + long containerID = ksmKeyLocationInfo.getContainerID(); + List datanodes = + cluster.getStorageContainerManager().getContainerInfo(containerID) + .getPipeline().getMachines(); + Assert.assertTrue(datanodes.size() == 3); + + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(OzoneContainer.LOG); + + for (DatanodeDetails details : datanodes) { + Assert.assertFalse(isContainerClosed(cluster, containerID, details)); + //send the order to close the container + cluster.getStorageContainerManager().getScmNodeManager() + .addDatanodeCommand(details.getUuid(), + new CloseContainerCommand(containerID, + HddsProtos.ReplicationType.RATIS)); + } + + for (DatanodeDetails datanodeDetails : datanodes) { + GenericTestUtils.waitFor( + () -> isContainerClosed(cluster, containerID, datanodeDetails), 500, + 5 * 1000); + //double check if it's really closed (waitFor also throws an exception) + Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails)); + } + Assert.assertFalse(logCapturer.getOutput().contains( + "submitting CloseContainer request over STAND_ALONE " + + "server for container " + containerID)); + // Make sure it was really closed via StandAlone not Ratis server + Assert.assertTrue((logCapturer.getOutput().contains( + "submitting CloseContainer request over RATIS server for container " + + containerID))); + } + + private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID, + DatanodeDetails datanode) { + ContainerData containerData; + try { + for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) + if (datanode.equals(datanodeService.getDatanodeDetails())) { + containerData = + datanodeService.getDatanodeStateMachine().getContainer() + .getContainerManager().readContainer(containerID); + if (!containerData.isOpen()) { + // make sure the closeContainerHandler on the Datanode is invoked + Assert.assertTrue( + datanodeService.getDatanodeStateMachine().getCommandDispatcher() + .getCloseContainerHandler().getInvocationCount() > 0); + return true; + } + } + } catch (StorageContainerException e) { + throw new AssertionError(e); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7547740e/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java index fbe43d7..efb7344 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java @@ -83,12 +83,13 @@ public class TestCloseContainerHandler { Assert.assertFalse(isContainerClosed(cluster, containerID)); - DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0) - .getDatanodeDetails(); + DatanodeDetails datanodeDetails = + cluster.getHddsDatanodes().get(0).getDatanodeDetails(); //send the order to close the container cluster.getStorageContainerManager().getScmNodeManager() .addDatanodeCommand(datanodeDetails.getUuid(), - new CloseContainerCommand(containerID)); + new CloseContainerCommand(containerID, + HddsProtos.ReplicationType.STAND_ALONE)); GenericTestUtils.waitFor(() -> isContainerClosed(cluster, containerID), 500, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org