hadoop-common-commits mailing list archives

From: aengin...@apache.org
Subject: hadoop git commit: HDFS-11444. Ozone: Fix datanode ID handling in MiniOzoneCluster. Contributed by Weiwei Yang.
Date: Mon, 20 Mar 2017 20:21:21 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 65487b579 -> 603f2c18e


HDFS-11444. Ozone: Fix datanode ID handling in MiniOzoneCluster. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/603f2c18
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/603f2c18
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/603f2c18

Branch: refs/heads/HDFS-7240
Commit: 603f2c18ec2dcba19b3bd51c3b6f5beb0d3da566
Parents: 65487b5
Author: Anu Engineer <aengineer@apache.org>
Authored: Mon Mar 20 13:18:26 2017 -0700
Committer: Anu Engineer <aengineer@apache.org>
Committed: Mon Mar 20 13:18:26 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/protocol/DatanodeID.java |   3 +-
 .../org/apache/hadoop/scm/XceiverClient.java    |  10 +
 .../hadoop/hdfs/server/datanode/DataNode.java   | 225 ++++++++++---------
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  14 ++
 .../common/helpers/ContainerUtils.java          |  85 +++++++
 .../statemachine/DatanodeStateMachine.java      |  70 +++++-
 .../common/statemachine/StateContext.java       |   9 +
 .../states/datanode/InitDatanodeState.java      |  25 +++
 .../states/datanode/RunningDatanodeState.java   |  95 ++------
 .../common/transport/server/XceiverServer.java  |  26 ++-
 .../transport/server/XceiverServerSpi.java      |   3 +
 .../container/ozoneimpl/OzoneContainer.java     |   9 +
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |  11 +
 .../hadoop/ozone/TestMiniOzoneCluster.java      | 223 ++++++++++++++++++
 .../common/TestDatanodeStateMachine.java        |  21 +-
 .../container/ozoneimpl/TestOzoneContainer.java |   5 +-
 16 files changed, 638 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index a7c2d06..c563079 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -318,9 +318,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
   public HdfsProtos.DatanodeIDProto getProtoBufMessage() {
     HdfsProtos.DatanodeIDProto.Builder builder =
         HdfsProtos.DatanodeIDProto.newBuilder();
-
     return builder.setDatanodeUuid(this.getDatanodeUuid())
-        .setIpAddr(this.getIpcAddr())
+        .setIpAddr(this.getIpAddr())
         .setHostName(this.getHostName())
         .setXferPort(this.getXferPort())
         .setInfoPort(this.getInfoPort())

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
index c6e47c8..fc9092a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelFuture;
@@ -87,6 +88,15 @@ public class XceiverClient implements XceiverClientSpi {
     channelFuture = b.connect(leader.getHostName(), port).sync();
   }
 
+  /**
+   * Returns whether the xceiver client is connected to a server.
+   * @return True if the connection is alive, false otherwise.
+   */
+  @VisibleForTesting
+  public boolean isConnected() {
+    return channelFuture.channel().isActive();
+  }
+
   @Override
   public void close() {
     if(group != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index f5d7da1..22a13ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -400,6 +400,7 @@ public class DataNode extends ReconfigurableBase
   private final DatasetVolumeChecker volumeChecker;
 
   private final SocketFactory socketFactory;
+
   private DatanodeStateMachine datanodeStateMachine;
 
   private static Tracer createTracer(Configuration conf) {
@@ -1543,7 +1544,15 @@ public class DataNode extends ReconfigurableBase
           + bpRegistration.getDatanodeUuid()
           + ". Expecting " + storage.getDatanodeUuid());
     }
-    
+
+    if (isOzoneEnabled()) {
+      if (datanodeStateMachine == null) {
+        datanodeStateMachine = new DatanodeStateMachine(
+            getDatanodeId(),
+            getConf());
+        datanodeStateMachine.startDaemon();
+      }
+    }
     registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
   }
   
@@ -1643,14 +1652,6 @@ public class DataNode extends ReconfigurableBase
     data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     initDirectoryScanner(getConf());
-    if(this.ozoneEnabled) {
-      try {
-        datanodeStateMachine = DatanodeStateMachine.initStateMachine(getConf());
-        LOG.info("Ozone container server started.");
-      } catch (IOException ex) {
-        LOG.error("Unable to start Ozone. ex: {}", ex);
-      }
-    }
     initDiskBalancer(data, getConf());
   }
 
@@ -1661,11 +1662,11 @@ public class DataNode extends ReconfigurableBase
   BPOfferService getBPOfferService(String bpid){
     return blockPoolManager.get(bpid);
   }
-  
+
   int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
   }
-  
+
   /**
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
@@ -1998,125 +1999,119 @@ public class DataNode extends ReconfigurableBase
     }
 
     // Stop the object store handler
-    if (this.objectStoreHandler != null) {
-      this.objectStoreHandler.close();
+    if (isOzoneEnabled()) {
+      if (this.objectStoreHandler != null) {
+        this.objectStoreHandler.close();
+      }
     }
 
-    if (this.ozoneEnabled) {
-      if (datanodeStateMachine != null) {
-        try {
-          datanodeStateMachine.close();
-        } catch (Exception e) {
-          LOG.error("Error is ozone shutdown. ex {}", e.toString());
-        }
-      }
-      volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
+    volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
 
-      if (storageLocationChecker != null) {
-        storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
-      }
+    if (storageLocationChecker != null) {
+      storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
+    }
 
-      if (pauseMonitor != null) {
-        pauseMonitor.stop();
-      }
+    if (pauseMonitor != null) {
+      pauseMonitor.stop();
+    }
 
-      // shouldRun is set to false here to prevent certain threads from exiting
-      // before the restart prep is done.
-      this.shouldRun = false;
+    // shouldRun is set to false here to prevent certain threads from exiting
+    // before the restart prep is done.
+    this.shouldRun = false;
 
-      // wait reconfiguration thread, if any, to exit
-      shutdownReconfigurationTask();
-
-      // wait for all data receiver threads to exit
-      if (this.threadGroup != null) {
-        int sleepMs = 2;
-        while (true) {
-          // When shutting down for restart, wait 2.5 seconds before forcing
-          // termination of receiver threads.
-          if (!this.shutdownForUpgrade ||
-              (this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified
-                  > 1000))) {
-            this.threadGroup.interrupt();
-            break;
-          }
-          LOG.info("Waiting for threadgroup to exit, active threads is " +
-              this.threadGroup.activeCount());
-          if (this.threadGroup.activeCount() == 0) {
-            break;
-          }
-          try {
-            Thread.sleep(sleepMs);
-          } catch (InterruptedException e) {
-          }
-          sleepMs = sleepMs * 3 / 2; // exponential backoff
-          if (sleepMs > 200) {
-            sleepMs = 200;
-          }
+    // wait reconfiguration thread, if any, to exit
+    shutdownReconfigurationTask();
+
+    // wait for all data receiver threads to exit
+    if (this.threadGroup != null) {
+      int sleepMs = 2;
+      while (true) {
+        // When shutting down for restart, wait 2.5 seconds before forcing
+        // termination of receiver threads.
+        if (!this.shutdownForUpgrade || (this.shutdownForUpgrade && (
+            Time.monotonicNow() - timeNotified > 1000))) {
+          this.threadGroup.interrupt();
+          break;
         }
-        this.threadGroup = null;
-      }
-      if (this.dataXceiverServer != null) {
-        // wait for dataXceiverServer to terminate
-        try {
-          this.dataXceiverServer.join();
-        } catch (InterruptedException ie) {
+        LOG.info("Waiting for threadgroup to exit, active threads is "
+            + this.threadGroup.activeCount());
+        if (this.threadGroup.activeCount() == 0) {
+          break;
         }
-      }
-      if (this.localDataXceiverServer != null) {
-        // wait for localDataXceiverServer to terminate
         try {
-          this.localDataXceiverServer.join();
-        } catch (InterruptedException ie) {
+          Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
         }
+        sleepMs = sleepMs * 3 / 2; // exponential backoff
+        if (sleepMs > 200) {
+          sleepMs = 200;
+        }
+      }
+      this.threadGroup = null;
+    }
+    if (this.dataXceiverServer != null) {
+      // wait for dataXceiverServer to terminate
+      try {
+        this.dataXceiverServer.join();
+      } catch (InterruptedException ie) {
       }
-      if (metrics != null) {
-        metrics.setDataNodeActiveXceiversCount(0);
+    }
+    if (this.localDataXceiverServer != null) {
+      // wait for localDataXceiverServer to terminate
+      try {
+        this.localDataXceiverServer.join();
+      } catch (InterruptedException ie) {
       }
+    }
+    if (metrics != null) {
+      metrics.setDataNodeActiveXceiversCount(0);
+    }
 
-      // IPC server needs to be shutdown late in the process, otherwise
-      // shutdown command response won't get sent.
-      if (ipcServer != null) {
-        ipcServer.stop();
-      }
+    // IPC server needs to be shutdown late in the process, otherwise
+    // shutdown command response won't get sent.
+    if (ipcServer != null) {
+      ipcServer.stop();
+    }
 
-      if (blockPoolManager != null) {
-        try {
-          this.blockPoolManager.shutDownAll(bposArray);
-        } catch (InterruptedException ie) {
-          LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
-        }
+    if (blockPoolManager != null) {
+      try {
+        this.blockPoolManager.shutDownAll(bposArray);
+      } catch (InterruptedException ie) {
+        LOG.warn("Received exception in BlockPoolManager#shutDownAll: ", ie);
       }
+    }
 
-      if (storage != null) {
-        try {
-          this.storage.unlockAll();
-        } catch (IOException ie) {
-          LOG.warn("Exception when unlocking storage: " + ie, ie);
-        }
-      }
-      if (data != null) {
-        data.shutdown();
-      }
-      if (metrics != null) {
-        metrics.shutdown();
-      }
-      if (diskMetrics != null) {
-        diskMetrics.shutdownAndWait();
-      }
-      if (dataNodeInfoBeanName != null) {
-        MBeans.unregister(dataNodeInfoBeanName);
-        dataNodeInfoBeanName = null;
-      }
-      if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
-      LOG.info("Shutdown complete.");
-      synchronized (this) {
-        // it is already false, but setting it again to avoid a findbug warning.
-        this.shouldRun = false;
-        // Notify the main thread.
-        notifyAll();
+    if (storage != null) {
+      try {
+        this.storage.unlockAll();
+      } catch (IOException ie) {
+        LOG.warn("Exception when unlocking storage: " + ie, ie);
       }
-      tracer.close();
     }
+    if (data != null) {
+      data.shutdown();
+    }
+    if (metrics != null) {
+      metrics.shutdown();
+    }
+    if (diskMetrics != null) {
+      diskMetrics.shutdownAndWait();
+    }
+    if (dataNodeInfoBeanName != null) {
+      MBeans.unregister(dataNodeInfoBeanName);
+      dataNodeInfoBeanName = null;
+    }
+    if (shortCircuitRegistry != null) {
+      shortCircuitRegistry.shutdown();
+    }
+    LOG.info("Shutdown complete.");
+    synchronized (this) {
+      // it is already false, but setting it again to avoid a findbug warning.
+      this.shouldRun = false;
+      // Notify the main thread.
+      notifyAll();
+    }
+    tracer.close();
   }
 
   /**
@@ -3149,6 +3144,12 @@ public class DataNode extends ReconfigurableBase
           } catch (InterruptedException ie) { }
         }
         shutdown();
+
+        if (isOzoneEnabled()) {
+          if(datanodeStateMachine != null) {
+            datanodeStateMachine.stopDaemon();
+          }
+        }
       }
     };
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index f42a956..9d242d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -30,6 +30,20 @@ public final class OzoneConfigKeys {
   public static final String DFS_CONTAINER_IPC_PORT =
       "dfs.container.ipc";
   public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
+
+  /**
+   * When set to true, a random free port is allocated for the Ozone
+   * container server, so that a mini cluster can launch multiple
+   * container servers on a single node.
+   *
+   * When set to false (the default), the container port is fixed, as
+   * specified by DFS_CONTAINER_IPC_PORT_DEFAULT.
+   */
+  public static final String DFS_CONTAINER_IPC_RANDOM_PORT =
+      "dfs.container.ipc.random.port";
+  public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
+      false;
+
   public static final String OZONE_LOCALSTORAGE_ROOT =
       "ozone.localstorage.root";
   public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone";
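
For reference, a minimal sketch of how the new flag could be set programmatically; the key and OzoneConfiguration come from this codebase, the rest is illustrative:

    import org.apache.hadoop.ozone.OzoneConfigKeys;
    import org.apache.hadoop.ozone.OzoneConfiguration;

    OzoneConfiguration conf = new OzoneConfiguration();
    // Ask each container server to bind to a random free port instead of
    // the fixed DFS_CONTAINER_IPC_PORT_DEFAULT (50011).
    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);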

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index 4aff972..05dd41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
 import org.apache.hadoop.utils.LevelDBStore;
@@ -30,9 +32,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.commons.io.FilenameUtils.removeExtension;
 import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
@@ -367,4 +373,83 @@ public final class ContainerUtils {
     FileUtils.forceDelete(containerPath.toFile());
     FileUtils.forceDelete(metaPath.toFile());
   }
+
+  /**
+   * Write datanode ID protobuf messages to an ID file.
+   * The old ID file will be overwritten.
+   *
+   * @param ids A list of {@link DatanodeID}s
+   * @param path Local ID file path
+   * @throws IOException When read/write error occurs
+   */
+  private synchronized static void writeDatanodeIDs(List<DatanodeID> ids,
+      File path) throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException("Unable to overwrite the datanode ID file.");
+      }
+    } else {
+      if(!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create datanode ID directories.");
+      }
+    }
+    try (FileOutputStream out = new FileOutputStream(path)) {
+      for (DatanodeID id : ids) {
+        HdfsProtos.DatanodeIDProto dnId = id.getProtoBufMessage();
+        dnId.writeDelimitedTo(out);
+      }
+    }
+  }
+
+  /**
+   * Persist a {@link DatanodeID} to a local file. The existing IDs are
+   * read first and a new entry is appended only if the ID is new, so
+   * the file cannot grow unboundedly in a dirty environment.
+   *
+   * @throws IOException when read/write error occurs
+   */
+  public synchronized static void writeDatanodeIDTo(DatanodeID dnID,
+      File path) throws IOException {
+    List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(path);
+    // Only create or overwrite the file
+    // if the ID doesn't exist in the ID file
+    for (DatanodeID id : ids) {
+      if (id.getProtoBufMessage()
+          .equals(dnID.getProtoBufMessage())) {
+        return;
+      }
+    }
+    ids.add(dnID);
+    writeDatanodeIDs(ids, path);
+  }
+
+  /**
+   * Read {@link DatanodeID}s from a local ID file and return them as a
+   * list. If the ID file doesn't exist, an empty list is returned.
+   *
+   * @param path Local ID file path
+   * @return A list of {@link DatanodeID}s
+   * @throws IOException If the ID file is malformed or another I/O error occurs
+   */
+  public synchronized static List<DatanodeID> readDatanodeIDsFrom(File path)
+      throws IOException {
+    List<DatanodeID> ids = new ArrayList<DatanodeID>();
+    if (!path.exists()) {
+      return ids;
+    }
+    try(FileInputStream in = new FileInputStream(path)) {
+      while(in.available() > 0) {
+        try {
+          HdfsProtos.DatanodeIDProto id =
+              HdfsProtos.DatanodeIDProto.parseDelimitedFrom(in);
+          ids.add(DatanodeID.getFromProtoBuf(id));
+        } catch (IOException e) {
+          throw new IOException("Failed to parse Datanode ID from "
+              + path.getAbsolutePath(), e);
+        }
+      }
+    }
+    return ids;
+  }
 }
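
A minimal sketch of the ID-file round trip these helpers provide; the method names come from this patch, while the file path and port value are illustrative:

    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;

    import java.io.File;
    import java.io.IOException;
    import java.util.List;

    void roundTrip() throws IOException {
      File idFile = new File("/tmp/ozone/datanode.id"); // illustrative path
      DatanodeID id = DFSTestUtil.getLocalDatanodeID(1);
      id.setContainerPort(50011);

      // Appends the ID only if an equal entry is not already in the file.
      ContainerUtils.writeDatanodeIDTo(id, idFile);

      // Returns an empty list when the file does not exist.
      List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(idFile);
    }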

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index c8f6dc7..6008844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneClientUtils;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.Time;
@@ -44,13 +45,16 @@ public class DatanodeStateMachine implements Closeable {
   private final long heartbeatFrequency;
   private StateContext context;
   private final OzoneContainer container;
+  private DatanodeID datanodeID = null;
 
   /**
   * Constructs a datanode state machine.
    *
+   * @param datanodeID - DatanodeID used to identify a datanode
   * @param conf - Configuration.
    */
-  public DatanodeStateMachine(Configuration conf) throws IOException {
+  public DatanodeStateMachine(DatanodeID datanodeID,
+      Configuration conf) throws IOException {
     this.conf = conf;
     executorService = HadoopExecutors.newCachedThreadPool(
                 new ThreadFactoryBuilder().setDaemon(true)
@@ -60,6 +64,26 @@ public class DatanodeStateMachine implements Closeable {
     heartbeatFrequency = TimeUnit.SECONDS.toMillis(
         OzoneClientUtils.getScmHeartbeatInterval(conf));
     container = new OzoneContainer(conf);
+    this.datanodeID = datanodeID;
+  }
+
+  public DatanodeStateMachine(Configuration conf)
+      throws IOException {
+    this(null, conf);
+  }
+
+  public void setDatanodeID(DatanodeID datanodeID) {
+    this.datanodeID = datanodeID;
+  }
+
+  /**
+   *
+   * Returns the DatanodeID if set, or null otherwise.
+   *
+   * @return datanodeID
+   */
+  public DatanodeID getDatanodeID() {
+    return this.datanodeID;
   }
 
   /**
@@ -71,10 +95,14 @@ public class DatanodeStateMachine implements Closeable {
     return connectionManager;
   }
 
+  public OzoneContainer getContainer() {
+    return this.container;
+  }
+
   /**
    * Runs the state machine at a fixed frequency.
    */
-  public void start() throws IOException {
+  private void start() throws IOException {
     long now = 0;
     long nextHB = 0;
     container.start();
@@ -216,12 +244,14 @@ public class DatanodeStateMachine implements Closeable {
     }
   }
 
-  public static DatanodeStateMachine initStateMachine(Configuration conf)
-      throws IOException {
-    DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+  /**
+   * Starts the datanode state machine as a single-threaded daemon.
+   */
+  public void startDaemon() {
     Runnable startStateMachineTask = () -> {
       try {
-        stateMachine.start();
+        start();
+        LOG.info("Ozone container server started.");
       } catch (Exception ex) {
         LOG.error("Unable to start the DatanodeState Machine", ex);
       }
@@ -231,6 +261,32 @@ public class DatanodeStateMachine implements Closeable {
         .setNameFormat("Datanode State Machine Thread - %d")
         .build().newThread(startStateMachineTask);
     thread.start();
-    return stateMachine;
+  }
+
+  /**
+   * Stop the daemon thread of the datanode state machine.
+   */
+  public synchronized void stopDaemon() {
+    try {
+      context.setState(DatanodeStates.SHUTDOWN);
+      this.close();
+      LOG.info("Ozone container server stopped.");
+    } catch (IOException e) {
+      LOG.error("Stop ozone container server failed.", e);
+    }
+  }
+
+  /**
+   *
+   * Checks whether the datanode state machine daemon is stopped.
+   *
+   * @return True if the datanode state machine daemon is stopped,
+   * false otherwise.
+   */
+  @VisibleForTesting
+  public boolean isDaemonStopped() {
+    return this.executorService.isShutdown()
+        && this.getContext().getExecutionCount() == 0
+        && this.getContext().getState() == DatanodeStates.SHUTDOWN;
   }
 }
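
Putting the new lifecycle together, a hedged sketch of how a caller (such as DataNode in this patch) drives the daemon; the DatanodeID construction here is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;

    void runOzoneDaemon(Configuration conf) throws Exception {
      DatanodeID dnId = DFSTestUtil.getLocalDatanodeID(1); // illustrative ID
      DatanodeStateMachine sm = new DatanodeStateMachine(dnId, conf);
      sm.startDaemon();    // state machine loop runs on a background thread
      try {
        // ... datanode serves traffic ...
      } finally {
        sm.stopDaemon();   // moves to SHUTDOWN and closes resources
      }
    }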

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 15a241e..e020791 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -81,6 +81,15 @@ public class StateContext {
   }
 
   /**
+   * Get the container server port.
+   * @return The container server port if available, -1 otherwise
+   */
+  public int getContainerPort() {
+    return parent == null ?
+        -1 : parent.getContainer().getContainerServerPort();
+  }
+
+  /**
    * Returns true if we are entering a new state.
    *
    * @return boolean

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
index 9e95f53..0427392 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
@@ -18,14 +18,19 @@ package org.apache.hadoop.ozone.container.common.states.datanode;
 
 import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.scm.ScmConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.concurrent.Callable;
@@ -87,10 +92,30 @@ public class InitDatanodeState implements DatanodeState,
         connectionManager.addSCMServer(addr);
       }
     }
+
+    // If datanode ID is set, persist it to the ID file.
+    persistContainerDatanodeID();
+
     return this.context.getState().getNextState();
   }
 
   /**
+   * Update the Ozone container port in the datanode ID,
+   * and persist the ID to a local file.
+   */
+  private void persistContainerDatanodeID() throws IOException {
+    String dataNodeIDPath = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID);
+    File idPath = new File(dataNodeIDPath);
+    int containerPort = this.context.getContainerPort();
+    DatanodeID datanodeID = this.context.getParent().getDatanodeID();
+    if (datanodeID != null) {
+      datanodeID.setContainerPort(containerPort);
+      ContainerUtils.writeDatanodeIDTo(datanodeID, idPath);
+      LOG.info("Datanode ID is persisted to {}", dataNodeIDPath);
+    }
+  }
+
+  /**
    * Called before entering this state.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index 2992218..590df2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -19,8 +19,7 @@ package org.apache.hadoop.ozone.container.common.states.datanode;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
@@ -35,16 +34,11 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -83,63 +77,30 @@ public class RunningDatanodeState implements DatanodeState {
   private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
       readPersistedDatanodeID(Path idPath) throws IOException {
     Preconditions.checkNotNull(idPath);
-    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-        containerIDProto;
-    try (FileInputStream stream = new FileInputStream(idPath.toFile())) {
-      containerIDProto = StorageContainerDatanodeProtocolProtos
-          .ContainerNodeIDProto.parseFrom(stream);
-      return containerIDProto;
+    DatanodeID datanodeID = null;
+    List<DatanodeID> datanodeIDs =
+        ContainerUtils.readDatanodeIDsFrom(idPath.toFile());
+    int containerPort = this.context.getContainerPort();
+    for(DatanodeID dnId : datanodeIDs) {
+      if(dnId.getContainerPort() == containerPort) {
+        datanodeID = dnId;
+        break;
+      }
     }
-  }
-
-  /**
-   * Create a DatanodeID from the datanode information.
-   *
-   * @return DatanodeID
-   * @throws UnknownHostException
-   */
-  private DatanodeID createDatanodeID() throws UnknownHostException {
-    DatanodeID temp = new DatanodeID(
-        //TODO : Replace this with proper network and kerberos
-        // support code.
-        InetAddress.getLocalHost().getHostAddress(),
-        DataNode.getHostName(conf),
-        UUID.randomUUID().toString(),
-        0, /** XferPort - SCM does not use this port  */
-        0, /** Info port - SCM does not use this port */
-        0, /** Info Secure Port - SCM does not use this port */
-        0); /** IPC port - SCM does not use this port */
-
-    // TODO: make this dynamically discoverable. SCM can hand out this
-    // port number to calling applications. This makes it easy to run multiple
-    // container endpoints on the same machine.
-    temp.setContainerPort(OzoneClientUtils.getContainerPort(conf));
-    return temp;
-  }
 
-  /**
-   * Creates a new ContainerID that persists both DatanodeID and ClusterID.
-   *
-   * @param idPath Path to the id file.
-   * @return ContainerNodeIDProto
-   * @throws UnknownHostException
-   */
-  private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-      createNewContainerID(Path idPath)
-      throws IOException {
-
-    if(!idPath.getParent().toFile().exists() &&
-        !idPath.getParent().toFile().mkdirs()) {
-      LOG.error("Failed to create container ID locations. Path: {}",
-          idPath.getParent());
-      throw new IOException("Unable to create container ID directories.");
-    }
-    StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
-        containerIDProto = StorageContainerDatanodeProtocolProtos
-        .ContainerNodeIDProto.newBuilder()
-        .setDatanodeID(createDatanodeID().getProtoBufMessage()).build();
-    try (FileOutputStream stream = new FileOutputStream(idPath.toFile())) {
-      stream.write(containerIDProto.toByteArray());
+    if (datanodeID == null) {
+      throw new IOException("No valid datanode ID found from "
+          + idPath.toFile().getAbsolutePath()
+          + " that matches container port "
+          + containerPort);
+    } else {
+      StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
+          containerIDProto =
+          StorageContainerDatanodeProtocolProtos
+              .ContainerNodeIDProto
+              .newBuilder()
+              .setDatanodeID(datanodeID.getProtoBufMessage())
+              .build();
       return containerIDProto;
     }
   }
@@ -171,15 +132,7 @@ public class RunningDatanodeState implements DatanodeState {
     } catch (IOException ex) {
       LOG.trace("Not able to find container Node ID, creating it.", ex);
     }
-    // Not found, let us create a new datanode ID, persist it and return that
-    // info to SCM.
-    try {
-      nodeID = createNewContainerID(Paths.get(dataNodeIDPath));
-      LOG.trace("Created Node ID :", nodeID.getDatanodeID().getDatanodeUuid());
-      return nodeID;
-    } catch (IOException ex) {
-      LOG.error("Creating new node ID failed.", ex);
-    }
+    this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
index 264ba4a..cd2146b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -29,15 +29,20 @@ import io.netty.handler.logging.LoggingHandler;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.ServerSocket;
 
 /**
  * Creates a netty server endpoint that acts as the communication layer for
  * Ozone containers.
  */
 public final class XceiverServer implements XceiverServerSpi {
-  private final int port;
+  private static final Logger
+      LOG = LoggerFactory.getLogger(XceiverServer.class);
+  private int port;
   private final ContainerDispatcher storageContainer;
 
   private EventLoopGroup bossGroup;
@@ -52,12 +57,31 @@ public final class XceiverServer implements XceiverServerSpi {
   public XceiverServer(Configuration conf,
                        ContainerDispatcher dispatcher) {
     Preconditions.checkNotNull(conf);
+
     this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    // Get an available port on the current node and
+    // use it as the container port.
+    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        socket.setReuseAddress(true);
+        this.port = socket.getLocalPort();
+        LOG.info("Found a free port for the server : {}", this.port);
+      } catch (IOException e) {
+        LOG.error("Unable find a random free port for the server, "
+            + "fallback to use default port {}", this.port, e);
+      }
+    }
     this.storageContainer = dispatcher;
   }
 
   @Override
+  public int getIPCPort() {
+    return this.port;
+  }
+
+  @Override
   public void start() throws IOException {
     bossGroup = new NioEventLoopGroup();
     workerGroup = new NioEventLoopGroup();
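
The free-port discovery above uses the standard bind-to-port-0 trick. A self-contained sketch of the same technique (note the inherent race between closing the probe socket and the real server binding the port):

    import java.io.IOException;
    import java.net.ServerSocket;

    static int findFreePort() throws IOException {
      // Binding to port 0 lets the OS pick an unused ephemeral port;
      // read it back, then release the socket for the real server.
      try (ServerSocket socket = new ServerSocket(0)) {
        socket.setReuseAddress(true);
        return socket.getLocalPort();
      }
    }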

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index f274151..e5b0497 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -28,4 +28,7 @@ public interface XceiverServerSpi {
 
   /** Stops a running server. */
   void stop();
+
+  /** Get server IPC port. */
+  int getIPCPort();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index e251da1..5db1ce8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -167,4 +167,13 @@ public class OzoneContainer {
   public SCMNodeReport getNodeReport() throws IOException {
     return this.manager.getNodeReport();
   }
+
+  /**
+   * Returns the container server IPC port.
+   *
+   * @return Container server IPC port.
+   */
+  public int getContainerServerPort() {
+    return server.getIPCPort();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index d018b85..a380fc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -173,6 +173,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
         LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.",
             scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY),
             numDataNodes);
+
         return false;
       }
     }, 1000, 5 * 60 * 1000); //wait for 5 mins.
@@ -228,6 +229,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
     private Boolean ozoneEnabled = true;
     private Boolean waitForChillModeFinish = true;
     private int containerWorkerThreadInterval = 1;
+    private Boolean randomContainerPort = true;
 
     /**
      * Creates a new Builder.
@@ -247,6 +249,11 @@ public final class MiniOzoneCluster extends MiniDFSCluster
       runID = UUID.randomUUID();
     }
 
+    public Builder setRandomContainerPort(boolean randomPort) {
+      this.randomContainerPort = randomPort;
+      return this;
+    }
+
     @Override
     public Builder numDataNodes(int val) {
       super.numDataNodes(val);
@@ -319,6 +326,10 @@ public final class MiniOzoneCluster extends MiniDFSCluster
       conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
       conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
 
+      // Use random ports for ozone containers in mini cluster,
+      // in order to launch multiple container servers per node.
+      conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+          randomContainerPort);
 
       StorageContainerManager scm = new StorageContainerManager(conf);
       scm.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
new file mode 100644
index 0000000..085cb86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.apache.hadoop.scm.XceiverClient;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.test.TestGenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.junit.Assert.*;
+
+/**
+ * Test cases for mini ozone cluster.
+ */
+public class TestMiniOzoneCluster {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  private final static File TEST_ROOT = TestGenericTestUtils.getTestDir();
+  private final static File WRITE_TMP = new File(TEST_ROOT, "write");
+  private final static File READ_TMP = new File(TEST_ROOT, "read");
+
+  @BeforeClass
+  public static void setup() {
+    conf = new OzoneConfiguration();
+    WRITE_TMP.mkdirs();
+    READ_TMP.mkdirs();
+    WRITE_TMP.deleteOnExit();
+    READ_TMP.deleteOnExit();
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster.close();
+    }
+  }
+
+  @Test(timeout = 30000)
+  public void testStartMultipleDatanodes() throws Exception {
+    final int numberOfNodes = 3;
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(numberOfNodes)
+        .setHandlerType("distributed").build();
+
+    // make sure datanode.id file is correct
+    File idPath = new File(
+        conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID));
+    assertTrue(idPath.exists());
+    List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(idPath);
+    assertEquals(numberOfNodes, ids.size());
+
+    List<DataNode> datanodes = cluster.getDataNodes();
+    assertEquals(datanodes.size(), numberOfNodes);
+    for(DataNode dn : datanodes) {
+      // Each datanode ID should match an entry in the ID file
+      assertTrue("Datanode ID not found in ID file",
+          ids.contains(dn.getDatanodeId()));
+
+      // Create a single-member pipeline
+      String containerName = OzoneUtils.getRequestID();
+      DatanodeID dnId = dn.getDatanodeId();
+      Pipeline pipeline = new Pipeline(dnId.getDatanodeUuid());
+      pipeline.addMember(dnId);
+      pipeline.setContainerName(containerName);
+
+      // Verify client is able to connect to the container
+      try (XceiverClient client = new XceiverClient(pipeline, conf)) {
+        client.connect();
+        assertTrue(client.isConnected());
+      }
+    }
+  }
+
+  @Test
+  public void testDatanodeIDPersistent() throws Exception {
+    // Generate IDs for testing
+    DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1);
+    DatanodeID id2 = DFSTestUtil.getLocalDatanodeID(2);
+    DatanodeID id3 = DFSTestUtil.getLocalDatanodeID(3);
+    id1.setContainerPort(1);
+    id2.setContainerPort(2);
+    id3.setContainerPort(3);
+
+    // Write a single ID to the file and read it out
+    File validIdsFile = new File(WRITE_TMP, "valid-values.id");
+    validIdsFile.delete();
+    ContainerUtils.writeDatanodeIDTo(id1, validIdsFile);
+    List<DatanodeID> validIds = ContainerUtils
+        .readDatanodeIDsFrom(validIdsFile);
+    assertEquals(1, validIds.size());
+    DatanodeID id11 = validIds.iterator().next();
+    assertEquals(id11, id1);
+    assertEquals(id11.getProtoBufMessage(), id1.getProtoBufMessage());
+
+    // Write should avoid duplicate entries
+    File noDupIDFile = new File(WRITE_TMP, "no-dup-values.id");
+    noDupIDFile.delete();
+    ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id2, noDupIDFile);
+    ContainerUtils.writeDatanodeIDTo(id3, noDupIDFile);
+
+    List<DatanodeID> noDupIDs =ContainerUtils
+        .readDatanodeIDsFrom(noDupIDFile);
+    assertEquals(3, noDupIDs.size());
+    assertTrue(noDupIDs.contains(id1));
+    assertTrue(noDupIDs.contains(id2));
+    assertTrue(noDupIDs.contains(id3));
+
+    // Write should fail if unable to create file or directory
+    File invalidPath = new File(WRITE_TMP, "an/invalid/path");
+    try {
+      ContainerUtils.writeDatanodeIDTo(id1, invalidPath);
+    } catch (Exception e) {
+      e.printStackTrace();
+      assertTrue(e instanceof IOException);
+    }
+
+    // Read should return an empty list if the file doesn't exist
+    File nonExistFile = new File(READ_TMP, "non_exist.id");
+    nonExistFile.delete();
+    List<DatanodeID> emptyIDs =
+        ContainerUtils.readDatanodeIDsFrom(nonExistFile);
+    assertTrue(emptyIDs.isEmpty());
+
+    // Read should fail if the file is malformed
+    File malformedFile = new File(READ_TMP, "malformed.id");
+    createMalformedIDFile(malformedFile);
+    try {
+      ContainerUtils.readDatanodeIDsFrom(malformedFile);
+      fail("Read a malformed ID file should fail");
+    } catch (Exception e) {
+      assertTrue(e instanceof IOException);
+    }
+  }
+
+  @Test
+  public void testContainerRandomPort() throws IOException {
+    Configuration ozoneConf = SCMTestUtils.getConf();
+    File testDir = PathUtils.getTestDir(TestOzoneContainer.class);
+    ozoneConf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
+
+    // Each instance of SM will create an ozone container
+    // that binds to a random port.
+    ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
+    try (
+        DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf);
+    ) {
+      HashSet<Integer> ports = new HashSet<Integer>();
+      assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
+      assertTrue(ports.add(sm2.getContainer().getContainerServerPort()));
+      assertTrue(ports.add(sm3.getContainer().getContainerServerPort()));
+    }
+
+    // Turn off the random port flag and test again
+    ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
+    try (
+        DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf);
+        DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf);
+    ) {
+      HashSet<Integer> ports = new HashSet<Integer>();
+      assertTrue(ports.add(sm1.getContainer().getContainerServerPort()));
+      assertFalse(ports.add(sm2.getContainer().getContainerServerPort()));
+      assertFalse(ports.add(sm3.getContainer().getContainerServerPort()));
+      assertEquals(ports.iterator().next().intValue(),
+          ozoneConf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+              OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT));
+    }
+  }
+
+  private void createMalformedIDFile(File malformedFile)
+      throws IOException{
+    malformedFile.delete();
+    DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1);
+    ContainerUtils.writeDatanodeIDTo(id1, malformedFile);
+
+    FileOutputStream out = new FileOutputStream(malformedFile);
+    out.write("malformed".getBytes());
+    out.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 9498baf..e8a1edc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -18,7 +18,10 @@ package org.apache.hadoop.ozone.container.common;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
@@ -51,6 +54,7 @@ import java.util.concurrent.TimeoutException;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.scm.ScmConfigKeys
     .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the datanode state machine class and its states.
@@ -134,14 +138,18 @@ public class TestDatanodeStateMachine {
    * @throws InterruptedException
    */
   @Test
-  public void testDatanodeStateMachineStartThread() throws IOException,
+  public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        DatanodeStateMachine.initStateMachine(conf)) {
+        new DatanodeStateMachine(conf)) {
+      stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
       GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3,
           1000, 30000);
+
+      stateMachine.stopDaemon();
+      assertTrue(stateMachine.isDaemonStopped());
     }
   }
 
@@ -178,6 +186,15 @@ public class TestDatanodeStateMachine {
   @Test
   public void testDatanodeStateContext() throws IOException,
       InterruptedException, ExecutionException, TimeoutException {
+    // No mini cluster is started in this test, so create an ID file
+    // so that the state machine can load a fake datanode ID.
+    File idPath = new File(
+        conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID));
+    idPath.delete();
+    DatanodeID dnID = DFSTestUtil.getLocalDatanodeID();
+    dnID.setContainerPort(ScmConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    ContainerUtils.writeDatanodeIDTo(dnID, idPath);
+
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f2c18/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 11d57ce..1a40b34 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -57,6 +57,7 @@ public class TestOzoneContainer {
     MiniOzoneCluster cluster = null;
     try {
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
       // We don't start Ozone Container via data node, we will do it
       // independently in our test path.
@@ -108,6 +109,7 @@ public class TestOzoneContainer {
           pipeline.getLeader().getContainerPort());
 
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
 
       // This client talks to ozone container via datanode.
@@ -208,6 +210,7 @@ public class TestOzoneContainer {
           pipeline.getLeader().getContainerPort());
 
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
 
       // This client talks to ozone container via datanode.
@@ -273,6 +276,7 @@ public class TestOzoneContainer {
           pipeline.getLeader().getContainerPort());
 
       cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
           .setHandlerType("distributed").build();
 
       // This client talks to ozone container via datanode.
@@ -364,5 +368,4 @@ public class TestOzoneContainer {
       }
     }
   }
-
 }

