hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x...@apache.org
Subject hadoop git commit: HDDS-256. Adding CommandStatusReport Handler. Contributed by Ajay Kumar.
Date Fri, 20 Jul 2018 18:07:22 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 8a6bb8409 -> 89a0f8074


HDDS-256. Adding CommandStatusReport Handler. Contributed by Ajay Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/89a0f807
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/89a0f807
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/89a0f807

Branch: refs/heads/trunk
Commit: 89a0f80741beb5a998f143849e797d780332048b
Parents: 8a6bb84
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Fri Jul 20 11:03:33 2018 -0700
Committer: Xiaoyu Yao <xyao@apache.org>
Committed: Fri Jul 20 11:07:09 2018 -0700

----------------------------------------------------------------------
 .../scm/command/CommandStatusReportHandler.java | 129 +++++++++++++++++
 .../hadoop/hdds/scm/command/package-info.java   |  26 ++++
 .../hadoop/hdds/scm/events/SCMEvents.java       |  24 ++++
 .../scm/server/StorageContainerManager.java     |   4 +
 .../org/apache/hadoop/hdds/scm/TestUtils.java   |  18 +++
 .../command/TestCommandStatusReportHandler.java | 137 +++++++++++++++++++
 .../hadoop/hdds/scm/command/package-info.java   |  22 +++
 7 files changed, 360 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/89a0f807/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
