Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0CA11200C34 for ; Mon, 27 Feb 2017 21:35:16 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0B355160B5B; Mon, 27 Feb 2017 20:35:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D752E160B60 for ; Mon, 27 Feb 2017 21:35:13 +0100 (CET) Received: (qmail 22780 invoked by uid 500); 27 Feb 2017 20:35:12 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 22573 invoked by uid 99); 27 Feb 2017 20:35:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Feb 2017 20:35:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB2C1DFDC2; Mon, 27 Feb 2017 20:35:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Mon, 27 Feb 2017 20:35:13 -0000 Message-Id: <0bf2b04abfe2459a9eb3cb100e18a47c@git.apache.org> In-Reply-To: <157f4f467c6e42778644d74a24d4fbee@git.apache.org> References: <157f4f467c6e42778644d74a24d4fbee@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: HDFS-11184. Ozone: SCM: Make SCM use container protocol. Contributed by Anu Engineer. archived-at: Mon, 27 Feb 2017 20:35:16 -0000 HDFS-11184. Ozone: SCM: Make SCM use container protocol. Contributed by Anu Engineer. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d63ec0ca Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d63ec0ca Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d63ec0ca Branch: refs/heads/HDFS-7240 Commit: d63ec0ca81f34efb0adee7af9cd6e5d5c2d3cc49 Parents: ae783b1 Author: Anu Engineer Authored: Mon Feb 27 12:25:03 2017 -0800 Committer: Anu Engineer Committed: Mon Feb 27 12:25:03 2017 -0800 ---------------------------------------------------------------------- .../apache/hadoop/hdfs/protocol/DatanodeID.java | 12 +- .../scm/container/common/helpers/Pipeline.java | 1 - .../scm/storage/ContainerProtocolCalls.java | 60 +- .../hdfs/server/datanode/BlockPoolManager.java | 13 +- .../apache/hadoop/ozone/OzoneClientUtils.java | 6 +- .../apache/hadoop/ozone/OzoneConfigKeys.java | 8 +- .../statemachine/DatanodeStateMachine.java | 21 +- .../common/statemachine/StateContext.java | 8 + .../states/datanode/RunningDatanodeState.java | 33 +- .../states/endpoint/VersionEndpointTask.java | 2 +- .../StorageContainerDatanodeProtocol.java | 8 +- .../ozone/protocol/commands/NullCommand.java | 4 +- .../protocol/commands/RegisteredCommand.java | 6 +- .../ozone/protocol/commands/SCMCommand.java | 4 +- ...rDatanodeProtocolClientSideTranslatorPB.java | 5 +- ...rDatanodeProtocolServerSideTranslatorPB.java | 2 +- ...rLocationProtocolServerSideTranslatorPB.java | 41 +- .../ozone/scm/StorageContainerManager.java | 442 +++++++++++++++ .../ozone/scm/container/ContainerMapping.java | 5 +- .../hadoop/ozone/scm/node/NodeManager.java | 6 +- .../apache/hadoop/ozone/scm/package-info.java | 12 +- .../ozone/storage/StorageContainerManager.java | 563 ------------------- .../storage/StorageContainerNameService.java | 137 ----- .../hadoop/ozone/storage/package-info.java | 23 - .../apache/hadoop/ozone/MiniOzoneCluster.java | 346 +++++++++--- .../ozone/TestStorageContainerManager.java | 24 +- .../ozone/container/common/ScmTestMock.java | 8 +- .../common/TestDatanodeStateMachine.java | 7 +- .../ozone/container/common/TestEndPoint.java | 2 +- .../common/impl/TestContainerPersistence.java | 6 +- .../container/ozoneimpl/TestOzoneContainer.java | 6 +- .../hadoop/ozone/scm/TestAllocateContainer.java | 93 +++ .../ozone/scm/container/MockNodeManager.java | 47 +- .../apache/hadoop/ozone/scm/package-info.java | 21 + .../ozone/web/TestOzoneRestWithMiniCluster.java | 6 +- 35 files changed, 1045 insertions(+), 943 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java index 6f5f1e3..a7c2d06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -301,10 +301,10 @@ public class DatanodeID implements Comparable { * @param datanodeIDProto - protoBuf Message * @return DataNodeID */ - public static DatanodeID getFromProtoBuf(HdfsProtos.DatanodeIDProto - datanodeIDProto) { - DatanodeID id = new DatanodeID(datanodeIDProto.getDatanodeUuid(), - datanodeIDProto.getIpAddr(), datanodeIDProto.getHostName(), + public static DatanodeID getFromProtoBuf( + HdfsProtos.DatanodeIDProto datanodeIDProto) { + DatanodeID id = new DatanodeID(datanodeIDProto.getIpAddr(), + datanodeIDProto.getHostName(), datanodeIDProto.getDatanodeUuid(), datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(), datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort()); id.setContainerPort(datanodeIDProto.getContainerPort()); @@ -315,7 +315,7 @@ public class DatanodeID implements Comparable { * Returns a DataNodeID protobuf message from a datanode ID. * @return HdfsProtos.DatanodeIDProto */ - public HdfsProtos.DatanodeIDProto getProtoBufMessage() { + public HdfsProtos.DatanodeIDProto getProtoBufMessage() { HdfsProtos.DatanodeIDProto.Builder builder = HdfsProtos.DatanodeIDProto.newBuilder(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java index fe10ca2..b4c3bd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java @@ -63,7 +63,6 @@ public class Pipeline { return newPipeline; } - /** Adds a member to pipeline */ /** * Adds a member to the pipeline. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java index ee6348c..7fc1fc0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java @@ -27,19 +27,30 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .GetKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .GetKeyResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .PutKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ReadChunkRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .ReadChunkResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutSmallFileRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .WriteChunkRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .PutSmallFileRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .GetSmallFileResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .GetSmallFileRequestProto; import org.apache.hadoop.scm.XceiverClient; /** @@ -211,6 +222,33 @@ public final class ContainerProtocolCalls { } /** + * createContainer call that creates a container on the datanode. + * @param client - client + * @param traceID - traceID + * @throws IOException + */ + public static void createContainer(XceiverClient client, String traceID) + throws IOException { + ContainerProtos.CreateContainerRequestProto.Builder createRequest = + ContainerProtos.CreateContainerRequestProto + .newBuilder(); + ContainerProtos.ContainerData.Builder containerData = ContainerProtos + .ContainerData.newBuilder(); + containerData.setName(client.getPipeline().getContainerName()); + createRequest.setPipeline(client.getPipeline().getProtobufMessage()); + createRequest.setContainerData(containerData.build()); + + ContainerCommandRequestProto.Builder request = + ContainerCommandRequestProto.newBuilder(); + request.setCmdType(ContainerProtos.Type.CreateContainer); + request.setCreateContainer(createRequest); + request.setTraceID(traceID); + ContainerCommandResponseProto response = client.sendCommand( + request.build()); + validateContainerResponse(response, traceID); + } + + /** * Reads the data given the container name and key. * * @param client - client http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java index 8202f73..497f4ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java @@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.base.Joiner; @@ -147,8 +146,8 @@ class BlockPoolManager { void refreshNamenodes(Configuration conf) throws IOException { - LOG.info("Refresh request received for nameservices: " + conf.get - (DFSConfigKeys.DFS_NAMESERVICES)); + LOG.info("Refresh request received for nameservices: " + + conf.get(DFSConfigKeys.DFS_NAMESERVICES)); Map> newAddressMap = new HashMap<>(); @@ -165,14 +164,6 @@ class BlockPoolManager { "This may be an Ozone-only cluster."); } - if (dn.isOzoneEnabled()) { - newAddressMap.putAll(OzoneClientUtils.getScmServiceRpcAddresses(conf)); - - // SCM does not have a lifeline service port (yet). - newLifelineAddressMap.putAll( - OzoneClientUtils.getScmServiceRpcAddresses(conf)); - } - if (newAddressMap.isEmpty()) { throw new IOException("No services to connect (NameNodes or SCM)."); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java index 5d1aed8..46b1d66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java @@ -321,9 +321,9 @@ public final class OzoneClientUtils { * @param conf - Ozone Config * @return - HB interval in seconds. */ - public static int getScmHeartbeatInterval(Configuration conf) { - return conf.getInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, - OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT); + public static long getScmHeartbeatInterval(Configuration conf) { + return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, + OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT, TimeUnit.SECONDS); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 746fefe..a758db5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -94,11 +94,6 @@ public final class OzoneConfigKeys { public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT = OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L; - public static final String OZONE_SCM_CONTAINER_THREADS = - "ozone.scm.container.threads"; - public static final int OZONE_SCM_CONTAINER_THREADS_DEFAULT = - Runtime.getRuntime().availableProcessors() * 2; - public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT = "ozone.scm.heartbeat.rpc-timeout"; public static final long OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT = @@ -142,6 +137,9 @@ public final class OzoneConfigKeys { public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id"; + public static final String OZONE_SCM_DB_CACHE_SIZE_MB = + "ozone.scm.db.cache.size.mb"; + public static final int OZONE_SCM_DB_CACHE_SIZE_DEFAULT = 128; /** * There is no need to instantiate this class. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 8063e23..db83734 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneClientUtils; -import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -29,10 +28,8 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * State Machine Class. @@ -55,14 +52,13 @@ public class DatanodeStateMachine implements Closeable { */ public DatanodeStateMachine(Configuration conf) throws IOException { this.conf = conf; - executorService = HadoopExecutors.newScheduledThreadPool( - this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS, - OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT), - new ThreadFactoryBuilder().setDaemon(true) + executorService = HadoopExecutors.newCachedThreadPool( + new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Datanode State Machine Thread - %d").build()); connectionManager = new SCMConnectionManager(conf); context = new StateContext(this.conf, DatanodeStates.getInitState(), this); - heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf); + heartbeatFrequency = TimeUnit.SECONDS.toMillis( + OzoneClientUtils.getScmHeartbeatInterval(conf)); container = new OzoneContainer(conf); } @@ -84,6 +80,7 @@ public class DatanodeStateMachine implements Closeable { container.start(); while (context.getState() != DatanodeStates.SHUTDOWN) { try { + LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); nextHB = Time.monotonicNow() + heartbeatFrequency; context.execute(executorService, heartbeatFrequency, TimeUnit.MILLISECONDS); @@ -91,8 +88,8 @@ public class DatanodeStateMachine implements Closeable { if (now < nextHB) { Thread.sleep(nextHB - now); } - } catch (InterruptedException | ExecutionException | TimeoutException e) { - LOG.error("Unable to finish the execution", e); + } catch (Exception e) { + LOG.error("Unable to finish the execution.", e); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 0a20945..e397202 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -187,5 +187,13 @@ public class StateContext { } } + /** + * Returns the count of the Execution. + * @return long + */ + public long getExecutionCount() { + return stateExecutionCount.get(); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java index 69eabe6..77b4138 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -22,22 +22,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.container.common.statemachine - .DatanodeStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - .EndpointStateMachine; -import org.apache.hadoop.ozone.container.common.statemachine - .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.states.DatanodeState; -import org.apache.hadoop.ozone.container.common.states.endpoint - .HeartbeatEndpointTask; -import org.apache.hadoop.ozone.container.common.states.endpoint - .RegisterEndpointTask; -import org.apache.hadoop.ozone.container.common.states.endpoint - .VersionEndpointTask; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +102,7 @@ public class RunningDatanodeState implements DatanodeState { DatanodeID temp = new DatanodeID( //TODO : Replace this with proper network and kerberos // support code. - InetAddress.getLocalHost().getHostAddress().toString(), + InetAddress.getLocalHost().getHostAddress(), DataNode.getHostName(conf), UUID.randomUUID().toString(), 0, /** XferPort - SCM does not use this port */ @@ -134,6 +127,13 @@ public class RunningDatanodeState implements DatanodeState { private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto createNewContainerID(Path idPath) throws IOException { + + if(!idPath.getParent().toFile().exists() && + !idPath.getParent().toFile().mkdirs()) { + LOG.error("Failed to create container ID locations. Path: {}", + idPath.getParent()); + throw new IOException("Unable to create container ID directories."); + } StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto containerIDProto = StorageContainerDatanodeProtocolProtos .ContainerNodeIDProto.newBuilder() @@ -213,7 +213,8 @@ public class RunningDatanodeState implements DatanodeState { ecs.submit(endpointTask); } } - + //TODO : Cache some of these tasks instead of creating them + //all the time. private Callable getEndPointTask(EndpointStateMachine endpoint) { switch (endpoint.getState()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java index 1dfc432..fa59234 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java @@ -49,7 +49,7 @@ public class VersionEndpointTask implements rpcEndPoint.lock(); try{ SCMVersionResponseProto versionResponse = - rpcEndPoint.getEndPoint().getVersion(); + rpcEndPoint.getEndPoint().getVersion(null); rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse)); EndpointStateMachine.EndPointStates nextState = http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index 86ca946..6a9dc67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -5,9 +5,9 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; @@ -34,7 +35,8 @@ public interface StorageContainerDatanodeProtocol { * Returns SCM version. * @return Version info. */ - SCMVersionResponseProto getVersion() throws IOException; + SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest) + throws IOException; /** * Used by data node to send a Heartbeat. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java index 4bdf422..7ae1117 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java @@ -44,8 +44,8 @@ public class NullCommand extends SCMCommand { * @return A protobuf message. */ @Override - public NullCmdResponseProto getProtoBufMessage() { - return NullCmdResponseProto.newBuilder().build(); + public byte[] getProtoBufMessage() { + return NullCmdResponseProto.newBuilder().build().toByteArray(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java index f2944ce..bf430ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java @@ -57,7 +57,7 @@ public class RegisteredCommand extends * @return Type */ @Override - Type getType() { + public Type getType() { return Type.registeredCommand; } @@ -94,12 +94,12 @@ public class RegisteredCommand extends * @return A protobuf message. */ @Override - SCMRegisteredCmdResponseProto getProtoBufMessage() { + public byte[] getProtoBufMessage() { return SCMRegisteredCmdResponseProto.newBuilder() .setClusterID(this.clusterID) .setDatanodeUUID(this.datanodeUUID) .setErrorCode(this.error) - .build(); + .build().toByteArray(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index a6acf4e..fe9b12d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -31,11 +31,11 @@ public abstract class SCMCommand { * Returns the type of this command. * @return Type */ - abstract Type getType(); + public abstract Type getType(); /** * Gets the protobuf message of this object. * @return A protobuf message. */ - abstract T getProtoBufMessage(); + public abstract byte[] getProtoBufMessage(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index e71684c3..ba40c29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -92,11 +92,12 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB /** * Returns SCM version. * + * @param unused - set to null and unused. * @return Version info. */ @Override - public SCMVersionResponseProto getVersion() throws IOException { - + public SCMVersionResponseProto getVersion(SCMVersionRequestProto + unused) throws IOException { SCMVersionRequestProto request = SCMVersionRequestProto.newBuilder().build(); final SCMVersionResponseProto response; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 70ad414..62b885a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -47,7 +47,7 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request) throws ServiceException { try { - return impl.getVersion(); + return impl.getVersion(request); } catch (IOException e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index 19eb8a5..6590112 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -31,10 +31,18 @@ import org.apache.hadoop.ozone.protocol.LocatedContainer; import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos + .GetStorageContainerLocationsRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos + .GetStorageContainerLocationsResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos.LocatedContainerProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos.ContainerResponseProto; + +import org.apache.hadoop.scm.container.common.helpers.Pipeline; /** * This class is the server-side translator that forwards requests received on @@ -63,7 +71,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB throws ServiceException { Set keys = Sets.newLinkedHashSetWithExpectedSize( req.getKeysCount()); - for (String key: req.getKeysList()) { + for (String key : req.getKeysList()) { keys.add(key); } final Set locatedContainers; @@ -74,13 +82,13 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB } GetStorageContainerLocationsResponseProto.Builder resp = GetStorageContainerLocationsResponseProto.newBuilder(); - for (LocatedContainer locatedContainer: locatedContainers) { + for (LocatedContainer locatedContainer : locatedContainers) { LocatedContainerProto.Builder locatedContainerProto = LocatedContainerProto.newBuilder() - .setKey(locatedContainer.getKey()) - .setMatchedKeyPrefix(locatedContainer.getMatchedKeyPrefix()) - .setContainerName(locatedContainer.getContainerName()); - for (DatanodeInfo location: locatedContainer.getLocations()) { + .setKey(locatedContainer.getKey()) + .setMatchedKeyPrefix(locatedContainer.getMatchedKeyPrefix()) + .setContainerName(locatedContainer.getContainerName()); + for (DatanodeInfo location : locatedContainer.getLocations()) { locatedContainerProto.addLocations(PBHelperClient.convert(location)); } locatedContainerProto.setLeader( @@ -94,6 +102,15 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB public ContainerResponseProto allocateContainer(RpcController unused, StorageContainerLocationProtocolProtos.ContainerRequestProto request) throws ServiceException { - return null; + try { + Pipeline pipeline = impl.allocateContainer(request.getContainerName()); + return ContainerResponseProto.newBuilder() + .setPipeline(pipeline.getProtobufMessage()) + .setErrorCode(ContainerResponseProto.Error.success) + .build(); + + } catch (IOException e) { + throw new ServiceException(e); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java new file mode 100644 index 0000000..0a6f35f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.protocol.LocatedContainer; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.protocol.commands.NullCommand; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.NullCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.scm.container.ContainerMapping; +import org.apache.hadoop.ozone.scm.container.Mapping; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.ozone.scm.node.SCMNodeManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_CLIENT_ADDRESS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_MB; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.util.ExitUtil.terminate; + +/** + * StorageContainerManager is the main entry point for the service that provides + * information about which SCM nodes host containers. + * + * DataNodes report to StorageContainerManager using heartbeat + * messages. SCM allocates containers and returns a pipeline. + * + * A client once it gets a pipeline (a list of datanodes) will connect to the + * datanodes and create a container, which then can be used to store data. + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) +public class StorageContainerManager + implements StorageContainerDatanodeProtocol, + StorageContainerLocationProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(StorageContainerManager.class); + + /** + * NodeManager and container Managers for SCM. + */ + private final NodeManager scmNodeManager; + private final Mapping scmContainerManager; + + /** The RPC server that listens to requests from DataNodes. */ + private final RPC.Server datanodeRpcServer; + private final InetSocketAddress datanodeRpcAddress; + + /** The RPC server that listens to requests from clients. */ + private final RPC.Server clientRpcServer; + private final InetSocketAddress clientRpcAddress; + + /** + * Creates a new StorageContainerManager. Configuration will be updated with + * information on the actual listening addresses used for RPC servers. + * + * @param conf configuration + */ + public StorageContainerManager(OzoneConfiguration conf) + throws IOException { + + final int handlerCount = conf.getInt( + OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT); + final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + + // TODO : Fix the ClusterID generation code. + scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString()); + scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize); + + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos. + StorageContainerDatanodeProtocolService.newReflectiveBlockingService( + new StorageContainerDatanodeProtocolServerSideTranslatorPB(this)); + + final InetSocketAddress datanodeRpcAddr = + OzoneClientUtils.getScmDataNodeBindAddress(conf); + datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr, + StorageContainerDatanodeProtocolPB.class, dnProtoPbService, + handlerCount); + datanodeRpcAddress = updateListenAddress(conf, + OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); + + + BlockingService storageProtoPbService = + StorageContainerLocationProtocolProtos + .StorageContainerLocationProtocolService + .newReflectiveBlockingService( + new StorageContainerLocationProtocolServerSideTranslatorPB( + this)); + + final InetSocketAddress scmAddress = + OzoneClientUtils.getScmClientBindAddress(conf); + clientRpcServer = startRpcServer(conf, scmAddress, + StorageContainerLocationProtocolPB.class, storageProtoPbService, + handlerCount); + clientRpcAddress = updateListenAddress(conf, + OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer); + } + + /** + * Builds a message for logging startup information about an RPC server. + * + * @param description RPC server description + * @param addr RPC server listening address + * @return server startup message + */ + private static String buildRpcServerStartMessage(String description, + InetSocketAddress addr) { + return addr != null ? String.format("%s is listening at %s", + description, addr.toString()) : + String.format("%s not started", description); + } + + /** + * Starts an RPC server, if configured. + * + * @param conf configuration + * @param addr configured address of RPC server + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param handlerCount RPC server handler count + * + * @return RPC server + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(OzoneConfiguration conf, + InetSocketAddress addr, Class protocol, BlockingService instance, + int handlerCount) + throws IOException { + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(addr.getHostString()) + .setPort(addr.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); + return rpcServer; + } + + /** + * After starting an RPC server, updates configuration with the actual + * listening address of that server. The listening address may be different + * from the configured address if, for example, the configured address uses + * port 0 to request use of an ephemeral port. + * + * @param conf configuration to update + * @param rpcAddressKey configuration key for RPC server address + * @param addr configured address + * @param rpcServer started RPC server. + */ + private static InetSocketAddress updateListenAddress(OzoneConfiguration conf, + String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { + InetSocketAddress listenAddr = rpcServer.getListenerAddress(); + InetSocketAddress updatedAddr = new InetSocketAddress( + addr.getHostString(), listenAddr.getPort()); + conf.set(rpcAddressKey, + listenAddr.getHostString() + ":" + listenAddr.getPort()); + return updatedAddr; + } + + /** + * Main entry point for starting StorageContainerManager. + * + * @param argv arguments + * @throws IOException if startup fails due to I/O error + */ + public static void main(String[] argv) throws IOException { + StringUtils.startupShutdownMessage(StorageContainerManager.class, + argv, LOG); + try { + StorageContainerManager scm = new StorageContainerManager( + new OzoneConfiguration()); + scm.start(); + scm.join(); + } catch (Throwable t) { + LOG.error("Failed to start the StorageContainerManager.", t); + terminate(1, t); + } + } + + /** + * Returns a SCMCommandRepose from the SCM Command. + * @param cmd - Cmd + * @return SCMCommandResponseProto + * @throws InvalidProtocolBufferException + */ + @VisibleForTesting + public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd) + throws InvalidProtocolBufferException { + Type type = cmd.getType(); + switch (type) { + case nullCmd: + Preconditions.checkState(cmd.getClass() == NullCommand.class); + return SCMCommandResponseProto.newBuilder().setCmdType(cmd.getType()) + .setNullCommand( + NullCmdResponseProto.parseFrom(cmd.getProtoBufMessage())) + .build(); + default: + throw new IllegalArgumentException("Not implemented"); + } + } + + @VisibleForTesting + public static SCMRegisteredCmdResponseProto getRegisteredResponse( + SCMCommand cmd, SCMNodeAddressList addressList) { + Preconditions.checkState(cmd.getClass() == RegisteredCommand.class); + RegisteredCommand rCmd = (RegisteredCommand) cmd; + StorageContainerDatanodeProtocolProtos.Type type = cmd.getType(); + if (type != Type.registeredCommand) { + throw new IllegalArgumentException("Registered command is not well " + + "formed. Internal Error."); + } + return SCMRegisteredCmdResponseProto.newBuilder() + //TODO : Fix this later when we have multiple SCM support. + //.setAddressList(addressList) + .setErrorCode(rCmd.getError()) + .setClusterID(rCmd.getClusterID()) + .setDatanodeUUID(rCmd.getDatanodeUUID()).build(); + } + + // TODO : This code will move into KSM later. Write now this code is stubbed + // implementation that lets the ozone tests pass. + @Override + public Set getStorageContainerLocations(Set keys) + throws IOException { + throw new IOException("Not Implemented."); + } + + /** + * Asks SCM where a container should be allocated. SCM responds with the set + * of datanodes that should be used creating this container. + * + * @param containerName - Name of the container. + * @return Pipeline. + * @throws IOException + */ + @Override + public Pipeline allocateContainer(String containerName) throws IOException { + return scmContainerManager.allocateContainer(containerName); + } + + /** + * Returns listening address of StorageLocation Protocol RPC server. + * + * @return listen address of StorageLocation RPC server + */ + @VisibleForTesting + public InetSocketAddress getClientRpcAddress() { + return clientRpcAddress; + } + + /** + * Returns listening address of StorageDatanode Protocol RPC server. + * + * @return Address where datanode are communicating. + */ + public InetSocketAddress getDatanodeRpcAddress() { + return datanodeRpcAddress; + } + + /** + * Start service. + */ + public void start() { + LOG.info(buildRpcServerStartMessage( + "StorageContainerLocationProtocol RPC server", clientRpcAddress)); + clientRpcServer.start(); + LOG.info(buildRpcServerStartMessage("RPC server for DataNodes", + datanodeRpcAddress)); + datanodeRpcServer.start(); + } + + /** + * Stop service. + */ + public void stop() { + LOG.info("Stopping the StorageContainerLocationProtocol RPC server"); + clientRpcServer.stop(); + LOG.info("Stopping the RPC server for DataNodes"); + datanodeRpcServer.stop(); + } + + /** + * Wait until service has completed shutdown. + */ + public void join() { + try { + clientRpcServer.join(); + datanodeRpcServer.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted during StorageContainerManager join."); + } + } + + /** + * Returns SCM version. + * + * @return Version info. + */ + @Override + public SCMVersionResponseProto getVersion( + SCMVersionRequestProto versionRequest) throws IOException { + return getScmNodeManager().getVersion(versionRequest).getProtobufMessage(); + } + + /** + * Used by data node to send a Heartbeat. + * + * @param datanodeID - Datanode ID. + * @return - SCMHeartbeatResponseProto + * @throws IOException + */ + @Override + public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) + throws IOException { + List commands = getScmNodeManager().sendHeartbeat(datanodeID); + List cmdReponses = new LinkedList<>(); + for (SCMCommand cmd : commands) { + cmdReponses.add(getCommandResponse(cmd)); + } + return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdReponses) + .build(); + } + + /** + * Register Datanode. + * + * @param datanodeID - DatanodID. + * @param scmAddresses - List of SCMs this datanode is configured to + * communicate. + * @return SCM Command. + */ + @Override + public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + register(DatanodeID datanodeID, String[] scmAddresses) + throws IOException { + // TODO : Return the list of Nodes that forms the SCM HA. + return getRegisteredResponse(scmNodeManager.register(datanodeID), null); + } + + /** + * Returns the Number of Datanodes that are communicating with SCM. + * + * @param nodestate Healthy, Dead etc. + * @return int -- count + */ + public int getNodeCount(SCMNodeManager.NODESTATE nodestate) { + return scmNodeManager.getNodeCount(nodestate); + } + + /** + * Returns node manager. + * @return - Node Manager + */ + @VisibleForTesting + public NodeManager getScmNodeManager() { + return scmNodeManager; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index aa688aa..346cb88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -46,7 +46,7 @@ public class ContainerMapping implements Mapping { LoggerFactory.getLogger(ContainerMapping.class); private final NodeManager nodeManager; - private final int cacheSize; + private final long cacheSize; private final Lock lock; private final Charset encoding = Charset.forName("UTF-8"); private final LevelDBStore store; @@ -77,7 +77,7 @@ public class ContainerMapping implements Mapping { } File dbPath = new File(scmMetaDataDir, "SCM.db"); Options options = new Options(); - options.cacheSize(this.cacheSize * (1024 * 1024)); + options.cacheSize(this.cacheSize * (1024L * 1024L)); options.createIfMissing(); store = new LevelDBStore(dbPath, options); this.lock = new ReentrantLock(); @@ -85,6 +85,7 @@ public class ContainerMapping implements Mapping { } /** + * // TODO : Fix the code to handle multiple nodes. * Translates a list of nodes, ordered such that the first is the leader, into * a corresponding {@link Pipeline} object. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index ff693bc..47cb7a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm.node; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import java.io.Closeable; import java.util.List; @@ -45,9 +46,8 @@ import java.util.List; * DECOMMISSIONED - Someone told us to remove this node from the tracking * list, by calling removeNode. We will throw away this nodes info soon. */ -public interface NodeManager extends Closeable, Runnable { - - +public interface NodeManager extends StorageContainerNodeProtocol, Closeable, + Runnable { /** * Removes a data node from the management of this Node Manager. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java index 08bddb4..7686df3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java @@ -5,11 +5,11 @@ * licenses this file to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. @@ -17,6 +17,6 @@ package org.apache.hadoop.ozone.scm; -/** - * This package contains Storage Container Manager classes. +/* + * This package contains StorageContainerManager classes. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java deleted file mode 100644 index 1974b7a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java +++ /dev/null @@ -1,563 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.storage; - -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.*; -import static org.apache.hadoop.util.ExitUtil.terminate; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import com.google.protobuf.BlockingService; - - -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.hdfs.DFSUtil; - -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos; -import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB; -import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; -import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; -import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; -import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; -import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; -import org.apache.hadoop.hdfs.server.protocol.StorageReport; -import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.ozone.OzoneClientUtils; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.XceiverClient; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.ozone.protocol.LocatedContainer; -import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; -import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB; -import org.apache.hadoop.util.StringUtils; - -/** - * StorageContainerManager is the main entry point for the service that provides - * information about which HDFS nodes host containers. - * - * The current implementation is a stub suitable to begin end-to-end testing of - * Ozone service interactions. DataNodes report to StorageContainerManager - * using the existing heartbeat messages. StorageContainerManager lazily - * initializes a single storage container to be served by those DataNodes. - * All subsequent requests for container locations will reply with that single - * pipeline, using all registered nodes. - * - * This will evolve from a stub to a full-fledged implementation capable of - * partitioning the keyspace across multiple containers, with appropriate - * distribution across nodes. - */ -@InterfaceAudience.Private -public class StorageContainerManager - implements DatanodeProtocol, StorageContainerLocationProtocol { - - private static final Logger LOG = - LoggerFactory.getLogger(StorageContainerManager.class); - - private final StorageContainerNameService ns; - private final BlockManager blockManager; - private final XceiverClientManager xceiverClientManager; - private Pipeline singlePipeline; - - /** The RPC server that listens to requests from DataNodes. */ - private final RPC.Server datanodeRpcServer; - private final InetSocketAddress datanodeRpcAddress; - - /** The RPC server that listens to requests from clients. */ - private final RPC.Server clientRpcServer; - private final InetSocketAddress clientRpcAddress; - - /** - * Creates a new StorageContainerManager. Configuration will be updated with - * information on the actual listening addresses used for RPC servers. - * - * @param conf configuration - */ - public StorageContainerManager(OzoneConfiguration conf) - throws IOException { - ns = new StorageContainerNameService(); - boolean haEnabled = false; - blockManager = new BlockManager(ns, haEnabled, conf); - xceiverClientManager = new XceiverClientManager(conf); - - RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, - ProtobufRpcEngine.class); - RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - - final int handlerCount = conf.getInt( - OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT); - final int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH, - IPC_MAXIMUM_DATA_LENGTH_DEFAULT); - BlockingService dnProtoPbService = DatanodeProtocolProtos. - DatanodeProtocolService.newReflectiveBlockingService( - new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength)); - - final InetSocketAddress datanodeRpcAddr = - OzoneClientUtils.getScmDataNodeBindAddress(conf); - datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr, - DatanodeProtocolPB.class, dnProtoPbService, handlerCount); - datanodeRpcAddress = updateListenAddress(conf, - OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); - LOG.info(buildRpcServerStartMessage("RPC server for DataNodes", - datanodeRpcAddress)); - - BlockingService storageProtoPbService = - StorageContainerLocationProtocolProtos - .StorageContainerLocationProtocolService - .newReflectiveBlockingService( - new StorageContainerLocationProtocolServerSideTranslatorPB(this)); - - final InetSocketAddress clientRpcAddr = - OzoneClientUtils.getScmClientBindAddress(conf); - clientRpcServer = startRpcServer(conf, clientRpcAddr, - StorageContainerLocationProtocolPB.class, storageProtoPbService, - handlerCount); - clientRpcAddress = updateListenAddress(conf, - OZONE_SCM_CLIENT_ADDRESS_KEY, clientRpcAddr, clientRpcServer); - LOG.info(buildRpcServerStartMessage( - "StorageContainerLocationProtocol RPC server", clientRpcAddress)); - } - - @Override - public Set getStorageContainerLocations(Set keys) - throws IOException { - LOG.trace("getStorageContainerLocations keys = {}", keys); - Pipeline pipeline = initSingleContainerPipeline(); - List liveNodes = new ArrayList<>(); - blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); - if (liveNodes.isEmpty()) { - throw new IOException("Storage container locations not found."); - } - Set locations = - Sets.newLinkedHashSet(liveNodes); - DatanodeInfo leader = liveNodes.get(0); - Set locatedContainers = - Sets.newLinkedHashSetWithExpectedSize(keys.size()); - for (String key: keys) { - locatedContainers.add(new LocatedContainer(key, key, - pipeline.getContainerName(), locations, leader)); - } - LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}", - keys, locatedContainers); - return locatedContainers; - } - - /** - * Asks SCM where a container should be allocated. SCM responds with the set - * of datanodes that should be used creating this container. - * - * @param containerName - Name of the container. - * @return Pipeline. - * @throws IOException - */ - @Override - public Pipeline allocateContainer(String containerName) throws IOException { - // TODO : This whole file will be replaced when we switch over to using - // the new protocol. So skipping connecting this code for now. - return null; - } - - @Override - public DatanodeRegistration registerDatanode( - DatanodeRegistration registration) throws IOException { - ns.writeLock(); - try { - blockManager.getDatanodeManager().registerDatanode(registration); - } finally { - ns.writeUnlock(); - } - return registration; - } - - @Override - public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, - StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed, - int xmitsInProgress, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) throws IOException { - ns.readLock(); - try { - long cacheCapacity = 0; - long cacheUsed = 0; - int maxTransfer = blockManager.getMaxReplicationStreams() - - xmitsInProgress; - DatanodeCommand[] cmds = blockManager.getDatanodeManager() - .handleHeartbeat(registration, reports, blockManager.getBlockPoolId(), - cacheCapacity, cacheUsed, xceiverCount, maxTransfer, - failedVolumes, volumeFailureSummary); - long txnId = 234; - NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat( - HAServiceProtocol.HAServiceState.ACTIVE, txnId); - RollingUpgradeInfo rollingUpgradeInfo = null; - long blockReportLeaseId = requestFullBlockReportLease ? - blockManager.requestBlockReportLeaseId(registration) : 0; - return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo, - blockReportLeaseId); - } finally { - ns.readUnlock(); - } - } - - @Override - public DatanodeCommand blockReport(DatanodeRegistration registration, - String poolId, StorageBlockReport[] reports, BlockReportContext context) - throws IOException { - for (int r = 0; r < reports.length; r++) { - final BlockListAsLongs storageContainerList = reports[r].getBlocks(); - blockManager.processReport(registration, reports[r].getStorage(), - storageContainerList, context); - } - return null; - } - - @Override - public DatanodeCommand cacheReport(DatanodeRegistration registration, - String poolId, List blockIds) throws IOException { - // Centralized Cache Management is not supported - return null; - } - - @Override - public void blockReceivedAndDeleted(DatanodeRegistration registration, - String poolId, StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks) - throws IOException { - for(StorageReceivedDeletedBlocks r : rcvdAndDeletedBlocks) { - ns.writeLock(); - try { - blockManager.processIncrementalBlockReport(registration, r); - } finally { - ns.writeUnlock(); - } - } - } - - @Override - public void errorReport(DatanodeRegistration registration, - int errorCode, String msg) throws IOException { - String dnName = - (registration == null) ? "Unknown DataNode" : registration.toString(); - if (errorCode == DatanodeProtocol.NOTIFY) { - LOG.info("Error report from " + dnName + ": " + msg); - return; - } - if (errorCode == DatanodeProtocol.DISK_ERROR) { - LOG.warn("Disk error on " + dnName + ": " + msg); - } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) { - LOG.warn("Fatal disk error on " + dnName + ": " + msg); - blockManager.getDatanodeManager().removeDatanode(registration); - } else { - LOG.info("Error report from " + dnName + ": " + msg); - } - } - - @Override - public NamespaceInfo versionRequest() throws IOException { - ns.readLock(); - try { - return new NamespaceInfo(1, "random", "random", 2, - NodeType.STORAGE_CONTAINER_SERVICE); - } finally { - ns.readUnlock(); - } - } - - @Override - public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { - ns.writeLock(); - try { - for (int i = 0; i < blocks.length; i++) { - ExtendedBlock blk = blocks[i].getBlock(); - DatanodeInfo[] nodes = blocks[i].getLocations(); - String[] storageIDs = blocks[i].getStorageIDs(); - for (int j = 0; j < nodes.length; j++) { - blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j], - storageIDs == null ? null: storageIDs[j], - "client machine reported it"); - } - } - } finally { - ns.writeUnlock(); - } - } - - @Override - public void commitBlockSynchronization(ExtendedBlock block, - long newgenerationstamp, long newlength, boolean closeFile, - boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages) - throws IOException { - // Not needed for the purpose of object store - throw new UnsupportedOperationException(); - } - - /** - * Returns information on registered DataNodes. - * - * @param type DataNode type to report - * @return registered DataNodes matching requested type - */ - public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) { - ns.readLock(); - try { - List results = - blockManager.getDatanodeManager().getDatanodeListForReport(type); - return results.toArray(new DatanodeInfo[results.size()]); - } finally { - ns.readUnlock(); - } - } - - /** - * Returns listen address of client RPC server. - * - * @return listen address of client RPC server - */ - @VisibleForTesting - public InetSocketAddress getClientRpcAddress() { - return clientRpcAddress; - } - - /** - * Start service. - */ - public void start() { - clientRpcServer.start(); - datanodeRpcServer.start(); - } - - /** - * Stop service. - */ - public void stop() { - if (clientRpcServer != null) { - clientRpcServer.stop(); - } - if (datanodeRpcServer != null) { - datanodeRpcServer.stop(); - } - IOUtils.closeStream(ns); - } - - /** - * Wait until service has completed shutdown. - */ - public void join() { - try { - clientRpcServer.join(); - datanodeRpcServer.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.info("Interrupted during StorageContainerManager join."); - } - } - - /** - * Lazily initializes a single container pipeline using all registered - * DataNodes via a synchronous call to the container protocol. This single - * container pipeline will be reused for container requests for the lifetime - * of this StorageContainerManager. - * - * @throws IOException if there is an I/O error - */ - private synchronized Pipeline initSingleContainerPipeline() - throws IOException { - if (singlePipeline == null) { - List liveNodes = new ArrayList(); - blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); - if (liveNodes.isEmpty()) { - throw new IOException("Storage container locations not found."); - } - Pipeline newPipeline = newPipelineFromNodes(liveNodes, - UUID.randomUUID().toString()); - XceiverClient xceiverClient = - xceiverClientManager.acquireClient(newPipeline); - try { - ContainerData containerData = ContainerData - .newBuilder() - .setName(newPipeline.getContainerName()) - .build(); - CreateContainerRequestProto createContainerRequest = - CreateContainerRequestProto.newBuilder() - .setPipeline(newPipeline.getProtobufMessage()) - .setContainerData(containerData) - .build(); - ContainerCommandRequestProto request = ContainerCommandRequestProto - .newBuilder() - .setCmdType(Type.CreateContainer) - .setCreateContainer(createContainerRequest) - .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand( - request); - Result result = response.getResult(); - if (result != Result.SUCCESS) { - throw new IOException( - "Failed to initialize container due to result code: " + result); - } - singlePipeline = newPipeline; - } finally { - xceiverClientManager.releaseClient(xceiverClient); - } - } - return singlePipeline; - } - - /** - * Builds a message for logging startup information about an RPC server. - * - * @param description RPC server description - * @param addr RPC server listening address - * @return server startup message - */ - private static String buildRpcServerStartMessage(String description, - InetSocketAddress addr) { - return addr != null ? String.format("%s is listening at %s", - description, addr.getHostString() + ":" + addr.getPort()) : - String.format("%s not started", description); - } - - /** - * Translates a list of nodes, ordered such that the first is the leader, into - * a corresponding {@link Pipeline} object. - * - * @param nodes list of nodes - * @param containerName container name - * @return pipeline corresponding to nodes - */ - private static Pipeline newPipelineFromNodes(List nodes, - String containerName) { - String leaderId = nodes.get(0).getDatanodeUuid(); - Pipeline pipeline = new Pipeline(leaderId); - for (DatanodeDescriptor node : nodes) { - pipeline.addMember(node); - } - pipeline.setContainerName(containerName); - return pipeline; - } - - /** - * Starts an RPC server, if configured. - * - * @param conf configuration - * @param addr configured address of RPC server - * @param protocol RPC protocol provided by RPC server - * @param instance RPC protocol implementation instance - * @param handlerCount RPC server handler count - * - * @return RPC server - * @throws IOException if there is an I/O error while creating RPC server - */ - private static RPC.Server startRpcServer(OzoneConfiguration conf, - InetSocketAddress addr, Class protocol, BlockingService instance, - int handlerCount) - throws IOException { - RPC.Server rpcServer = new RPC.Builder(conf) - .setProtocol(protocol) - .setInstance(instance) - .setBindAddress(addr.getHostString()) - .setPort(addr.getPort()) - .setNumHandlers(handlerCount) - .setVerbose(false) - .setSecretManager(null) - .build(); - - DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); - return rpcServer; - } - - /** - * After starting an RPC server, updates configuration with the actual - * listening address of that server. The listening address may be different - * from the configured address if, for example, the configured address uses - * port 0 to request use of an ephemeral port. - * - * @param conf configuration to update - * @param rpcAddressKey configuration key for RPC server address - * @param addr configured address - * @param rpcServer started RPC server. - */ - private static InetSocketAddress updateListenAddress(OzoneConfiguration conf, - String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { - InetSocketAddress listenAddr = rpcServer.getListenerAddress(); - InetSocketAddress updatedAddr = new InetSocketAddress( - addr.getHostString(), listenAddr.getPort()); - conf.set(rpcAddressKey, - addr.getHostString() + ":" + updatedAddr.getPort()); - return updatedAddr; - } - - /** - * Main entry point for starting StorageContainerManager. - * - * @param argv arguments - * @throws IOException if startup fails due to I/O error - */ - public static void main(String[] argv) throws IOException { - StringUtils.startupShutdownMessage( - StorageContainerManager.class, argv, LOG); - try { - StorageContainerManager scm = new StorageContainerManager( - new OzoneConfiguration()); - scm.start(); - scm.join(); - } catch (Throwable t) { - LOG.error("Failed to start the StorageContainerManager.", t); - terminate(1, t); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java deleted file mode 100644 index 97f5458..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerNameService.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.storage; - -import java.io.Closeable; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; -import org.apache.hadoop.hdfs.server.namenode.CacheManager; -import org.apache.hadoop.hdfs.server.namenode.FSDirectory; -import org.apache.hadoop.hdfs.server.namenode.Namesystem; -import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; - -/** - * Namesystem implementation intended for use by StorageContainerManager. - */ -@InterfaceAudience.Private -public class StorageContainerNameService implements Namesystem, Closeable { - - private final ReentrantReadWriteLock coarseLock = - new ReentrantReadWriteLock(); - private volatile boolean serviceRunning = true; - - @Override - public boolean isRunning() { - return serviceRunning; - } - - @Override - public BlockCollection getBlockCollection(long id) { - // TBD - return null; - } - - @Override - public FSDirectory getFSDirectory() { - return null; - } - - @Override - public void startSecretManagerIfNecessary() { - // Secret manager is not supported - } - - @Override - public CacheManager getCacheManager() { - // Centralized Cache Management is not supported - return null; - } - - @Override - public HAContext getHAContext() { - // HA mode is not supported - return null; - } - - @Override - public boolean inTransitionToActive() { - // HA mode is not supported - return false; - } - - @Override - public boolean isInSnapshot(long blockCollectionID) { - // Snapshots not supported - return false; - } - - @Override - public void readLock() { - coarseLock.readLock().lock(); - } - - @Override - public void readUnlock() { - coarseLock.readLock().unlock(); - } - - @Override - public boolean hasReadLock() { - return coarseLock.getReadHoldCount() > 0 || hasWriteLock(); - } - - @Override - public void writeLock() { - coarseLock.writeLock().lock(); - } - - @Override - public void writeLockInterruptibly() throws InterruptedException { - coarseLock.writeLock().lockInterruptibly(); - } - - @Override - public void writeUnlock() { - coarseLock.writeLock().unlock(); - } - - @Override - public boolean hasWriteLock() { - return coarseLock.isWriteLockedByCurrentThread(); - } - - @Override - public boolean isInSafeMode() { - // Safe mode is not supported - return false; - } - - @Override - public boolean isInStartupSafeMode() { - // Safe mode is not supported - return false; - } - - @Override - public void close() { - serviceRunning = false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d63ec0ca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java deleted file mode 100644 index 75e337f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.storage; - -/** - * This package contains StorageContainerManager classes. - */ --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org