hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From viraj...@apache.org
Subject [49/50] [abbrv] hadoop git commit: HDFS-10675. Datanode support to read from external stores.
Date Sat, 15 Apr 2017 19:11:10 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index adec209..15e71f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
 import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
@@ -241,10 +242,11 @@ public interface FsVolumeSpi
 
     private final FsVolumeSpi volume;
 
+    private final FileRegion fileRegion;
     /**
      * Get the file's length in async block scan
      */
-    private final long blockFileLength;
+    private final long blockLength;
 
     private final static Pattern CONDENSED_PATH_REGEX =
         Pattern.compile("(?<!^)(\\\\|/){2,}");
@@ -294,13 +296,30 @@ public interface FsVolumeSpi
      */
     public ScanInfo(long blockId, File blockFile, File metaFile,
         FsVolumeSpi vol) {
+      this(blockId, blockFile, metaFile, vol, null,
+          (blockFile != null) ? blockFile.length() : 0);
+    }
+
+    /**
+     * Create a ScanInfo object for a block. This constructor will examine
+     * the block data and meta-data files.
+     *
+     * @param blockId the block ID
+     * @param blockFile the path to the block data file
+     * @param metaFile the path to the block meta-data file
+     * @param vol the volume that contains the block
+     * @param fileRegion the file region (for provided blocks)
+     * @param length the length of the block data
+     */
+    public ScanInfo(long blockId, File blockFile, File metaFile,
+        FsVolumeSpi vol, FileRegion fileRegion, long length) {
       this.blockId = blockId;
       String condensedVolPath =
           (vol == null || vol.getBaseURI() == null) ? null :
             getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
       this.blockSuffix = blockFile == null ? null :
         getSuffix(blockFile, condensedVolPath);
-      this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
+      this.blockLength = length;
       if (metaFile == null) {
         this.metaSuffix = null;
       } else if (blockFile == null) {
@@ -310,6 +329,7 @@ public interface FsVolumeSpi
             condensedVolPath + blockSuffix);
       }
       this.volume = vol;
+      this.fileRegion = fileRegion;
     }
 
     /**
@@ -328,8 +348,8 @@ public interface FsVolumeSpi
      *
      * @return the length of the data block
      */