new file mode 100644
index 0000000..9413a46
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.command;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .CommandStatusReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Handles CommandStatusReports from datanode.
+ */
+public class CommandStatusReportHandler implements
+    EventHandler<CommandStatusReportFromDatanode> {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(CommandStatusReportHandler.class);
+
+  @Override
+  public void onMessage(CommandStatusReportFromDatanode report,
+      EventPublisher publisher) {
+    Preconditions.checkNotNull(report);
+    List<CommandStatus> cmdStatusList = report.getReport().getCmdStatusList();
+    Preconditions.checkNotNull(cmdStatusList);
+    LOGGER.trace("Processing command status report for dn: {}", report
+        .getDatanodeDetails());
+
+    // Route command status to its watchers.
+    cmdStatusList.forEach(cmdStatus -> {
+      LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
+          .getCmdId(), cmdStatus.getType());
+      switch (cmdStatus.getType()) {
+      case replicateContainerCommand:
+        publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
+            ReplicationStatus(cmdStatus));
+        break;
+      case closeContainerCommand:
+        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+            CloseContainerStatus(cmdStatus));
+        break;
+      case deleteBlocksCommand:
+        publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, new
+            DeleteBlockCommandStatus(cmdStatus));
+        break;
+      default:
+        LOGGER.debug("CommandStatus of type:{} not handled in " +
+            "CommandStatusReportHandler.", cmdStatus.getType());
+        break;
+      }
+    });
+  }
+
+  /**
+   * Wrapper event for CommandStatus.
+   */
+  public static class CommandStatusEvent implements IdentifiableEventPayload {
+    private CommandStatus cmdStatus;
+
+    CommandStatusEvent(CommandStatus cmdStatus) {
+      this.cmdStatus = cmdStatus;
+    }
+
+    public CommandStatus getCmdStatus() {
+      return cmdStatus;
+    }
+
+    @Override
+    public String toString() {
+      return "CommandStatusEvent:" + cmdStatus.toString();
+    }
+
+    @Override
+    public long getId() {
+      return cmdStatus.getCmdId();
+    }
+  }
+
+  /**
+   * Wrapper event for Replicate Command.
+   */
+  public static class ReplicationStatus extends CommandStatusEvent {
+    ReplicationStatus(CommandStatus cmdStatus) {
+      super(cmdStatus);
+    }
+  }
+
+  /**
+   * Wrapper event for CloseContainer Command.
+   */
+  public static class CloseContainerStatus extends CommandStatusEvent {
+    CloseContainerStatus(CommandStatus cmdStatus) {
+      super(cmdStatus);
+    }
+  }
+
+  /**
+   * Wrapper event for DeleteBlock Command.
+   */
+  public static class DeleteBlockCommandStatus extends CommandStatusEvent {
+    DeleteBlockCommandStatus(CommandStatus cmdStatus) {
+      super(cmdStatus);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89a0f807/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
new file mode 100644
index 0000000..ba17fb9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ * <p>
+ * This package contains HDDS protocol related classes.
+ */
+
+/**
+ * This package contains HDDS protocol related classes.
+ */
+package org.apache.hadoop.hdds.scm.command;
+/*
+ * Classes related to commands issued from SCM to DataNode.
+ * */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89a0f807/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 485b3f5..46f1588 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler.*;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .CommandStatusReportFromDatanode;
@@ -105,6 +106,29 @@ public final class SCMEvents {
       new TypedEvent<>(DatanodeDetails.class, "Dead_Node");
 
   /**
+   * This event will be triggered by CommandStatusReportHandler whenever a
+   * status for Replication SCMCommand is received.
+   */
+  public static final Event<ReplicationStatus> REPLICATION_STATUS = new
+      TypedEvent<>(ReplicationStatus.class, "ReplicateCommandStatus");
+  /**
+   * This event will be triggered by CommandStatusReportHandler whenever a
+   * status for CloseContainer SCMCommand is received.
+   */
+  public static final Event<CloseContainerStatus>
+      CLOSE_CONTAINER_STATUS =
+      new TypedEvent<>(CloseContainerStatus.class,
+          "CloseContainerCommandStatus");
+  /**
+   * This event will be triggered by CommandStatusReportHandler whenever a
+   * status for DeleteBlock SCMCommand is received.
+   */
+  public static final Event<DeleteBlockCommandStatus>
+      DELETE_BLOCK_STATUS =
+      new TypedEvent(DeleteBlockCommandStatus.class,
+          "DeleteBlockCommandStatus");
+
+  /**
    * Private Ctor. Never Constructed.
    */
   private SCMEvents() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89a0f807/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index f37a0ed..aba6410 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
@@ -191,6 +192,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new NodeReportHandler(scmNodeManager);
     ContainerReportHandler containerReportHandler =
         new ContainerReportHandler(scmContainerManager, node2ContainerMap);
+    CommandStatusReportHandler cmdStatusReportHandler =
+        new CommandStatusReportHandler();
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
     StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
@@ -202,6 +205,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
     eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
+    eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
 
     scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
         .OZONE_ADMINISTRATORS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89a0f807/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 7568bf3..8d7a2c2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -17,8 +17,14 @@
 package org.apache.hadoop.hdds.scm;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol
     .proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol
+    .proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol
+    .proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -90,6 +96,18 @@ public final class TestUtils {
     return reportList;
   }
 
+  /**
+   * Create Command Status report object.
+   * @return CommandStatusReportsProto
+   */
+  public static CommandStatusReportsProto createCommandStatusReport(
+      List<CommandStatus> reports) {
+    CommandStatusReportsProto.Builder report = CommandStatusReportsProto
+        .newBuilder();
+    report.addAllCmdStatus(reports);
+    return report.build();
+  }
+
 
   /**
    * Get specified number of DatanodeDetails and registered them with node

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89a0f807/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
new file mode 100644
index 0000000..5e64e57
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/TestCommandStatusReportHandler.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.command;
+
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .CommandStatusReportFromDatanode;
+
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class TestCommandStatusReportHandler implements EventPublisher {
+
+  private static Logger LOG = LoggerFactory
+      .getLogger(TestCommandStatusReportHandler.class);
+  private CommandStatusReportHandler cmdStatusReportHandler;
+  private String storagePath = GenericTestUtils.getRandomizedTempPath()
+      .concat("/" + UUID.randomUUID().toString());
+  ;
+
+  @Before
+  public void setup() {
+    cmdStatusReportHandler = new CommandStatusReportHandler();
+  }
+
+  @Test
+  public void testCommandStatusReport() {
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(LOG);
+
+    CommandStatusReportFromDatanode report = this.getStatusReport(Collections
+        .emptyList());
+    cmdStatusReportHandler.onMessage(report, this);
+    assertFalse(logCapturer.getOutput().contains("DeleteBlockCommandStatus"));
+    assertFalse(logCapturer.getOutput().contains
+        ("CloseContainerCommandStatus"));
+    assertFalse(logCapturer.getOutput().contains
+        ("ReplicateCommandStatus"));
+
+
+    report = this.getStatusReport(this.getCommandStatusList());
+    cmdStatusReportHandler.onMessage(report, this);
+    assertTrue(logCapturer.getOutput().contains("firing event of type " +
+        "DeleteBlockCommandStatus"));
+    assertTrue(logCapturer.getOutput().contains("firing event of type " +
+        "CloseContainerCommandStatus"));
+    assertTrue(logCapturer.getOutput().contains("firing event of type " +
+        "ReplicateCommandStatus"));
+
+    assertTrue(logCapturer.getOutput().contains("type: " +
+        "closeContainerCommand"));
+    assertTrue(logCapturer.getOutput().contains("type: " +
+        "deleteBlocksCommand"));
+    assertTrue(logCapturer.getOutput().contains("type: " +
+        "replicateContainerCommand"));
+
+  }
+
+  private CommandStatusReportFromDatanode getStatusReport(List<CommandStatus>
+      reports) {
+    CommandStatusReportsProto report = TestUtils.createCommandStatusReport
+        (reports);
+    DatanodeDetails dn = TestUtils.getDatanodeDetails();
+    return new SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode
+        (dn, report);
+  }
+
+  @Override
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent
+      (EVENT_TYPE event, PAYLOAD payload) {
+    LOG.info("firing event of type {}, payload {}", event.getName(), payload
+        .toString());
+  }
+
+  private List<CommandStatus> getCommandStatusList() {
+    List<CommandStatus> reports = new ArrayList<>(3);
+
+    // Add status message for replication, close container and delete block
+    // command.
+    CommandStatus.Builder builder = CommandStatus.newBuilder();
+
+    builder.setCmdId(HddsIdFactory.getLongId())
+        .setStatus(CommandStatus.Status.EXECUTED)
+        .setType(Type.deleteBlocksCommand);
+    reports.add(builder.build());
+
+    builder.setCmdId(HddsIdFactory.getLongId())
+        .setStatus(CommandStatus.Status.EXECUTED)
+        .setType(Type.closeContainerCommand);
+    reports.add(builder.build());
+
+    builder.setMsg("Not enough space")
+        .setCmdId(HddsIdFactory.getLongId())
+        .setStatus(CommandStatus.Status.FAILED)
+        .setType(Type.replicateContainerCommand);
+    reports.add(builder.build());
+    return reports;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/89a0f807/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/package-info.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/package-info.java
new file mode 100644
index 0000000..f529c20
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/command/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Make CheckStyle Happy.
+ */
+package org.apache.hadoop.hdds.scm.command;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message