hadoop-common-commits mailing list archives

From: inigo...@apache.org
Subject: [16/57] [abbrv] hadoop git commit: HDFS-9850. DiskBalancer: Explore removing references to FsVolumeSpi. Contributed by Manoj Govindassamy.
Date: Mon, 03 Oct 2016 22:58:29 GMT
HDFS-9850. DiskBalancer: Explore removing references to FsVolumeSpi. Contributed by Manoj Govindassamy.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/03f519a7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/03f519a7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/03f519a7

Branch: refs/heads/HDFS-10467
Commit: 03f519a757ce83d76e7fc9f6aadf271e38bb9f6d
Parents: 6437ba1
Author: Anu Engineer <aengineer@apache.org>
Authored: Tue Sep 27 21:35:06 2016 -0700
Committer: Anu Engineer <aengineer@apache.org>
Committed: Tue Sep 27 21:35:06 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/DiskBalancer.java      | 210 +++++++++++++------
 .../server/diskbalancer/TestDiskBalancer.java   | 156 ++++++++++++++
 2 files changed, 299 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
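
At a high level, the patch replaces the live FsVolumeSpi references that VolumePair used to hold with the volume UUIDs and base paths, and resolves the actual FsVolumeSpi by storage ID only when blocks are about to be copied. Below is a minimal sketch of that lookup pattern, simplified from the private DiskBalancer#getFsVolume helper added in the diff; the class and method names here are illustrative only.

    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    public final class VolumeLookupSketch {
      // Resolve a volume by its storage UUID at the moment it is needed,
      // instead of caching an FsVolumeSpi inside VolumePair. Returns null
      // when no volume with that UUID exists (e.g. the disk was removed).
      static FsVolumeSpi lookupVolume(FsDatasetSpi<?> dataset, String volUuid) {
        try (FsVolumeReferences refs = dataset.getFsVolumeReferences()) {
          for (int i = 0; i < refs.size(); i++) {
            if (refs.get(i).getStorageID().equals(volUuid)) {
              return refs.get(i);
            }
          }
        } catch (IOException e) {
          // Closing the references failed; treat the volume as unavailable.
        }
        return null;
      }
    }

Because VolumePair now carries only immutable strings, it also stays a stable key in the work map even if one of its volumes disappears while a plan is executing.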


http://git-wip-us.apache.org/repos/asf/hadoop/blob/03f519a7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index d853ae9..e7e9105 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi
+    .FsVolumeReferences;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -41,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.LinkedList;
@@ -192,7 +195,30 @@ public class DiskBalancer {
   }
 
   /**
-   * Returns the Current Work Status of a submitted Plan.
+   * Get FsVolume by volume UUID.
+   * @param fsDataset - dataset to search for the volume
+   * @param volUuid - UUID of the volume to look up
+   * @return FsVolumeSpi, or null if no volume with the given UUID is found
+   */
+  private static FsVolumeSpi getFsVolume(final FsDatasetSpi<?> fsDataset,
+      final String volUuid) {
+    FsVolumeSpi fsVolume = null;
+    try (FsVolumeReferences volumeReferences =
+           fsDataset.getFsVolumeReferences()) {
+      for (int i = 0; i < volumeReferences.size(); i++) {
+        if (volumeReferences.get(i).getStorageID().equals(volUuid)) {
+          fsVolume = volumeReferences.get(i);
+          break;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Disk Balancer - Error when closing volume references: ", e);
+    }
+    return fsVolume;
+  }
+
+  /**
+   * Returns the current work status of a previously submitted Plan.
    *
    * @return DiskBalancerWorkStatus.
    * @throws DiskBalancerException
@@ -214,8 +240,8 @@ public class DiskBalancer {
       for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
           workMap.entrySet()) {
         DiskBalancerWorkEntry workEntry = new DiskBalancerWorkEntry(
-            entry.getKey().getSource().getBasePath(),
-            entry.getKey().getDest().getBasePath(),
+            entry.getKey().getSourceVolBasePath(),
+            entry.getKey().getDestVolBasePath(),
             entry.getValue());
         status.addWorkEntry(workEntry);
       }
@@ -269,12 +295,7 @@ public class DiskBalancer {
     lock.lock();
     try {
       checkDiskBalancerEnabled();
-      Map<String, String> pathMap = new HashMap<>();
-      Map<String, FsVolumeSpi> volMap = getStorageIDToVolumeMap();
-      for (Map.Entry<String, FsVolumeSpi> entry : volMap.entrySet()) {
-        pathMap.put(entry.getKey(), entry.getValue().getBasePath());
-      }
-      return JsonUtil.toJsonString(pathMap);
+      return JsonUtil.toJsonString(getStorageIDToVolumeBasePathMap());
     } catch (DiskBalancerException ex) {
       throw ex;
     } catch (IOException e) {
@@ -434,47 +455,52 @@ public class DiskBalancer {
 
     // Cleanup any residual work in the map.
     workMap.clear();
-    Map<String, FsVolumeSpi> pathMap = getStorageIDToVolumeMap();
+    Map<String, String> storageIDToVolBasePathMap =
+        getStorageIDToVolumeBasePathMap();
 
     for (Step step : plan.getVolumeSetPlans()) {
-      String sourceuuid = step.getSourceVolume().getUuid();
-      String destinationuuid = step.getDestinationVolume().getUuid();
-
-      FsVolumeSpi sourceVol = pathMap.get(sourceuuid);
-      if (sourceVol == null) {
-        LOG.error("Disk Balancer - Unable to find source volume. submitPlan " +
-            "failed.");
-        throw new DiskBalancerException("Unable to find source volume.",
+      String sourceVolUuid = step.getSourceVolume().getUuid();
+      String destVolUuid = step.getDestinationVolume().getUuid();
+
+      String sourceVolBasePath = storageIDToVolBasePathMap.get(sourceVolUuid);
+      if (sourceVolBasePath == null) {
+        final String errMsg = "Disk Balancer - Unable to find volume: "
+            + step.getSourceVolume().getPath() + ". SubmitPlan failed.";
+        LOG.error(errMsg);
+        throw new DiskBalancerException(errMsg,
             DiskBalancerException.Result.INVALID_VOLUME);
       }
 
-      FsVolumeSpi destVol = pathMap.get(destinationuuid);
-      if (destVol == null) {
-        LOG.error("Disk Balancer - Unable to find destination volume. " +
-            "submitPlan failed.");
-        throw new DiskBalancerException("Unable to find destination volume.",
+      String destVolBasePath = storageIDToVolBasePathMap.get(destVolUuid);
+      if (destVolBasePath == null) {
+        final String errMsg = "Disk Balancer - Unable to find volume: "
+            + step.getDestinationVolume().getPath() + ". SubmitPlan failed.";
+        LOG.error(errMsg);
+        throw new DiskBalancerException(errMsg,
             DiskBalancerException.Result.INVALID_VOLUME);
       }
-      createWorkPlan(sourceVol, destVol, step);
+      VolumePair volumePair = new VolumePair(sourceVolUuid,
+          sourceVolBasePath, destVolUuid, destVolBasePath);
+      createWorkPlan(volumePair, step);
     }
   }
 
   /**
-   * Returns a path to Volume Map.
+   * Returns volume UUID to volume base path map.
    *
    * @return Map
    * @throws DiskBalancerException
    */
-  private Map<String, FsVolumeSpi> getStorageIDToVolumeMap()
+  private Map<String, String> getStorageIDToVolumeBasePathMap()
       throws DiskBalancerException {
-    Map<String, FsVolumeSpi> pathMap = new HashMap<>();
+    Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
       try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
-          pathMap.put(vol.getStorageID(), vol);
+          storageIDToVolBasePathMap.put(vol.getStorageID(), vol.getBasePath());
         }
         references.close();
       }
@@ -483,7 +509,7 @@ public class DiskBalancer {
       throw new DiskBalancerException("Internal error", ex,
           DiskBalancerException.Result.INTERNAL_ERROR);
     }
-    return pathMap;
+    return storageIDToVolBasePathMap;
   }
 
   /**
@@ -513,26 +539,24 @@ public class DiskBalancer {
 
   /**
    * Insert work items to work map.
-   *
-   * @param source - Source vol
-   * @param dest   - destination volume
-   * @param step   - Move Step
+   * @param volumePair - VolumePair
+   * @param step - Move Step
    */
-  private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
-                              Step step) throws DiskBalancerException {
-
-    if (source.getStorageID().equals(dest.getStorageID())) {
-      LOG.info("Disk Balancer - source & destination volumes are same.");
-      throw new DiskBalancerException("source and destination volumes are " +
-          "same.", DiskBalancerException.Result.INVALID_MOVE);
+  private void createWorkPlan(final VolumePair volumePair, Step step)
+      throws DiskBalancerException {
+    if (volumePair.getSourceVolUuid().equals(volumePair.getDestVolUuid())) {
+      final String errMsg = "Disk Balancer - Source and destination volumes " +
+          "are same: " + volumePair.getSourceVolUuid();
+      LOG.warn(errMsg);
+      throw new DiskBalancerException(errMsg,
+          DiskBalancerException.Result.INVALID_MOVE);
     }
-    VolumePair pair = new VolumePair(source, dest);
     long bytesToMove = step.getBytesToMove();
     // In case we have a plan with more than
-    // one line of same <source, dest>
+    // one line of same VolumePair
     // we compress that into one work order.
-    if (workMap.containsKey(pair)) {
-      bytesToMove += workMap.get(pair).getBytesToCopy();
+    if (workMap.containsKey(volumePair)) {
+      bytesToMove += workMap.get(volumePair).getBytesToCopy();
     }
 
     DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
@@ -542,7 +566,7 @@ public class DiskBalancer {
     work.setBandwidth(step.getBandwidth());
     work.setTolerancePercent(step.getTolerancePercent());
     work.setMaxDiskErrors(step.getMaxDiskErrors());
-    workMap.put(pair, work);
+    workMap.put(volumePair, work);
   }
 
   /**
@@ -591,39 +615,63 @@ public class DiskBalancer {
   }
 
   /**
-   * Holds references to actual volumes that we will be operating against.
+   * Holds the source and destination volume UUIDs and their base paths
+   * that the disk balancer will be operating against.
    */
   public static class VolumePair {
-    private final FsVolumeSpi source;
-    private final FsVolumeSpi dest;
+    private final String sourceVolUuid;
+    private final String destVolUuid;
+    private final String sourceVolBasePath;
+    private final String destVolBasePath;
 
     /**
      * Constructs a volume pair.
+     * @param sourceVolUuid     - Source Volume
+     * @param sourceVolBasePath - Source Volume Base Path
+     * @param destVolUuid       - Destination Volume
+     * @param destVolBasePath   - Destination Volume Base Path
+     */
+    public VolumePair(final String sourceVolUuid,
+        final String sourceVolBasePath, final String destVolUuid,
+        final String destVolBasePath) {
+      this.sourceVolUuid = sourceVolUuid;
+      this.sourceVolBasePath = sourceVolBasePath;
+      this.destVolUuid = destVolUuid;
+      this.destVolBasePath = destVolBasePath;
+    }
+
+    /**
+     * Gets source volume UUID.
      *
-     * @param source - Source Volume
-     * @param dest   - Destination Volume
+     * @return UUID String
      */
-    public VolumePair(FsVolumeSpi source, FsVolumeSpi dest) {
-      this.source = source;
-      this.dest = dest;
+    public String getSourceVolUuid() {
+      return sourceVolUuid;
     }
 
     /**
-     * gets source volume.
+     * Gets source volume base path.
+     * @return String
+     */
+    public String getSourceVolBasePath() {
+      return sourceVolBasePath;
+    }
+
+    /**
+     * Gets destination volume UUID.
      *
-     * @return volume
+     * @return UUID String
      */
-    public FsVolumeSpi getSource() {
-      return source;
+    public String getDestVolUuid() {
+      return destVolUuid;
     }
 
     /**
-     * Gets Destination volume.
+     * Gets destination volume base path.
      *
-     * @return volume.
+     * @return String
      */
-    public FsVolumeSpi getDest() {
-      return dest;
+    public String getDestVolBasePath() {
+      return destVolBasePath;
     }
 
     @Override
@@ -636,13 +684,21 @@ public class DiskBalancer {
       }
 
       VolumePair that = (VolumePair) o;
-      return source.equals(that.source) && dest.equals(that.dest);
+      return sourceVolUuid.equals(that.sourceVolUuid)
+          && sourceVolBasePath.equals(that.sourceVolBasePath)
+          && destVolUuid.equals(that.destVolUuid)
+          && destVolBasePath.equals(that.destVolBasePath);
     }
 
     @Override
     public int hashCode() {
-      int result = source.getBasePath().hashCode();
-      result = 31 * result + dest.getBasePath().hashCode();
+      final int primeNum = 31;
+      final List<String> volumeStrList = Arrays.asList(sourceVolUuid,
+          sourceVolBasePath, destVolUuid, destVolBasePath);
+      int result = 1;
+      for (String str : volumeStrList) {
+        result = (result * primeNum) + str.hashCode();
+      }
       return result;
     }
   }
@@ -932,8 +988,28 @@ public class DiskBalancer {
      */
     @Override
     public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
-      FsVolumeSpi source = pair.getSource();
-      FsVolumeSpi dest = pair.getDest();
+      String sourceVolUuid = pair.getSourceVolUuid();
+      String destVolUuid = pair.getDestVolUuid();
+
+      // When any of the DiskBalancerWorkItem volumes are not
+      // available, return after setting error in item.
+      FsVolumeSpi source = getFsVolume(this.dataset, sourceVolUuid);
+      if (source == null) {
+        final String errMsg = "Disk Balancer - Unable to find source volume: "
+            + pair.getSourceVolBasePath();
+        LOG.error(errMsg);
+        item.setErrMsg(errMsg);
+        return;
+      }
+      FsVolumeSpi dest = getFsVolume(this.dataset, destVolUuid);
+      if (dest == null) {
+        final String errMsg = "Disk Balancer - Unable to find dest volume: "
+            + pair.getDestVolBasePath();
+        LOG.error(errMsg);
+        item.setErrMsg(errMsg);
+        return;
+      }
+
       List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
       startTime = Time.now();
       item.setStartTime(startTime);
@@ -977,7 +1053,7 @@ public class DiskBalancer {
             // we are not able to find any blocks to copy.
             if (block == null) {
               LOG.error("No source blocks, exiting the copy. Source: {}, " +
-                  "dest:{}", source.getBasePath(), dest.getBasePath());
+                  "Dest:{}", source.getBasePath(), dest.getBasePath());
               this.setExitFlag();
               continue;
             }

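With the lookup deferred, copyBlocks can fail a single work item cleanly when one of its volumes has been removed mid-plan instead of dereferencing a stale FsVolumeSpi. A rough sketch of that guard follows, reusing the illustrative lookupVolume helper from the earlier sketch in place of the patch's private getFsVolume; the method name copyBlocksGuard is hypothetical.

    // Sketch: if either volume is gone, record the error on the work item and
    // skip this step rather than failing the whole mover thread.
    void copyBlocksGuard(FsDatasetSpi<?> dataset, DiskBalancer.VolumePair pair,
        DiskBalancerWorkItem item) {
      FsVolumeSpi source = VolumeLookupSketch.lookupVolume(dataset,
          pair.getSourceVolUuid());
      if (source == null) {
        item.setErrMsg("Unable to find source volume: "
            + pair.getSourceVolBasePath());
        return;
      }
      FsVolumeSpi dest = VolumeLookupSketch.lookupVolume(dataset,
          pair.getDestVolUuid());
      if (dest == null) {
        item.setErrMsg("Unable to find dest volume: "
            + pair.getDestVolBasePath());
        return;
      }
      // ... proceed with block iteration and moveBlockAcrossVolumes ...
    }
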
http://git-wip-us.apache.org/repos/asf/hadoop/blob/03f519a7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index eb15bdc..d911e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -27,9 +28,15 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.DiskBalancerMover;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer.VolumePair;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
@@ -41,18 +48,30 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 
 /**
  * Test Disk Balancer.
@@ -186,6 +205,47 @@ public class TestDiskBalancer {
   }
 
   /**
+   * Test disk balancer behavior when one of the disks involved
+   * in balancing operation is removed after submitting the plan.
+   * @throws Exception
+   */
+  @Test
+  public void testDiskBalancerWhenRemovingVolumes() throws Exception {
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+
+    final int blockCount = 100;
+    final int blockSize = 1024;
+    final int diskCount = 2;
+    final int dataNodeCount = 1;
+    final int dataNodeIndex = 0;
+    final int sourceDiskIndex = 0;
+
+    MiniDFSCluster cluster = new ClusterBuilder()
+        .setBlockCount(blockCount)
+        .setBlockSize(blockSize)
+        .setDiskCount(diskCount)
+        .setNumDatanodes(dataNodeCount)
+        .setConf(conf)
+        .build();
+
+    try {
+      DataMover dataMover = new DataMover(cluster, dataNodeIndex,
+          sourceDiskIndex, conf, blockSize, blockCount);
+      dataMover.moveDataToSourceDisk();
+      NodePlan plan = dataMover.generatePlan();
+      dataMover.executePlanDuringDiskRemove(plan);
+      dataMover.verifyAllVolumesHaveData();
+      dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10);
+    } catch (Exception e) {
+      Assert.fail("Unexpected exception: " + e);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
     * Sets all disks' capacity to the size specified.
    *
    * @param cluster - DiskBalancerCluster
@@ -446,6 +506,102 @@ public class TestDiskBalancer {
       }, 1000, 100000);
     }
 
+    public void executePlanDuringDiskRemove(NodePlan plan) throws
+        IOException, TimeoutException, InterruptedException {
+      CountDownLatch createWorkPlanLatch = new CountDownLatch(1);
+      CountDownLatch removeDiskLatch = new CountDownLatch(1);
+      AtomicInteger errorCount = new AtomicInteger(0);
+
+      LOG.info("FSDataSet: " + node.getFSDataset());
+      final FsDatasetSpi<?> fsDatasetSpy = Mockito.spy(node.getFSDataset());
+      doAnswer(new Answer<Object>() {
+          public Object answer(InvocationOnMock invocation) {
+            try {
+              node.getFSDataset().moveBlockAcrossVolumes(
+                  (ExtendedBlock)invocation.getArguments()[0],
+                  (FsVolumeSpi) invocation.getArguments()[1]);
+            } catch (Exception e) {
+              errorCount.incrementAndGet();
+            }
+            return null;
+          }
+        }).when(fsDatasetSpy).moveBlockAcrossVolumes(
+            any(ExtendedBlock.class), any(FsVolumeSpi.class));
+
+      DiskBalancerMover diskBalancerMover = new DiskBalancerMover(
+          fsDatasetSpy, conf);
+      diskBalancerMover.setRunnable();
+
+      DiskBalancerMover diskBalancerMoverSpy = Mockito.spy(diskBalancerMover);
+      doAnswer(new Answer<Object>() {
+          public Object answer(InvocationOnMock invocation) {
+            createWorkPlanLatch.countDown();
+            LOG.info("Waiting for the disk removal!");
+            try {
+              removeDiskLatch.await();
+            } catch (InterruptedException e) {
+              LOG.info("Encountered " + e);
+            }
+            LOG.info("Got disk removal notification, resuming copyBlocks!");
+            diskBalancerMover.copyBlocks((VolumePair)(invocation
+                .getArguments()[0]), (DiskBalancerWorkItem)(invocation
+                .getArguments()[1]));
+            return null;
+          }
+        }).when(diskBalancerMoverSpy).copyBlocks(
+            any(VolumePair.class), any(DiskBalancerWorkItem.class));
+
+      DiskBalancer diskBalancer = new DiskBalancer(node.getDatanodeUuid(),
+          conf, diskBalancerMoverSpy);
+
+      List<String> oldDirs = new ArrayList<String>(node.getConf().
+          getTrimmedStringCollection(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+      final String newDirs = oldDirs.get(0);
+      LOG.info("Reconfigure newDirs:" + newDirs);
+      Thread reconfigThread = new Thread() {
+        public void run() {
+          try {
+            LOG.info("Waiting for work plan creation!");
+            createWorkPlanLatch.await();
+            LOG.info("Work plan created. Removing disk!");
+            assertThat(
+                "DN did not update its own config", node.
+                reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs),
+                is(node.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
+            Thread.sleep(1000);
+            LOG.info("Removed disk!");
+            removeDiskLatch.countDown();
+          } catch (ReconfigurationException | InterruptedException e) {
+            Assert.fail("Unexpected error while reconfiguring: " + e);
+          }
+        }
+      };
+      reconfigThread.start();
+
+      String planJson = plan.toJson();
+      String planID = DigestUtils.shaHex(planJson);
+      diskBalancer.submitPlan(planID, 1, PLAN_FILE, planJson, false);
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            LOG.info("Work Status: " + diskBalancer.
+                queryWorkStatus().toJsonString());
+            Result result = diskBalancer.queryWorkStatus().getResult();
+            return (result == Result.PLAN_DONE);
+          } catch (IOException e) {
+            return false;
+          }
+        }
+      }, 1000, 100000);
+
+      assertTrue("Disk balancer operation hit max errors!", errorCount.get() <
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);
+      createWorkPlanLatch.await();
+      removeDiskLatch.await();
+    }
+
     /**
      * Verifies the Plan Execution has been done.
      */

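The new test drives this race deliberately: a Mockito spy on DiskBalancerMover blocks copyBlocks on a CountDownLatch until a second thread has reconfigured the DataNode's data directories and removed a disk, so the copy always runs against a missing volume. Below is a stripped-down sketch of that spy-and-latch coordination, with the MiniDFSCluster plumbing replaced by a hypothetical Worker class purely for illustration.

    import java.util.concurrent.CountDownLatch;
    import org.mockito.Mockito;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;
    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doAnswer;

    public class SpyLatchSketch {
      // Hypothetical stand-in for DiskBalancerMover; only the intercepted
      // method matters for the pattern.
      public static class Worker {
        public void copyBlocks(String volumePair) {
          System.out.println("copying blocks for " + volumePair);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        final CountDownLatch workStarted = new CountDownLatch(1);
        final CountDownLatch diskRemoved = new CountDownLatch(1);
        final Worker real = new Worker();

        // The spy signals that the copy is about to start, then parks until
        // the "disk removal" thread has finished before delegating.
        Worker spy = Mockito.spy(real);
        doAnswer(new Answer<Object>() {
          public Object answer(InvocationOnMock invocation) throws Exception {
            workStarted.countDown();
            diskRemoved.await();
            real.copyBlocks((String) invocation.getArguments()[0]);
            return null;
          }
        }).when(spy).copyBlocks(anyString());

        Thread removal = new Thread() {
          public void run() {
            try {
              workStarted.await();  // wait until copyBlocks has been entered
              // ... remove the disk here (the real test reconfigures
              //     dfs.datanode.data.dir down to a single directory) ...
              diskRemoved.countDown();
            } catch (InterruptedException ignored) {
            }
          }
        };
        removal.start();

        spy.copyBlocks("volume-pair");  // driven by submitPlan in the real test
        removal.join();
      }
    }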