-    public long getBlockFileLength() {
-      return blockFileLength;
+    public long getBlockLength() {
+      return blockLength;
     }
 
     /**
@@ -399,6 +419,10 @@ public interface FsVolumeSpi
           getMetaFile().getName()) :
             HdfsConstants.GRANDFATHER_GENERATION_STAMP;
     }
+
+    public FileRegion getFileRegion() {
+      return fileRegion;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java
new file mode 100644
index 0000000..24921c4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/DefaultProvidedVolumeDF.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The default usage statistics for a provided volume.
+ */
+public class DefaultProvidedVolumeDF
+    implements ProvidedVolumeDF, Configurable {
+
+  @Override
+  public void setConf(Configuration conf) {
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  @Override
+  public long getCapacity() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public long getSpaceUsed() {
+    return 0;
+  }
+
+  @Override
+  public long getBlockPoolUsed(String bpid) {
+    return 0;
+  }
+
+  @Override
+  public long getAvailable() {
+    return Long.MAX_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 169e0e6..e208923 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -1699,6 +1700,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       Set<String> missingVolumesReported = new HashSet<>();
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        //skip blocks in PROVIDED storage
+        if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
+          continue;
+        }
         String volStorageID = b.getVolume().getStorageID();
         if (!builders.containsKey(volStorageID)) {
           if (!missingVolumesReported.contains(volStorageID)) {
@@ -1834,7 +1839,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       r = volumeMap.get(bpid, blockId);
     }
-
     if (r != null) {
       if (r.blockDataExists()) {
         return r;
@@ -2175,13 +2179,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param vol Volume of the block file
    */
   @Override
-  public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) throws IOException {
+  public void checkAndUpdate(String bpid, ScanInfo scanInfo)
+      throws IOException {
+
+    long blockId = scanInfo.getBlockId();
+    File diskFile = scanInfo.getBlockFile();
+    File diskMetaFile = scanInfo.getMetaFile();
+    FsVolumeSpi vol = scanInfo.getVolume();
+
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
-      if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
+      if (memBlockInfo != null &&
+          memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
         return;
       }
@@ -2196,6 +2207,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           Block.getGenerationStamp(diskMetaFile.getName()) :
           HdfsConstants.GRANDFATHER_GENERATION_STAMP;
 
+      if (vol.getStorageType() == StorageType.PROVIDED) {
+        if (memBlockInfo == null) {
+          //replica exists on provided store but not in memory
+          ReplicaInfo diskBlockInfo =
+              new ReplicaBuilder(ReplicaState.FINALIZED)
+              .setFileRegion(scanInfo.getFileRegion())
+              .setFsVolume(vol)
+              .setConf(conf)
+              .build();
+
+          volumeMap.add(bpid, diskBlockInfo);
+          LOG.warn("Added missing block to memory " + diskBlockInfo);
+        } else {
+          //replica exists in memory but not in the provided store
+          volumeMap.remove(bpid, blockId);
+          LOG.warn("Deleting missing provided block " + memBlockInfo);
+        }
+        return;
+      }
+
       if (!diskFileExists) {
         if (memBlockInfo == null) {
           // Block file does not exist and block does not exist in memory
@@ -2967,7 +2998,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           newReplicaInfo =
               replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
                   replicaInfo, replicaState);
-
           // Update the volumeMap entry.
           volumeMap.add(bpid, newReplicaInfo);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 32759c4..9f115a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileInputStream;
@@ -32,10 +34,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -44,6 +48,22 @@ public class FsDatasetUtil {
     return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
   }
 
+  public static byte[] createNullChecksumByteArray() {
+    DataChecksum csum =
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(out);
+    try {
+      BlockMetadataHeader.writeHeader(dataOut, csum);
+      dataOut.close();
+    } catch (IOException e) {
+      FsVolumeImpl.LOG.error(
+          "Exception in creating null checksum stream: " + e);
+      return null;
+    }
+    return out.toByteArray();
+  }
+
   static File getOrigFile(File unlinkTmpFile) {
     final String name = unlinkTmpFile.getName();
     if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) {
@@ -135,8 +155,9 @@ public class FsDatasetUtil {
    * Compute the checksum for a block file that does not already have
    * its checksum computed, and save it to dstMeta file.
    */
-  public static void computeChecksum(File srcMeta, File dstMeta, File blockFile,
-      int smallBufferSize, Configuration conf) throws IOException {
+  public static void computeChecksum(File srcMeta, File dstMeta,
+      File blockFile, int smallBufferSize, Configuration conf)
+          throws IOException {
     Preconditions.checkNotNull(srcMeta);
     Preconditions.checkNotNull(dstMeta);
     Preconditions.checkNotNull(blockFile);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index b948fb7..267a5cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -155,18 +155,24 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.reservedForReplicas = new AtomicLong(0L);
     this.storageLocation = sd.getStorageLocation();
     this.currentDir = sd.getCurrentDir();
-    File parent = currentDir.getParentFile();
-    this.usage = new DF(parent, conf);
     this.storageType = storageLocation.getStorageType();
     this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY
         + "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong(
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT));
     this.configuredCapacity = -1;
+    if (currentDir != null) {
+      File parent = currentDir.getParentFile();
+      this.usage = new DF(parent, conf);
+      cacheExecutor = initializeCacheExecutor(parent);
+      this.metrics = DataNodeVolumeMetrics.create(conf, parent.getPath());
+    } else {
+      this.usage = null;
+      cacheExecutor = null;
+      this.metrics = null;
+    }
     this.conf = conf;
     this.fileIoProvider = fileIoProvider;
-    cacheExecutor = initializeCacheExecutor(parent);
-    this.metrics = DataNodeVolumeMetrics.create(conf, getBaseURI().getPath());
   }
 
   protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
@@ -446,7 +452,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
   /**
    * Unplanned Non-DFS usage, i.e. Extra usage beyond reserved.
    *
-   * @return
+   * @return Disk usage excluding space used by HDFS and excluding space
+   * reserved for blocks open for write.
    * @throws IOException
    */
   public long getNonDfsUsed() throws IOException {
@@ -524,7 +531,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   public String[] getBlockPoolList() {
     return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);   
   }
-    
+
   /**
    * Temporary files. They get moved to the finalized block directory when
    * the block is finalized.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
index 427f81b..2da9170 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
 
@@ -67,6 +68,11 @@ public class FsVolumeImplBuilder {
   }
 
   FsVolumeImpl build() throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      return new ProvidedVolumeImpl(dataset, storageID, sd,
+          fileIoProvider != null ? fileIoProvider :
+            new FileIoProvider(null, null), conf);
+    }
     return new FsVolumeImpl(
         dataset, storageID, sd,
         fileIoProvider != null ? fileIoProvider :

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java
new file mode 100644
index 0000000..4d28883
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeDF.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+/**
+ * This interface is used to define the usage statistics
+ * of the provided storage.
+ */
+public interface ProvidedVolumeDF {
+
+  long getCapacity();
+
+  long getSpaceUsed();
+
+  long getBlockPoolUsed(String bpid);
+
+  long getAvailable();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
new file mode 100644
index 0000000..a48e117
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ProvidedVolumeImpl.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
+import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.util.Timer;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+
+/**
+ * This class is used to create provided volumes.
+ */
+public class ProvidedVolumeImpl extends FsVolumeImpl {
+
+  static class ProvidedBlockPoolSlice {
+    private FsVolumeImpl providedVolume;
+
+    private FileRegionProvider provider;
+    private Configuration conf;
+    private String bpid;
+    private ReplicaMap bpVolumeMap;
+
+    ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
+        Configuration conf) {
+      this.providedVolume = volume;
+      bpVolumeMap = new ReplicaMap(new AutoCloseableLock());
+      Class<? extends FileRegionProvider> fmt =
+          conf.getClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+              TextFileRegionProvider.class, FileRegionProvider.class);
+      provider = ReflectionUtils.newInstance(fmt, conf);
+      this.conf = conf;
+      this.bpid = bpid;
+      bpVolumeMap.initBlockPool(bpid);
+      LOG.info("Created provider: " + provider.getClass());
+    }
+
+    FileRegionProvider getFileRegionProvider() {
+      return provider;
+    }
+
+    public void getVolumeMap(ReplicaMap volumeMap,
+        RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
+      Iterator<FileRegion> iter = provider.iterator();
+      while(iter.hasNext()) {
+        FileRegion region = iter.next();
+        if (region.getBlockPoolId() != null &&
+            region.getBlockPoolId().equals(bpid)) {
+          ReplicaInfo newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
+              .setBlockId(region.getBlock().getBlockId())
+              .setURI(region.getPath().toUri())
+              .setOffset(region.getOffset())
+              .setLength(region.getBlock().getNumBytes())
+              .setGenerationStamp(region.getBlock().getGenerationStamp())
+              .setFsVolume(providedVolume)
+              .setConf(conf).build();
+
+          ReplicaInfo oldReplica =
+              volumeMap.get(bpid, newReplica.getBlockId());
+          if (oldReplica == null) {
+            volumeMap.add(bpid, newReplica);
+            bpVolumeMap.add(bpid, newReplica);
+          } else {
+            throw new IOException(
+                "A block with id " + newReplica.getBlockId() +
+                " already exists in the volumeMap");
+          }
+        }
+      }
+    }
+
+    public boolean isEmpty() {
+      return bpVolumeMap.replicas(bpid).size() == 0;
+    }
+
+    public void shutdown(BlockListAsLongs blocksListsAsLongs) {
+      //nothing to do!
+    }
+
+    public void compileReport(LinkedList<ScanInfo> report,
+        ReportCompiler reportCompiler)
+            throws IOException, InterruptedException {
+      /* refresh the provider and return the list of blocks found.
+       * the assumption here is that the block ids in the external
+       * block map, after the refresh, are consistent with those
+       * from before the refresh, i.e., for blocks which did not change,
+       * the ids remain the same.
+       */
+      provider.refresh();
+      Iterator<FileRegion> iter = provider.iterator();
+      while(iter.hasNext()) {
+        reportCompiler.throttle();
+        FileRegion region = iter.next();
+        if (region.getBlockPoolId().equals(bpid)) {
+          LOG.info("Adding ScanInfo for blkid " +
+              region.getBlock().getBlockId());
+          report.add(new ScanInfo(region.getBlock().getBlockId(), null, null,
+              providedVolume, region, region.getLength()));
+        }
+      }
+    }
+  }
+
+  private URI baseURI;
+  private final Map<String, ProvidedBlockPoolSlice> bpSlices =
+      new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
+
+  private ProvidedVolumeDF df;
+
+  ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
+      StorageDirectory sd, FileIoProvider fileIoProvider,
+      Configuration conf) throws IOException {
+    super(dataset, storageID, sd, fileIoProvider, conf);
+    assert getStorageLocation().getStorageType() == StorageType.PROVIDED:
+      "Only provided storages must use ProvidedVolume";
+
+    baseURI = getStorageLocation().getUri();
+    Class<? extends ProvidedVolumeDF> dfClass =
+        conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
+            DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
+    df = ReflectionUtils.newInstance(dfClass, conf);
+  }
+
+  @Override
+  public String[] getBlockPoolList() {
+    return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
+  }
+
+  @Override
+  public long getCapacity() {
+    if (configuredCapacity < 0) {
+      return df.getCapacity();
+    }
+    return configuredCapacity;
+  }
+
+  @Override
+  public long getDfsUsed() throws IOException {
+    return df.getSpaceUsed();
+  }
+
+  @Override
+  long getBlockPoolUsed(String bpid) throws IOException {
+    return df.getBlockPoolUsed(bpid);
+  }
+
+  @Override
+  public long getAvailable() throws IOException {
+    return df.getAvailable();
+  }
+
+  @Override
+  long getActualNonDfsUsed() throws IOException {
+    return df.getSpaceUsed();
+  }
+
+  @Override
+  public long getNonDfsUsed() throws IOException {
+    return 0L;
+  }
+
+  @Override
+  public URI getBaseURI() {
+    return baseURI;
+  }
+
+  @Override
+  public File getFinalizedDir(String bpid) throws IOException {
+    return null;
+  }
+
+  @Override
+  public void reserveSpaceForReplica(long bytesToReserve) {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public void releaseReservedSpace(long bytesToRelease) {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  private static final ObjectWriter WRITER =
+      new ObjectMapper().writerWithDefaultPrettyPrinter();
+  private static final ObjectReader READER =
+      new ObjectMapper().reader(ProvidedBlockIteratorState.class);
+
+  private static class ProvidedBlockIteratorState {
+    ProvidedBlockIteratorState() {
+      iterStartMs = Time.now();
+      lastSavedMs = iterStartMs;
+      atEnd = false;
+      lastBlockId = -1;
+    }
+
+    // The wall-clock ms since the epoch at which this iterator was last saved.
+    @JsonProperty
+    private long lastSavedMs;
+
+    // The wall-clock ms since the epoch at which this iterator was created.
+    @JsonProperty
+    private long iterStartMs;
+
+    @JsonProperty
+    private boolean atEnd;
+
+    //The id of the last block read when the state of the iterator is saved.
+    //This implementation assumes that provided blocks are returned
+    //in sorted order of the block ids.
+    @JsonProperty
+    private long lastBlockId;
+  }
+
+  private class ProviderBlockIteratorImpl
+      implements FsVolumeSpi.BlockIterator {
+
+    private String bpid;
+    private String name;
+    private FileRegionProvider provider;
+    private Iterator<FileRegion> blockIterator;
+    private ProvidedBlockIteratorState state;
+
+    ProviderBlockIteratorImpl(String bpid, String name,
+        FileRegionProvider provider) {
+      this.bpid = bpid;
+      this.name = name;
+      this.provider = provider;
+      rewind();
+    }
+
+    @Override
+    public void close() throws IOException {
+      //No action needed
+    }
+
+    @Override
+    public ExtendedBlock nextBlock() throws IOException {
+      if (null == blockIterator || !blockIterator.hasNext()) {
+        return null;
+      }
+      FileRegion nextRegion = null;
+      while (null == nextRegion && blockIterator.hasNext()) {
+        FileRegion temp = blockIterator.next();
+        if (temp.getBlock().getBlockId() < state.lastBlockId) {
+          continue;
+        }
+        if (temp.getBlockPoolId().equals(bpid)) {
+          nextRegion = temp;
+        }
+      }
+      if (null == nextRegion) {
+        return null;
+      }
+      state.lastBlockId = nextRegion.getBlock().getBlockId();
+      return new ExtendedBlock(bpid, nextRegion.getBlock());
+    }
+
+    @Override
+    public boolean atEnd() {
+      return blockIterator != null ? !blockIterator.hasNext(): true;
+    }
+
+    @Override
+    public void rewind() {
+      blockIterator = provider.iterator();
+      state = new ProvidedBlockIteratorState();
+    }
+
+    @Override
+    public void save() throws IOException {
+      //We do not persist the state of this iterator anywhere, locally.
+      //We just re-scan provided volumes as necessary.
+      state.lastSavedMs = Time.now();
+    }
+
+    @Override
+    public void setMaxStalenessMs(long maxStalenessMs) {
+      //do not use max staleness
+    }
+
+    @Override
+    public long getIterStartMs() {
+      return state.iterStartMs;
+    }
+
+    @Override
+    public long getLastSavedMs() {
+      return state.lastSavedMs;
+    }
+
+    @Override
+    public String getBlockPoolId() {
+      return bpid;
+    }
+
+    public void load() throws IOException {
+      //on load, we just rewind the iterator for provided volumes.
+      rewind();
+      LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(),
+          bpid, name, WRITER.writeValueAsString(state));
+    }
+  }
+
+  @Override
+  public BlockIterator newBlockIterator(String bpid, String name) {
+    return new ProviderBlockIteratorImpl(bpid, name,
+        bpSlices.get(bpid).getFileRegionProvider());
+  }
+
+  @Override
+  public BlockIterator loadBlockIterator(String bpid, String name)
+      throws IOException {
+    ProviderBlockIteratorImpl iter = new ProviderBlockIteratorImpl(bpid, name,
+        bpSlices.get(bpid).getFileRegionProvider());
+    iter.load();
+    return iter;
+  }
+
+  @Override
+  ReplicaInfo addFinalizedBlock(String bpid, Block b,
+      ReplicaInfo replicaInfo, long bytesReserved) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public VolumeCheckResult check(VolumeCheckContext ignored)
+      throws DiskErrorException {
+    return VolumeCheckResult.HEALTHY;
+  }
+
+  @Override
+  void getVolumeMap(ReplicaMap volumeMap,
+      final RamDiskReplicaTracker ramDiskReplicaMap)
+          throws IOException {
+    LOG.info("Creating volumemap for provided volume " + this);
+    for(ProvidedBlockPoolSlice s : bpSlices.values()) {
+      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
+    }
+  }
+
+  private ProvidedBlockPoolSlice getProvidedBlockPoolSlice(String bpid)
+      throws IOException {
+    ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
+    if (bp == null) {
+      throw new IOException("block pool " + bpid + " is not found");
+    }
+    return bp;
+  }
+
+  @Override
+  void getVolumeMap(String bpid, ReplicaMap volumeMap,
+      final RamDiskReplicaTracker ramDiskReplicaMap)
+          throws IOException {
+    getProvidedBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
+  }
+
+  @VisibleForTesting
+  FileRegionProvider getFileRegionProvider(String bpid) throws IOException {
+    return getProvidedBlockPoolSlice(bpid).getFileRegionProvider();
+  }
+
+  @Override
+  public String toString() {
+    return this.baseURI.toString();
+  }
+
+  @Override
+  void addBlockPool(String bpid, Configuration conf) throws IOException {
+    addBlockPool(bpid, conf, null);
+  }
+
+  @Override
+  void addBlockPool(String bpid, Configuration conf, Timer timer)
+      throws IOException {
+    LOG.info("Adding block pool " + bpid +
+        " to volume with id " + getStorageID());
+    ProvidedBlockPoolSlice bp;
+    bp = new ProvidedBlockPoolSlice(bpid, this, conf);
+    bpSlices.put(bpid, bp);
+  }
+
+  void shutdown() {
+    if (cacheExecutor != null) {
+      cacheExecutor.shutdown();
+    }
+    Set<Entry<String, ProvidedBlockPoolSlice>> set = bpSlices.entrySet();
+    for (Entry<String, ProvidedBlockPoolSlice> entry : set) {
+      entry.getValue().shutdown(null);
+    }
+  }
+
+  @Override
+  void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
+    ProvidedBlockPoolSlice bp = bpSlices.get(bpid);
+    if (bp != null) {
+      bp.shutdown(blocksListsAsLongs);
+    }
+    bpSlices.remove(bpid);
+  }
+
+  @Override
+  boolean isBPDirEmpty(String bpid) throws IOException {
+    return getProvidedBlockPoolSlice(bpid).isEmpty();
+  }
+
+  @Override
+  void deleteBPDirectories(String bpid, boolean force) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public LinkedList<ScanInfo> compileReport(String bpid,
+      LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
+          throws InterruptedException, IOException {
+    LOG.info("Compiling report for volume: " + this + " bpid " + bpid);
+    //get the report from the appropriate block pool.
+    if(bpSlices.containsKey(bpid)) {
+      bpSlices.get(bpid).compileReport(report, reportCompiler);
+    }
+    return report;
+  }
+
+  @Override
+  public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
+      long newGS, long estimateBlockLen) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
+      ReplicaInfo temp) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline createTemporary(ExtendedBlock b)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
+      String bpid, long newBlockId, long recoveryId, long newlength)
+          throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block,
+      ReplicaInfo replicaInfo, int smallBufferSize,
+      Configuration conf) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+
+  @Override
+  public File[] copyBlockToLazyPersistLocation(String bpId, long blockId,
+      long genStamp, ReplicaInfo replicaInfo, int smallBufferSize,
+      Configuration conf) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedVolume does not yet support writes");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 7eac87d..24ef80c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -685,7 +685,7 @@ public class Mover {
     }
   }
 
-  static class Cli extends Configured implements Tool {
+  public static class Cli extends Configured implements Tool {
     private static final String USAGE = "Usage: hdfs mover "
         + "[-p <files/dirs> | -f <local file>]"
         + "\n\t-p <files/dirs>\ta space separated list of HDFS files/dirs to migrate."

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
index 872ee74..45e001d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-class FSImageCompression {
+public class FSImageCompression {
 
   /** Codec to use to save or load image, or null if the image is not compressed */
   private CompressionCodec imageCodec;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index 63d1a28..4aae7d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -658,6 +658,10 @@ public class NNStorage extends Storage implements Closeable,
   void readProperties(StorageDirectory sd, StartupOption startupOption)
       throws IOException {
     Properties props = readPropertiesFile(sd.getVersionFile());
+    if (props == null) {
+      throw new IOException(
+          "Properties not found  for storage directory " + sd);
+    }
     if (HdfsServerConstants.RollingUpgradeStartupOption.ROLLBACK
         .matches(startupOption)) {
       int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
@@ -975,7 +979,11 @@ public class NNStorage extends Storage implements Closeable,
       StorageDirectory sd = sdit.next();
       try {
         Properties props = readPropertiesFile(sd.getVersionFile());
-        cid = props.getProperty("clusterID");
+        if (props == null) {
+          cid = null;
+        } else {
+          cid = props.getProperty("clusterID");
+        }
         LOG.info("current cluster id for sd="+sd.getCurrentDir() + 
             ";lv=" + layoutVersion + ";cid=" + cid);
         

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2b14951..eb2220e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4429,6 +4429,84 @@
   </property>
 
   <property>
+    <name>dfs.provider.class</name>
+    <value>org.apache.hadoop.hdfs.server.common.TextFileRegionProvider</value>
+    <description>
+        The class that is used to load information about blocks stored in
+        provided storages.
+        org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TextFileRegionProvider
+        is used as the default, which expects the blocks to be specified
+        using a delimited text file.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.df.class</name>
+    <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
+    <description>
+        The class that is used to measure usage statistics of provided stores.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.storage.id</name>
+    <value>DS-PROVIDED</value>
+    <description>
+        The storage ID used for provided stores.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.blockformat.class</name>
+    <value>org.apache.hadoop.hdfs.server.common.TextFileRegionFormat</value>
+    <description>
+      The class that is used to specify the input format of the blocks on
+      provided storages. The default is
+      org.apache.hadoop.hdfs.server.common.TextFileRegionFormat which uses
+      file regions to describe blocks. The file regions are specified as a
+      delimited text file. Each file region is a 6-tuple containing the
+      block id, remote file path, offset into file, length of block, the
+      block pool id containing the block, and the generation stamp of the
+      block.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.delimiter</name>
+    <value>,</value>
+    <description>
+        The delimiter used when the provided block map is specified as
+        a text file.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.read.path</name>
+    <value></value>
+    <description>
+        The path specifying the provided block map as a text file, specified as
+        a URI.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.read.codec</name>
+    <value></value>
+    <description>
+        The codec used to de-compress the provided block map.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.provided.textprovider.write.path</name>
+    <value></value>
+    <description>
+        The path to which the provided block map should be written as a text
+        file, specified as a URI.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.lock.suppress.warning.interval</name>
     <value>10s</value>
     <description>Instrumentation reporting long critical sections will suppress

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
index 25eb5b6..8bc8b0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
@@ -208,7 +208,7 @@ public class TestDFSRollback {
       UpgradeUtilities.createDataNodeVersionFile(
           dataCurrentDirs,
           storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
 
       cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
       assertTrue(cluster.isDataNodeUp());
@@ -256,7 +256,7 @@ public class TestDFSRollback {
           NodeType.DATA_NODE);
       
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
       
       startBlockPoolShouldFail(StartupOption.ROLLBACK, 
           cluster.getNamesystem().getBlockPoolId());
@@ -283,7 +283,7 @@ public class TestDFSRollback {
           NodeType.DATA_NODE);
      
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
       
       startBlockPoolShouldFail(StartupOption.ROLLBACK, 
           cluster.getNamesystem().getBlockPoolId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
index d202223..0c09eda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStartupVersions.java
@@ -265,7 +265,7 @@ public class TestDFSStartupVersions {
           conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY), "current");
       log("DataNode version info", DATA_NODE, i, versions[i]);
       UpgradeUtilities.createDataNodeVersionFile(storage,
-          versions[i].storageInfo, bpid, versions[i].blockPoolId);
+          versions[i].storageInfo, bpid, versions[i].blockPoolId, conf);
       try {
         cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
       } catch (Exception ignore) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
index fe1ede0..0d9f502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
@@ -290,7 +290,7 @@ public class TestDFSUpgrade {
           UpgradeUtilities.getCurrentFsscTime(cluster), NodeType.DATA_NODE);
       
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
       
       startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
           .getCurrentBlockPoolID(null));
@@ -308,7 +308,7 @@ public class TestDFSUpgrade {
           NodeType.DATA_NODE);
           
       UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo, 
-          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+          UpgradeUtilities.getCurrentBlockPoolID(cluster), conf);
       // Ensure corresponding block pool failed to initialized
       startBlockPoolShouldFail(StartupOption.REGULAR, UpgradeUtilities
           .getCurrentBlockPoolID(null));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
index b0504f0..174dea8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
@@ -386,8 +386,10 @@ public class UpgradeUtilities {
           new File(datanodeStorage.toString()));
       sd.setStorageUuid(DatanodeStorage.generateUuid());
       Properties properties = Storage.readPropertiesFile(sd.getVersionFile());
-      properties.setProperty("storageID", sd.getStorageUuid());
-      Storage.writeProperties(sd.getVersionFile(), properties);
+      if (properties != null) {
+        properties.setProperty("storageID", sd.getStorageUuid());
+        Storage.writeProperties(sd.getVersionFile(), properties);
+      }
 
       retVal[i] = newDir;
     }
@@ -463,8 +465,9 @@ public class UpgradeUtilities {
    * @param bpid Block pool Id
    */
   public static void createDataNodeVersionFile(File[] parent,
-      StorageInfo version, String bpid) throws IOException {
-    createDataNodeVersionFile(parent, version, bpid, bpid);
+      StorageInfo version, String bpid, Configuration conf)
+          throws IOException {
+    createDataNodeVersionFile(parent, version, bpid, bpid, conf);
   }
   
   /**
@@ -479,7 +482,8 @@ public class UpgradeUtilities {
    * @param bpidToWrite Block pool Id to write into the version file
    */
   public static void createDataNodeVersionFile(File[] parent,
-      StorageInfo version, String bpid, String bpidToWrite) throws IOException {
+      StorageInfo version, String bpid, String bpidToWrite, Configuration conf)
+          throws IOException {
     DataStorage storage = new DataStorage(version);
     storage.setDatanodeUuid("FixedDatanodeUuid");
 
@@ -487,7 +491,7 @@ public class UpgradeUtilities {
     for (int i = 0; i < parent.length; i++) {
       File versionFile = new File(parent[i], "VERSION");
       StorageDirectory sd = new StorageDirectory(parent[i].getParentFile());
-      DataStorage.createStorageID(sd, false);
+      DataStorage.createStorageID(sd, false, conf);
       storage.writeProperties(versionFile, sd);
       versionFiles[i] = versionFile;
       File bpDir = BlockPoolSliceStorage.getBpRoot(bpid, parent[i]);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
new file mode 100644
index 0000000..eaaac22
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestTextBlockFormat.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.TextFileRegionFormat.*;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test for the text based block format for provided block maps.
+ */
+public class TestTextBlockFormat {
+
+  static final Path OUTFILE = new Path("hdfs://dummyServer:0000/dummyFile.txt");
+
+  void check(TextWriter.Options opts, final Path vp,
+      final Class<? extends CompressionCodec> vc) throws IOException {
+    TextFileRegionFormat mFmt = new TextFileRegionFormat() {
+      @Override
+      public TextWriter createWriter(Path file, CompressionCodec codec,
+          String delim, Configuration conf) throws IOException {
+        assertEquals(vp, file);
+        if (null == vc) {
+          assertNull(codec);
+        } else {
+          assertEquals(vc, codec.getClass());
+        }
+        return null; // ignored
+      }
+    };
+    mFmt.getWriter(opts);
+  }
+
+  @Test
+  public void testWriterOptions() throws Exception {
+    TextWriter.Options opts = TextWriter.defaults();
+    assertTrue(opts instanceof WriterOptions);
+    WriterOptions wopts = (WriterOptions) opts;
+    Path def = new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+    assertEquals(def, wopts.getFile());
+    assertNull(wopts.getCodec());
+
+    opts.filename(OUTFILE);
+    check(opts, OUTFILE, null);
+
+    opts.filename(OUTFILE);
+    opts.codec("gzip");
+    Path cp = new Path(OUTFILE.getParent(), OUTFILE.getName() + ".gz");
+    check(opts, cp, org.apache.hadoop.io.compress.GzipCodec.class);
+
+  }
+
+  @Test
+  public void testCSVReadWrite() throws Exception {
+    final DataOutputBuffer out = new DataOutputBuffer();
+    FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+    FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+    FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+    try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), ",")) {
+      csv.store(r1);
+      csv.store(r2);
+      csv.store(r3);
+    }
+    Iterator<FileRegion> i3;
+    try (TextReader csv = new TextReader(null, null, null, ",") {
+      @Override
+      public InputStream createStream() {
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(out.getData(), 0, out.getLength());
+        return in;
+        }}) {
+      Iterator<FileRegion> i1 = csv.iterator();
+      assertEquals(r1, i1.next());
+      Iterator<FileRegion> i2 = csv.iterator();
+      assertEquals(r1, i2.next());
+      assertEquals(r2, i2.next());
+      assertEquals(r3, i2.next());
+      assertEquals(r2, i1.next());
+      assertEquals(r3, i1.next());
+
+      assertFalse(i1.hasNext());
+      assertFalse(i2.hasNext());
+      i3 = csv.iterator();
+    }
+    try {
+      i3.next();
+    } catch (IllegalStateException e) {
+      return;
+    }
+    fail("Invalid iterator");
+  }
+
+  @Test
+  public void testCSVReadWriteTsv() throws Exception {
+    final DataOutputBuffer out = new DataOutputBuffer();
+    FileRegion r1 = new FileRegion(4344L, OUTFILE, 0, 1024);
+    FileRegion r2 = new FileRegion(4345L, OUTFILE, 1024, 1024);
+    FileRegion r3 = new FileRegion(4346L, OUTFILE, 2048, 512);
+    try (TextWriter csv = new TextWriter(new OutputStreamWriter(out), "\t")) {
+      csv.store(r1);
+      csv.store(r2);
+      csv.store(r3);
+    }
+    Iterator<FileRegion> i3;
+    try (TextReader csv = new TextReader(null, null, null, "\t") {
+      @Override
+      public InputStream createStream() {
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(out.getData(), 0, out.getLength());
+        return in;
+      }}) {
+      Iterator<FileRegion> i1 = csv.iterator();
+      assertEquals(r1, i1.next());
+      Iterator<FileRegion> i2 = csv.iterator();
+      assertEquals(r1, i2.next());
+      assertEquals(r2, i2.next());
+      assertEquals(r3, i2.next());
+      assertEquals(r2, i1.next());
+      assertEquals(r3, i1.next());
+
+      assertFalse(i1.hasNext());
+      assertFalse(i2.hasNext());
+      i3 = csv.iterator();
+    }
+    try {
+      i3.next();
+    } catch (IllegalStateException e) {
+      return;
+    }
+    fail("Invalid iterator");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index cd3befd..540e8d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -616,7 +617,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     this.datanode = datanode;
     if (storage != null) {
       for (int i = 0; i < storage.getNumStorageDirs(); ++i) {
-        DataStorage.createStorageID(storage.getStorageDir(i), false);
+        DataStorage.createStorageID(storage.getStorageDir(i), false, conf);
       }
       this.datanodeUuid = storage.getDatanodeUuid();
     } else {
@@ -1350,8 +1351,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) throws IOException {
+  public void checkAndUpdate(String bpid, ScanInfo info) throws IOException {
     throw new UnsupportedOperationException();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 62ef731..216a07f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -94,8 +95,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) {
+  public void checkAndUpdate(String bpid, ScanInfo info) {
+    return;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 905c3f0..571155f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -119,11 +119,12 @@ public class TestFsDatasetImpl {
   
   private final static String BLOCKPOOL = "BP-TEST";
 
-  private static Storage.StorageDirectory createStorageDirectory(File root)
+  private static Storage.StorageDirectory createStorageDirectory(File root,
+      Configuration conf)
       throws SecurityException, IOException {
     Storage.StorageDirectory sd = new Storage.StorageDirectory(
         StorageLocation.parse(root.toURI().toString()));
-    DataStorage.createStorageID(sd, false);
+    DataStorage.createStorageID(sd, false, conf);
     return sd;
   }
 
@@ -137,7 +138,7 @@ public class TestFsDatasetImpl {
       File loc = new File(BASE_DIR + "/data" + i);
       dirStrings.add(new Path(loc.toString()).toUri().toString());
       loc.mkdirs();
-      dirs.add(createStorageDirectory(loc));
+      dirs.add(createStorageDirectory(loc, conf));
       when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
     }
 
@@ -197,7 +198,8 @@ public class TestFsDatasetImpl {
       String pathUri = new Path(path).toUri().toString();
       expectedVolumes.add(new File(pathUri).getAbsolutePath());
       StorageLocation loc = StorageLocation.parse(pathUri);
-      Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+      Storage.StorageDirectory sd = createStorageDirectory(
+          new File(path), conf);
       DataStorage.VolumeBuilder builder =
           new DataStorage.VolumeBuilder(storage, sd);
       when(storage.prepareVolume(eq(datanode), eq(loc),
@@ -315,7 +317,8 @@ public class TestFsDatasetImpl {
     String newVolumePath = BASE_DIR + "/newVolumeToRemoveLater";
     StorageLocation loc = StorageLocation.parse(newVolumePath);
 
-    Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath));
+    Storage.StorageDirectory sd = createStorageDirectory(
+        new File(newVolumePath), conf);
     DataStorage.VolumeBuilder builder =
         new DataStorage.VolumeBuilder(storage, sd);
     when(storage.prepareVolume(eq(datanode), eq(loc),
@@ -348,7 +351,7 @@ public class TestFsDatasetImpl {
         any(ReplicaMap.class),
         any(RamDiskReplicaLruTracker.class));
 
-    Storage.StorageDirectory sd = createStorageDirectory(badDir);
+    Storage.StorageDirectory sd = createStorageDirectory(badDir, conf);
     sd.lock();
     DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd);
     when(storage.prepareVolume(eq(datanode),
@@ -492,7 +495,7 @@ public class TestFsDatasetImpl {
     String path = BASE_DIR + "/newData0";
     String pathUri = new Path(path).toUri().toString();
     StorageLocation loc = StorageLocation.parse(pathUri);
-    Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+    Storage.StorageDirectory sd = createStorageDirectory(new File(path), conf);
     DataStorage.VolumeBuilder builder =
         new DataStorage.VolumeBuilder(storage, sd);
     when(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
new file mode 100644
index 0000000..2c119fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestProvidedImpl.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.common.FileRegionProvider;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic test cases for provided implementation.
+ */
+public class TestProvidedImpl {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFsDatasetImpl.class);
+  private static final String BASE_DIR =
+      new FileSystemTestHelper().getTestRootDir();
+  private static final int NUM_LOCAL_INIT_VOLUMES = 1;
+  private static final int NUM_PROVIDED_INIT_VOLUMES = 1;
+  private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
+  private static final int NUM_PROVIDED_BLKS = 10;
+  private static final long BLK_LEN = 128 * 1024;
+  private static final int MIN_BLK_ID = 0;
+  private static final int CHOSEN_BP_ID = 0;
+
+  private static String providedBasePath = BASE_DIR;
+
+  private Configuration conf;
+  private DataNode datanode;
+  private DataStorage storage;
+  private FsDatasetImpl dataset;
+  private static Map<Long, String> blkToPathMap;
+  private static List<FsVolumeImpl> providedVolumes;
+
+  /**
+   * A simple FileRegion iterator for tests.
+   */
+  public static class TestFileRegionIterator implements Iterator<FileRegion> {
+
+    private int numBlocks;
+    private int currentCount;
+    private String basePath;
+
+    public TestFileRegionIterator(String basePath, int minID, int numBlocks) {
+      this.currentCount = minID;
+      this.numBlocks = numBlocks;
+      this.basePath = basePath;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return currentCount < numBlocks;
+    }
+
+    @Override
+    public FileRegion next() {
+      FileRegion region = null;
+      if (hasNext()) {
+        File newFile = new File(basePath, "file" + currentCount);
+        if(!newFile.exists()) {
+          try {
+            LOG.info("Creating file for blkid " + currentCount);
+            blkToPathMap.put((long) currentCount, newFile.getAbsolutePath());
+            LOG.info("Block id " + currentCount + " corresponds to file " +
+                newFile.getAbsolutePath());
+            newFile.createNewFile();
+            Writer writer = new OutputStreamWriter(
+                new FileOutputStream(newFile.getAbsolutePath()), "utf-8");
+            for(int i=0; i< BLK_LEN/(Integer.SIZE/8); i++) {
+              writer.write(currentCount);
+            }
+            writer.flush();
+            writer.close();
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+        region = new FileRegion(currentCount, new Path(newFile.toString()),
+            0, BLK_LEN, BLOCK_POOL_IDS[CHOSEN_BP_ID]);
+        currentCount++;
+      }
+      return region;
+    }
+
+    @Override
+    public void remove() {
+      //do nothing.
+    }
+
+    public void resetMinBlockId(int minId) {
+      currentCount = minId;
+    }
+
+    public void resetBlockCount(int numBlocks) {
+      this.numBlocks = numBlocks;
+    }
+
+  }
+
+  /**
+   * A simple FileRegion provider for tests.
+   */
+  public static class TestFileRegionProvider
+      extends FileRegionProvider implements Configurable {
+
+    private Configuration conf;
+    private int minId;
+    private int numBlocks;
+
+    TestFileRegionProvider() {
+      minId = MIN_BLK_ID;
+      numBlocks = NUM_PROVIDED_BLKS;
+    }
+
+    @Override
+    public Iterator<FileRegion> iterator() {
+      return new TestFileRegionIterator(providedBasePath, minId, numBlocks);
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void refresh() {
+      //do nothing!
+    }
+
+    public void setMinBlkId(int minId) {
+      this.minId = minId;
+    }
+
+    public void setBlockCount(int numBlocks) {
+      this.numBlocks = numBlocks;
+    }
+  }
+
+  private static Storage.StorageDirectory createLocalStorageDirectory(
+      File root, Configuration conf)
+      throws SecurityException, IOException {
+    Storage.StorageDirectory sd =
+        new Storage.StorageDirectory(
+            StorageLocation.parse(root.toURI().toString()));
+    DataStorage.createStorageID(sd, false, conf);
+    return sd;
+  }
+
+  private static Storage.StorageDirectory createProvidedStorageDirectory(
+      String confString, Configuration conf)
+      throws SecurityException, IOException {
+    Storage.StorageDirectory sd =
+        new Storage.StorageDirectory(StorageLocation.parse(confString));
+    DataStorage.createStorageID(sd, false, conf);
+    return sd;
+  }
+
+  private static void createStorageDirs(DataStorage storage,
+      Configuration conf, int numDirs, int numProvidedDirs)
+          throws IOException {
+    List<Storage.StorageDirectory> dirs =
+        new ArrayList<Storage.StorageDirectory>();
+    List<String> dirStrings = new ArrayList<String>();
+    FileUtils.deleteDirectory(new File(BASE_DIR));
+    for (int i = 0; i < numDirs; i++) {
+      File loc = new File(BASE_DIR, "data" + i);
+      dirStrings.add(new Path(loc.toString()).toUri().toString());
+      loc.mkdirs();
+      dirs.add(createLocalStorageDirectory(loc, conf));
+      when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+    }
+
+    for (int i = numDirs; i < numDirs + numProvidedDirs; i++) {
+      File loc = new File(BASE_DIR, "data" + i);
+      providedBasePath = loc.getAbsolutePath();
+      loc.mkdirs();
+      String dirString = "[PROVIDED]" +
+          new Path(loc.toString()).toUri().toString();
+      dirStrings.add(dirString);
+      dirs.add(createProvidedStorageDirectory(dirString, conf));
+      when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+    }
+
+    String dataDir = StringUtils.join(",", dirStrings);
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
+    when(storage.dirIterator()).thenReturn(dirs.iterator());
+    when(storage.getNumStorageDirs()).thenReturn(numDirs + numProvidedDirs);
+  }
+
+  private int getNumVolumes() {
+    try (FsDatasetSpi.FsVolumeReferences volumes =
+        dataset.getFsVolumeReferences()) {
+      return volumes.size();
+    } catch (IOException e) {
+      return 0;
+    }
+  }
+
+  private void compareBlkFile(InputStream ins, String filepath)
+      throws FileNotFoundException, IOException {
+    try (ReadableByteChannel i = Channels.newChannel(
+        new FileInputStream(new File(filepath)))) {
+      try (ReadableByteChannel j = Channels.newChannel(ins)) {
+        ByteBuffer ib = ByteBuffer.allocate(4096);
+        ByteBuffer jb = ByteBuffer.allocate(4096);
+        while (true) {
+          int il = i.read(ib);
+          int jl = j.read(jb);
+          if (il < 0 || jl < 0) {
+            assertEquals(il, jl);
+            break;
+          }
+          ib.flip();
+          jb.flip();
+          int cmp = Math.min(ib.remaining(), jb.remaining());
+          for (int k = 0; k < cmp; ++k) {
+            assertEquals(ib.get(), jb.get());
+          }
+          ib.compact();
+          jb.compact();
+        }
+      }
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    datanode = mock(DataNode.class);
+    storage = mock(DataStorage.class);
+    this.conf = new Configuration();
+    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
+
+    when(datanode.getConf()).thenReturn(conf);
+    final DNConf dnConf = new DNConf(datanode);
+    when(datanode.getDnConf()).thenReturn(dnConf);
+
+    final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
+    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
+    final ShortCircuitRegistry shortCircuitRegistry =
+        new ShortCircuitRegistry(conf);
+    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
+
+    this.conf.setClass(DFSConfigKeys.DFS_PROVIDER_CLASS,
+        TestFileRegionProvider.class, FileRegionProvider.class);
+
+    blkToPathMap = new HashMap<Long, String>();
+    providedVolumes = new LinkedList<FsVolumeImpl>();
+
+    createStorageDirs(
+        storage, conf, NUM_LOCAL_INIT_VOLUMES, NUM_PROVIDED_INIT_VOLUMES);
+
+    dataset = new FsDatasetImpl(datanode, storage, conf);
+    FsVolumeReferences volumes = dataset.getFsVolumeReferences();
+    for (int i = 0; i < volumes.size(); i++) {
+      FsVolumeSpi vol = volumes.get(i);
+      if (vol.getStorageType() == StorageType.PROVIDED) {
+        providedVolumes.add((FsVolumeImpl) vol);
+      }
+    }
+
+    for (String bpid : BLOCK_POOL_IDS) {
+      dataset.addBlockPool(bpid, conf);
+    }
+
+    assertEquals(NUM_LOCAL_INIT_VOLUMES + NUM_PROVIDED_INIT_VOLUMES,
+        getNumVolumes());
+    assertEquals(0, dataset.getNumFailedVolumes());
+  }
+
+  @Test
+  public void testProvidedStorageID() throws IOException {
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,
+          providedVolumes.get(i).getStorageID());
+    }
+  }
+
+  @Test
+  public void testBlockLoad() throws IOException {
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      FsVolumeImpl vol = providedVolumes.get(i);
+      ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+      vol.getVolumeMap(volumeMap, null);
+
+      assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
+      for (int j = 0; j < BLOCK_POOL_IDS.length; j++) {
+        if (j != CHOSEN_BP_ID) {
+          //this block pool should not have any blocks
+          assertEquals(null, volumeMap.replicas(BLOCK_POOL_IDS[j]));
+        }
+      }
+      assertEquals(NUM_PROVIDED_BLKS,
+          volumeMap.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
+    }
+  }
+
+  @Test
+  public void testProvidedBlockRead() throws IOException {
+    for (int id = 0; id < NUM_PROVIDED_BLKS; id++) {
+      ExtendedBlock eb = new ExtendedBlock(
+          BLOCK_POOL_IDS[CHOSEN_BP_ID], id, BLK_LEN,
+          HdfsConstants.GRANDFATHER_GENERATION_STAMP);
+      InputStream ins = dataset.getBlockInputStream(eb, 0);
+      String filepath = blkToPathMap.get((long) id);
+      compareBlkFile(ins, filepath);
+    }
+  }
+
+  @Test
+  public void testProvidedBlockIterator() throws IOException {
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      FsVolumeImpl vol = providedVolumes.get(i);
+      BlockIterator iter =
+          vol.newBlockIterator(BLOCK_POOL_IDS[CHOSEN_BP_ID], "temp");
+      Set<Long> blockIdsUsed = new HashSet<Long>();
+      while(!iter.atEnd()) {
+        ExtendedBlock eb = iter.nextBlock();
+        long blkId = eb.getBlockId();
+        assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS);
+        //all block ids must be unique!
+        assertTrue(!blockIdsUsed.contains(blkId));
+        blockIdsUsed.add(blkId);
+      }
+      assertEquals(NUM_PROVIDED_BLKS, blockIdsUsed.size());
+    }
+  }
+
+
+  @Test
+  public void testRefresh() throws IOException {
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+    for (int i = 0; i < providedVolumes.size(); i++) {
+      ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
+      TestFileRegionProvider provider = (TestFileRegionProvider)
+          vol.getFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
+      //equivalent to two new blocks appearing
+      provider.setBlockCount(NUM_PROVIDED_BLKS + 2);
+      //equivalent to deleting the first block
+      provider.setMinBlkId(MIN_BLK_ID + 1);
+
+      DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
+      scanner.reconcile();
+      ReplicaInfo info = dataset.getBlockReplica(
+          BLOCK_POOL_IDS[CHOSEN_BP_ID], NUM_PROVIDED_BLKS + 1);
+      //new replica should be added to the dataset
+      assertTrue(info != null);
+      try {
+        info = dataset.getBlockReplica(BLOCK_POOL_IDS[CHOSEN_BP_ID], 0);
+      } catch(Exception ex) {
+        LOG.info("Exception expected: " + ex);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/528798b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
index fa3399b..236627e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
@@ -64,7 +64,10 @@ public class TestClusterId {
       fsImage.getStorage().dirIterator(NNStorage.NameNodeDirType.IMAGE);
     StorageDirectory sd = sdit.next();
     Properties props = Storage.readPropertiesFile(sd.getVersionFile());
-    String cid = props.getProperty("clusterID");
+    String cid = null;
+    if (props != null) {
+      cid = props.getProperty("clusterID");
+    }
     LOG.info("successfully formated : sd="+sd.getCurrentDir() + ";cid="+cid);
     return cid;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message