hadoop-common-commits mailing list archives

From asur...@apache.org
Subject [47/50] [abbrv] hadoop git commit: HDFS-10899. Add functionality to re-encrypt EDEKs.
Date Thu, 24 Aug 2017 19:36:40 GMT
HDFS-10899. Add functionality to re-encrypt EDEKs.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1000a2af
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1000a2af
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1000a2af

Branch: refs/heads/YARN-5972
Commit: 1000a2af04b24c123a3b08168f36b4e90420cab7
Parents: 26d8c8f
Author: Xiao Chen <xiao@apache.org>
Authored: Wed Aug 23 17:05:47 2017 -0700
Committer: Xiao Chen <xiao@apache.org>
Committed: Wed Aug 23 17:06:16 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   20 +
 .../hadoop/hdfs/DistributedFileSystem.java      |   34 +
 .../apache/hadoop/hdfs/client/HdfsAdmin.java    |   29 +
 .../hadoop/hdfs/protocol/ClientProtocol.java    |   25 +
 .../hadoop/hdfs/protocol/HdfsConstants.java     |    7 +
 .../hdfs/protocol/ReencryptionStatus.java       |  216 ++
 .../protocol/ReencryptionStatusIterator.java    |   58 +
 .../hdfs/protocol/ZoneReencryptionStatus.java   |  257 +++
 .../ClientNamenodeProtocolTranslatorPB.java     |   39 +
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  136 +-
 .../src/main/proto/ClientNamenodeProtocol.proto |    4 +
 .../src/main/proto/encryption.proto             |   41 +
 .../src/main/proto/hdfs.proto                   |   14 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   12 +
 ...tNamenodeProtocolServerSideTranslatorPB.java |   36 +
 .../namenode/EncryptionFaultInjector.java       |    9 +
 .../server/namenode/EncryptionZoneManager.java  |  351 +++-
 .../server/namenode/FSDirEncryptionZoneOp.java  |  238 ++-
 .../hdfs/server/namenode/FSDirWriteFileOp.java  |    4 +-
 .../hdfs/server/namenode/FSDirXAttrOp.java      |    7 +
 .../hdfs/server/namenode/FSDirectory.java       |   11 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   86 +
 .../hdfs/server/namenode/NameNodeRpcServer.java |   27 +
 .../server/namenode/ReencryptionHandler.java    |  940 +++++++++
 .../server/namenode/ReencryptionUpdater.java    |  523 +++++
 .../apache/hadoop/hdfs/tools/CryptoAdmin.java   |  134 +-
 .../src/main/resources/hdfs-default.xml         |   52 +
 .../src/site/markdown/TransparentEncryption.md  |   45 +-
 .../hdfs/server/namenode/TestReencryption.java  | 1847 ++++++++++++++++++
 .../namenode/TestReencryptionHandler.java       |  197 ++
 .../src/test/resources/testCryptoConf.xml       |   80 +
 31 files changed, 5443 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 47c14e2..9239df3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -131,11 +132,13 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatusIterator;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
@@ -2639,6 +2642,23 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return new EncryptionZoneIterator(namenode, tracer);
   }
 
+  public void reencryptEncryptionZone(String zone, ReencryptAction action)
+      throws IOException {
+    checkOpen();
+    try (TraceScope ignored = newPathTraceScope("reencryptEncryptionZone",
+        zone)) {
+      namenode.reencryptEncryptionZone(zone, action);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+          SafeModeException.class, UnresolvedPathException.class);
+    }
+  }
+
+  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
+      throws IOException {
+    checkOpen();
+    return new ReencryptionStatusIterator(namenode, tracer);
+  }
 
   public void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index ceec2b3..f3605fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -83,11 +83,13 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -2314,6 +2316,38 @@ public class DistributedFileSystem extends FileSystem {
   }
 
   /* HDFS only */
+  public void reencryptEncryptionZone(final Path zone,
+      final ReencryptAction action) throws IOException {
+    final Path absF = fixRelativePart(zone);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException {
+        dfs.reencryptEncryptionZone(getPathName(p), action);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p) throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
+          myDfs.reencryptEncryptionZone(p, action);
+          return null;
+        }
+        throw new UnsupportedOperationException(
+            "Cannot call reencryptEncryptionZone"
+                + " on a symlink to a non-DistributedFileSystem: " + zone
+                + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
+
+  /* HDFS only */
+  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
+      throws IOException {
+    return dfs.listReencryptionStatus();
+  }
+
+  /* HDFS only */
   public FileEncryptionInfo getFileEncryptionInfo(final Path path)
       throws IOException {
     Path absF = fixRelativePart(path);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index abf341e..85a7efe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.security.AccessControlException;
 
@@ -370,6 +372,33 @@ public class HdfsAdmin {
   }
 
   /**
+   * Performs a re-encryption action on a given encryption zone.
+   *
+   * @param zone the root of the encryption zone
+   * @param action the re-encrypt action
+   * @throws IOException if any error occurs when handling the re-encrypt action.
+   */
+  public void reencryptEncryptionZone(final Path zone,
+      final ReencryptAction action) throws IOException {
+    dfs.reencryptEncryptionZone(zone, action);
+  }
+
+  /**
+   * Returns a RemoteIterator which can be used to list all re-encryption
+   * information. For large numbers of re-encryptions, the iterator will fetch
+   * the list in a number of small batches.
+   * <p>
+   * Since the list is fetched in batches, it does not represent a
+   * consistent snapshot of the entire list of re-encryption statuses.
+   * <p>
+   * This method can only be called by HDFS superusers.
+   */
+  public RemoteIterator<ZoneReencryptionStatus> listReencryptionStatus()
+      throws IOException {
+    return dfs.listReencryptionStatus();
+  }
+
+  /**
    * Returns the FileEncryptionInfo on the HdfsFileStatus for the given path.
    * The return value can be null if the path points to a directory, or a file
    * that is not in an encryption zone.

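Not part of the patch, but for orientation: a minimal, hypothetical sketch of how a superuser could drive the new HdfsAdmin calls above. The NameNode URI (hdfs://nn:8020) and zone path (/zones/zone1) are made-up placeholders.

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.RemoteIterator;
  import org.apache.hadoop.hdfs.client.HdfsAdmin;
  import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
  import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;

  public class ReencryptZoneExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://nn:8020"), conf);

      // Ask the NameNode to start re-encrypting all EDEKs under the zone root.
      admin.reencryptEncryptionZone(new Path("/zones/zone1"),
          ReencryptAction.START);

      // Poll the batched status listing (superuser-only).
      RemoteIterator<ZoneReencryptionStatus> it = admin.listReencryptionStatus();
      while (it.hasNext()) {
        ZoneReencryptionStatus zs = it.next();
        System.out.println(zs.getZoneName() + " state=" + zs.getState()
            + " filesReencrypted=" + zs.getFilesReencrypted());
      }
    }
  }
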
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index b0e85e5..b550467 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1444,6 +1445,30 @@ public interface ClientProtocol {
       long prevId) throws IOException;
 
   /**
+   * Used to implement re-encryption of encryption zones.
+   *
+   * @param zone the encryption zone to re-encrypt.
+   * @param action the action for the re-encryption.
+   * @throws IOException
+   */
+  @AtMostOnce
+  void reencryptEncryptionZone(String zone, ReencryptAction action)
+      throws IOException;
+
+  /**
+   * Used to implement cursor-based batched listing of
+   * {@link ZoneReencryptionStatus}s.
+   *
+   * @param prevId ID of the last item in the previous batch. If there is no
+   *               previous batch, a negative value can be used.
+   * @return Batch of re-encryption statuses.
+   * @throws IOException
+   */
+  @Idempotent
+  BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(long prevId)
+      throws IOException;
+
+  /**
    * Set xattr of a file or directory.
    * The name must be prefixed with the namespace followed by ".". For example,
    * "user.attr".

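For orientation, a rough sketch of how the prevId cursor above is meant to be consumed; the ReencryptionStatusIterator added later in this patch wraps the same loop. The ClientProtocol proxy is assumed to already exist.

  import java.io.IOException;
  import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
  import org.apache.hadoop.hdfs.protocol.ClientProtocol;
  import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;

  class ReencryptionStatusPager {
    // Walks every batch, advancing the cursor to the id of the last entry seen.
    static void printAll(ClientProtocol namenode) throws IOException {
      long prevId = 0;  // no previous batch; the iterator in this patch also starts at 0
      boolean hasMore = true;
      while (hasMore) {
        BatchedEntries<ZoneReencryptionStatus> batch =
            namenode.listReencryptionStatus(prevId);
        for (int i = 0; i < batch.size(); i++) {
          ZoneReencryptionStatus zs = batch.get(i);
          System.out.println(zs.getZoneName() + " -> " + zs.getState());
          prevId = zs.getId();
        }
        hasMore = batch.hasMore();
      }
    }
  }
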
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 2681f12..8c44293 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -144,6 +144,13 @@ public final class HdfsConstants {
     ALL, LIVE, DEAD, DECOMMISSIONING, ENTERING_MAINTENANCE, IN_MAINTENANCE
   }
 
+  /**
+   * Re-encrypt encryption zone actions.
+   */
+  public enum ReencryptAction {
+    CANCEL, START
+  }
+
   /* Hidden constructor */
   protected HdfsConstants() {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java
new file mode 100644
index 0000000..e83ab52
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatus.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A class representing information about re-encrypting encryption zones. It
+ * contains a collection of {@code ZoneReencryptionStatus}, one for each EZ.
+ * <p>
+ * FSDirectory lock is used for synchronization (except test-only methods, which
+ * are not protected).
+ */
+@InterfaceAudience.Private
+public final class ReencryptionStatus {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReencryptionStatus.class);
+
+  public static final BatchedListEntries<ZoneReencryptionStatus> EMPTY_LIST =
+      new BatchedListEntries<>(Lists.newArrayList(), false);
+
+  /**
+   * The zones that were submitted for re-encryption. This should preserve
+   * the order of submission.
+   */
+  private final TreeMap<Long, ZoneReencryptionStatus> zoneStatuses;
+  // Metrics
+  private long zonesReencrypted;
+
+  public ReencryptionStatus() {
+    zoneStatuses = new TreeMap<>();
+  }
+
+  @VisibleForTesting
+  public ReencryptionStatus(ReencryptionStatus rhs) {
+    if (rhs != null) {
+      this.zoneStatuses = new TreeMap<>(rhs.zoneStatuses);
+      this.zonesReencrypted = rhs.zonesReencrypted;
+    } else {
+      zoneStatuses = new TreeMap<>();
+    }
+  }
+
+  @VisibleForTesting
+  public void resetMetrics() {
+    zonesReencrypted = 0;
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      entry.getValue().resetMetrics();
+    }
+  }
+
+  public ZoneReencryptionStatus getZoneStatus(final Long zoneId) {
+    return zoneStatuses.get(zoneId);
+  }
+
+  public void markZoneForRetry(final Long zoneId) {
+    final ZoneReencryptionStatus zs = zoneStatuses.get(zoneId);
+    Preconditions.checkNotNull(zs, "Cannot find zone " + zoneId);
+    LOG.info("Zone {} will retry re-encryption", zoneId);
+    zs.setState(State.Submitted);
+  }
+
+  public void markZoneStarted(final Long zoneId) {
+    final ZoneReencryptionStatus zs = zoneStatuses.get(zoneId);
+    Preconditions.checkNotNull(zs, "Cannot find zone " + zoneId);
+    LOG.info("Zone {} starts re-encryption processing", zoneId);
+    zs.setState(State.Processing);
+  }
+
+  public void markZoneCompleted(final Long zoneId) {
+    final ZoneReencryptionStatus zs = zoneStatuses.get(zoneId);
+    Preconditions.checkNotNull(zs, "Cannot find zone " + zoneId);
+    LOG.info("Zone {} completed re-encryption.", zoneId);
+    zs.setState(State.Completed);
+    zonesReencrypted++;
+  }
+
+  public Long getNextUnprocessedZone() {
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      if (entry.getValue().getState() == State.Submitted) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  public boolean hasRunningZone(final Long zoneId) {
+    return zoneStatuses.containsKey(zoneId)
+        && zoneStatuses.get(zoneId).getState() != State.Completed;
+  }
+
+  /**
+   * @param zoneId
+   * @return true if the zone was newly added.
+   */
+  private boolean addZoneIfNecessary(final Long zoneId, final String name,
+      final ReencryptionInfoProto reProto) {
+    if (!zoneStatuses.containsKey(zoneId)) {
+      LOG.debug("Adding zone {} for re-encryption status", zoneId);
+      Preconditions.checkNotNull(reProto);
+      final ZoneReencryptionStatus.Builder builder =
+          new ZoneReencryptionStatus.Builder();
+      builder.id(zoneId).zoneName(name)
+          .ezKeyVersionName(reProto.getEzKeyVersionName())
+          .submissionTime(reProto.getSubmissionTime())
+          .canceled(reProto.getCanceled())
+          .filesReencrypted(reProto.getNumReencrypted())
+          .fileReencryptionFailures(reProto.getNumFailures());
+      if (reProto.hasCompletionTime()) {
+        builder.completionTime(reProto.getCompletionTime());
+        builder.state(State.Completed);
+        zonesReencrypted++;
+      } else {
+        builder.state(State.Submitted);
+      }
+      if (reProto.hasLastFile()) {
+        builder.lastCheckpointFile(reProto.getLastFile());
+      }
+      return zoneStatuses.put(zoneId, builder.build()) == null;
+    }
+    return false;
+  }
+
+  public void updateZoneStatus(final Long zoneId, final String zonePath,
+      final ReencryptionInfoProto reProto) {
+    Preconditions.checkArgument(zoneId != null, "zoneId can't be null");
+    if (addZoneIfNecessary(zoneId, zonePath, reProto)) {
+      return;
+    }
+    final ZoneReencryptionStatus zs = getZoneStatus(zoneId);
+    assert zs != null;
+    if (reProto.hasCompletionTime()) {
+      zs.markZoneCompleted(reProto);
+    } else if (!reProto.hasLastFile() && !reProto.hasCompletionTime()) {
+      zs.markZoneSubmitted(reProto);
+    } else {
+      zs.updateZoneProcess(reProto);
+    }
+  }
+
+  public boolean removeZone(final Long zoneId) {
+    LOG.debug("Removing re-encryption status of zone {} ", zoneId);
+    return zoneStatuses.remove(zoneId) != null;
+  }
+
+  @VisibleForTesting
+  public int zonesQueued() {
+    int ret = 0;
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      if (entry.getValue().getState() == State.Submitted) {
+        ret++;
+      }
+    }
+    return ret;
+  }
+
+  @VisibleForTesting
+  public int zonesTotal() {
+    return zoneStatuses.size();
+  }
+
+  @VisibleForTesting
+  public long getNumZonesReencrypted() {
+    return zonesReencrypted;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<Long, ZoneReencryptionStatus> entry : zoneStatuses
+        .entrySet()) {
+      sb.append("[zone:" + entry.getKey());
+      sb.append(" state:" + entry.getValue().getState());
+      sb.append(" lastProcessed:" + entry.getValue().getLastCheckpointFile());
+      sb.append(" filesReencrypted:" + entry.getValue().getFilesReencrypted());
+      sb.append(" fileReencryptionFailures:" + entry.getValue()
+          .getNumReencryptionFailures() + "]");
+    }
+    return sb.toString();
+  }
+
+  public NavigableMap<Long, ZoneReencryptionStatus> getZoneStatuses() {
+    return zoneStatuses;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java
new file mode 100644
index 0000000..c8a8857
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ReencryptionStatusIterator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.htrace.core.TraceScope;
+import org.apache.htrace.core.Tracer;
+
+import java.io.IOException;
+
+/**
+ * ReencryptionStatusIterator is a remote iterator that iterates over the
+ * reencryption status of encryption zones.
+ * It supports retrying in case of namenode failover.
+ */
+@InterfaceAudience.Private
+public class ReencryptionStatusIterator
+    extends BatchedRemoteIterator<Long, ZoneReencryptionStatus> {
+
+  private final ClientProtocol namenode;
+  private final Tracer tracer;
+
+  public ReencryptionStatusIterator(ClientProtocol namenode, Tracer tracer) {
+    super((long) 0);
+    this.namenode = namenode;
+    this.tracer = tracer;
+  }
+
+  @Override
+  public BatchedEntries<ZoneReencryptionStatus> makeRequest(Long prevId)
+      throws IOException {
+    try (TraceScope ignored = tracer.newScope("listReencryptionStatus")) {
+      return namenode.listReencryptionStatus(prevId);
+    }
+  }
+
+  @Override
+  public Long elementToPrevKey(ZoneReencryptionStatus entry) {
+    return entry.getId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java
new file mode 100644
index 0000000..9022b9f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ZoneReencryptionStatus.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
+
+/**
+ * A class representing information about re-encryption of an encryption zone.
+ * <p>
+ * FSDirectory lock is used for synchronization (except test-only methods, which
+ * are not protected).
+ */
+public class ZoneReencryptionStatus {
+  /**
+   * State of re-encryption.
+   */
+  public enum State {
+    /**
+     * Submitted for re-encryption but hasn't been picked up.
+     * This is the initial state.
+     */
+    Submitted,
+    /**
+     * Currently re-encrypting.
+     */
+    Processing,
+    /**
+     * Re-encryption completed.
+     */
+    Completed
+  }
+
+  private long id;
+  private String zoneName;
+  /**
+   * The re-encryption status of the zone. Note this is an in-memory-only
+   * variable. On failover it is restored as Submitted, or as Completed if
+   * completionTime != 0.
+   */
+  private State state;
+  private String ezKeyVersionName;
+  private long submissionTime;
+  private long completionTime;
+  private boolean canceled;
+  /**
+   * Name of the last file processed. It's important to record the name (not
+   * the inode) so we can resume from that position even if the inode is removed.
+   */
+  private String lastCheckpointFile;
+  private long filesReencrypted;
+  private long numReencryptionFailures;
+
+  /**
+   * Builder of {@link ZoneReencryptionStatus}.
+   */
+  public static final class Builder {
+    private long id;
+    private String zoneName;
+    private State state;
+    private String ezKeyVersionName;
+    private long submissionTime;
+    private long completionTime;
+    private boolean canceled;
+    private String lastCheckpointFile;
+    private long filesReencrypted;
+    private long fileReencryptionFailures;
+
+    public Builder() {
+    }
+
+    public Builder id(final long inodeid) {
+      id = inodeid;
+      return this;
+    }
+
+    public Builder zoneName(final String ezName) {
+      zoneName = ezName;
+      return this;
+    }
+
+    public Builder state(final State st) {
+      state = st;
+      return this;
+    }
+
+    public Builder ezKeyVersionName(final String ezkvn) {
+      ezKeyVersionName = ezkvn;
+      return this;
+    }
+
+    public Builder submissionTime(final long submission) {
+      submissionTime = submission;
+      return this;
+    }
+
+    public Builder completionTime(final long completion) {
+      completionTime = completion;
+      return this;
+    }
+
+    public Builder canceled(final boolean isCanceled) {
+      canceled = isCanceled;
+      return this;
+    }
+
+    public Builder lastCheckpointFile(final String lastFile) {
+      lastCheckpointFile = lastFile;
+      return this;
+    }
+
+    public Builder filesReencrypted(final long numReencrypted) {
+      filesReencrypted = numReencrypted;
+      return this;
+    }
+
+    public Builder fileReencryptionFailures(final long numFailures) {
+      fileReencryptionFailures = numFailures;
+      return this;
+    }
+
+    public ZoneReencryptionStatus build() {
+      ZoneReencryptionStatus ret = new ZoneReencryptionStatus();
+      Preconditions.checkArgument(id != 0, "no inode id set.");
+      Preconditions.checkNotNull(state, "no state set.");
+      Preconditions.checkNotNull(ezKeyVersionName, "no keyVersionName set.");
+      Preconditions
+          .checkArgument(submissionTime != 0, "no submission time set.");
+      ret.id = this.id;
+      ret.zoneName = this.zoneName;
+      ret.state = this.state;
+      ret.ezKeyVersionName = this.ezKeyVersionName;
+      ret.submissionTime = this.submissionTime;
+      ret.completionTime = this.completionTime;
+      ret.canceled = this.canceled;
+      ret.lastCheckpointFile = this.lastCheckpointFile;
+      ret.filesReencrypted = this.filesReencrypted;
+      ret.numReencryptionFailures = this.fileReencryptionFailures;
+      return ret;
+    }
+  }
+
+  public ZoneReencryptionStatus() {
+    reset();
+  }
+
+  void resetMetrics() {
+    filesReencrypted = 0;
+    numReencryptionFailures = 0;
+  }
+
+  public long getId() {
+    return id;
+  }
+
+  public String getZoneName() {
+    return zoneName;
+  }
+
+  void setState(final State s) {
+    state = s;
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public String getEzKeyVersionName() {
+    return ezKeyVersionName;
+  }
+
+  public long getSubmissionTime() {
+    return submissionTime;
+  }
+
+  public long getCompletionTime() {
+    return completionTime;
+  }
+
+  public boolean isCanceled() {
+    return canceled;
+  }
+
+  public String getLastCheckpointFile() {
+    return lastCheckpointFile;
+  }
+
+  public long getFilesReencrypted() {
+    return filesReencrypted;
+  }
+
+  public long getNumReencryptionFailures() {
+    return numReencryptionFailures;
+  }
+
+  public void reset() {
+    state = State.Submitted;
+    ezKeyVersionName = null;
+    submissionTime = 0;
+    completionTime = 0;
+    canceled = false;
+    lastCheckpointFile = null;
+    resetMetrics();
+  }
+
+  /**
+   * Set the zone name. The zone name is resolved from inode id and set during
+   * a listReencryptionStatus call, for the crypto admin to consume.
+   */
+  public void setZoneName(final String name) {
+    Preconditions.checkNotNull(name);
+    zoneName = name;
+  }
+
+  public void cancel() {
+    canceled = true;
+  }
+
+  void markZoneCompleted(final ReencryptionInfoProto proto) {
+    state = ZoneReencryptionStatus.State.Completed;
+    completionTime = proto.getCompletionTime();
+    lastCheckpointFile = null;
+    canceled = proto.getCanceled();
+    filesReencrypted = proto.getNumReencrypted();
+    numReencryptionFailures = proto.getNumFailures();
+  }
+
+  void markZoneSubmitted(final ReencryptionInfoProto proto) {
+    reset();
+    state = ZoneReencryptionStatus.State.Submitted;
+    ezKeyVersionName = proto.getEzKeyVersionName();
+    submissionTime = proto.getSubmissionTime();
+    filesReencrypted = proto.getNumReencrypted();
+    numReencryptionFailures = proto.getNumFailures();
+  }
+
+  void updateZoneProcess(final ReencryptionInfoProto proto) {
+    lastCheckpointFile = proto.getLastFile();
+    filesReencrypted = proto.getNumReencrypted();
+    numReencryptionFailures = proto.getNumFailures();
+  }
+}

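A quick, hypothetical illustration of the Builder contract above: id, state, ezKeyVersionName and submissionTime must be set, or build() fails its precondition checks. All concrete values below are made up.

  import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
  import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;

  class ZoneStatusBuilderDemo {
    static ZoneReencryptionStatus sample() {
      return new ZoneReencryptionStatus.Builder()
          .id(16386L)                         // inode id of the zone root (made up)
          .zoneName("/zones/zone1")           // resolved path, for the crypto admin
          .state(State.Submitted)             // initial state
          .ezKeyVersionName("key1@0")         // illustrative key version name
          .submissionTime(System.currentTimeMillis())
          .build();                           // throws if a mandatory field is missing
    }
  }
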
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index ac06c1a..ec7d93f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.BlocksStats;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
@@ -180,6 +182,10 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncrypt
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ZoneReencryptionStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@@ -1545,6 +1551,39 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
+  public void reencryptEncryptionZone(String zone, ReencryptAction action)
+      throws IOException {
+    final ReencryptEncryptionZoneRequestProto.Builder builder =
+        ReencryptEncryptionZoneRequestProto.newBuilder();
+    builder.setZone(zone).setAction(PBHelperClient.convert(action));
+    ReencryptEncryptionZoneRequestProto req = builder.build();
+    try {
+      rpcProxy.reencryptEncryptionZone(null, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(long id)
+      throws IOException {
+    final ListReencryptionStatusRequestProto req =
+        ListReencryptionStatusRequestProto.newBuilder().setId(id).build();
+    try {
+      ListReencryptionStatusResponseProto response =
+          rpcProxy.listReencryptionStatus(null, req);
+      List<ZoneReencryptionStatus> elements =
+          Lists.newArrayListWithCapacity(response.getStatusesCount());
+      for (ZoneReencryptionStatusProto p : response.getStatusesList()) {
+        elements.add(PBHelperClient.convert(p));
+      }
+      return new BatchedListEntries<>(elements, response.getHasMore());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     SetXAttrRequestProto req = SetXAttrRequestProto.newBuilder()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 5b1a687..30a3108 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryScopeProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTypeProto;
@@ -129,6 +131,9 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeMo
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptActionProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptionStateProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ZoneReencryptionStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AccessModeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.AddECPolicyResponseProto;
@@ -157,6 +162,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.QuotaUsageProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RollingUpgradeStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportProto;
@@ -165,6 +171,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectorySt
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ZoneEncryptionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
@@ -1678,6 +1685,17 @@ public class PBHelperClient {
     return builder.build();
   }
 
+  public static ReencryptActionProto convert(ReencryptAction a) {
+    switch (a) {
+    case CANCEL:
+      return ReencryptActionProto.CANCEL_REENCRYPT;
+    case START:
+      return ReencryptActionProto.START_REENCRYPT;
+    default:
+      throw new IllegalArgumentException("Unexpected value: " + a);
+    }
+  }
+
   public static RollingUpgradeActionProto convert(RollingUpgradeAction a) {
     switch (a) {
     case QUERY:
@@ -2282,6 +2300,17 @@ public class PBHelperClient {
     }
   }
 
+  public static ReencryptAction convert(ReencryptActionProto a) {
+    switch (a) {
+    case CANCEL_REENCRYPT:
+      return ReencryptAction.CANCEL;
+    case START_REENCRYPT:
+      return ReencryptAction.START;
+    default:
+      throw new IllegalArgumentException("Unexpected value: " + a);
+    }
+  }
+
   public static RollingUpgradeAction convert(RollingUpgradeActionProto a) {
     switch (a) {
     case QUERY:
@@ -2733,16 +2762,24 @@ public class PBHelperClient {
         .build();
   }
 
-  public static HdfsProtos.ZoneEncryptionInfoProto convert(
-      CipherSuite suite, CryptoProtocolVersion version, String keyName) {
+  public static ZoneEncryptionInfoProto convert(CipherSuite suite,
+      CryptoProtocolVersion version, String keyName) {
+    return convert(suite, version, keyName, null);
+  }
+
+  public static ZoneEncryptionInfoProto convert(CipherSuite suite,
+      CryptoProtocolVersion version, String keyName,
+      ReencryptionInfoProto proto) {
     if (suite == null || version == null || keyName == null) {
       return null;
     }
-    return HdfsProtos.ZoneEncryptionInfoProto.newBuilder()
-        .setSuite(convert(suite))
-        .setCryptoProtocolVersion(convert(version))
-        .setKeyName(keyName)
-        .build();
+    ZoneEncryptionInfoProto.Builder builder =
+        ZoneEncryptionInfoProto.newBuilder().setSuite(convert(suite))
+            .setCryptoProtocolVersion(convert(version)).setKeyName(keyName);
+    if (proto != null) {
+      builder.setReencryptionProto(proto);
+    }
+    return builder.build();
   }
 
   public static FileEncryptionInfo convert(
@@ -2759,6 +2796,91 @@ public class PBHelperClient {
         ezKeyVersionName);
   }
 
+  public static ReencryptionInfoProto convert(String ezkvn, Long submissionTime,
+      boolean isCanceled, long numReencrypted, long numFailures,
+      Long completionTime, String lastFile) {
+    if (ezkvn == null || submissionTime == null) {
+      return null;
+    }
+    ReencryptionInfoProto.Builder builder =
+        ReencryptionInfoProto.newBuilder().setEzKeyVersionName(ezkvn)
+            .setSubmissionTime(submissionTime).setCanceled(isCanceled)
+            .setNumReencrypted(numReencrypted).setNumFailures(numFailures);
+    if (completionTime != null) {
+      builder.setCompletionTime(completionTime);
+    }
+    if (lastFile != null) {
+      builder.setLastFile(lastFile);
+    }
+    return builder.build();
+  }
+
+  public static ZoneReencryptionStatusProto convert(ZoneReencryptionStatus zs) {
+    ZoneReencryptionStatusProto.Builder builder =
+        ZoneReencryptionStatusProto.newBuilder()
+        .setId(zs.getId())
+        .setPath(zs.getZoneName())
+        .setEzKeyVersionName(zs.getEzKeyVersionName())
+        .setSubmissionTime(zs.getSubmissionTime())
+        .setCanceled(zs.isCanceled())
+        .setNumReencrypted(zs.getFilesReencrypted())
+        .setNumFailures(zs.getNumReencryptionFailures());
+    switch (zs.getState()) {
+    case Submitted:
+      builder.setState(ReencryptionStateProto.SUBMITTED);
+      break;
+    case Processing:
+      builder.setState(ReencryptionStateProto.PROCESSING);
+      break;
+    case Completed:
+      builder.setState(ReencryptionStateProto.COMPLETED);
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown state " + zs.getState());
+    }
+    final long completion = zs.getCompletionTime();
+    if (completion != 0) {
+      builder.setCompletionTime(completion);
+    }
+    final String file = zs.getLastCheckpointFile();
+    if (file != null) {
+      builder.setLastFile(file);
+    }
+    return builder.build();
+  }
+
+  public static ZoneReencryptionStatus convert(
+      ZoneReencryptionStatusProto proto) {
+    ZoneReencryptionStatus.State state;
+    switch (proto.getState()) {
+    case SUBMITTED:
+      state = ZoneReencryptionStatus.State.Submitted;
+      break;
+    case PROCESSING:
+      state = ZoneReencryptionStatus.State.Processing;
+      break;
+    case COMPLETED:
+      state = ZoneReencryptionStatus.State.Completed;
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown state " + proto.getState());
+    }
+    ZoneReencryptionStatus.Builder builder = new ZoneReencryptionStatus.
+        Builder().
+        id(proto.getId()).zoneName(proto.getPath()).state(state)
+        .ezKeyVersionName(proto.getEzKeyVersionName())
+        .submissionTime(proto.getSubmissionTime()).canceled(proto.getCanceled())
+        .filesReencrypted(proto.getNumReencrypted())
+        .fileReencryptionFailures(proto.getNumFailures());
+    if (proto.hasCompletionTime()) {
+      builder.completionTime(proto.getCompletionTime());
+    }
+    if (proto.hasLastFile()) {
+      builder.lastCheckpointFile(proto.getLastFile());
+    }
+    return builder.build();
+  }
+
   public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
     List<DatanodeInfoProto> proto = datanodeInfosProto.getDatanodesList();
     DatanodeInfo[] infos = new DatanodeInfo[proto.size()];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index 4f44c5e..3f108fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -941,6 +941,10 @@ service ClientNamenodeProtocol {
       returns(CreateEncryptionZoneResponseProto);
   rpc listEncryptionZones(ListEncryptionZonesRequestProto)
       returns(ListEncryptionZonesResponseProto);
+  rpc reencryptEncryptionZone(ReencryptEncryptionZoneRequestProto)
+      returns(ReencryptEncryptionZoneResponseProto);
+  rpc listReencryptionStatus(ListReencryptionStatusRequestProto)
+      returns(ListReencryptionStatusResponseProto);
   rpc getEZForPath(GetEZForPathRequestProto)
       returns(GetEZForPathResponseProto);
   rpc setErasureCodingPolicy(SetErasureCodingPolicyRequestProto)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
index 68b2f3a..75d3a0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/encryption.proto
@@ -58,6 +58,47 @@ message ListEncryptionZonesResponseProto {
   required bool hasMore = 2;
 }
 
+enum ReencryptActionProto {
+  CANCEL_REENCRYPT = 1;
+  START_REENCRYPT = 2;
+}
+
+message ReencryptEncryptionZoneRequestProto {
+  required ReencryptActionProto action = 1;
+  required string zone = 2;
+}
+
+message ReencryptEncryptionZoneResponseProto {
+}
+
+message ListReencryptionStatusRequestProto {
+  required int64 id = 1;
+}
+
+enum ReencryptionStateProto {
+  SUBMITTED = 1;
+  PROCESSING = 2;
+  COMPLETED = 3;
+}
+
+message ZoneReencryptionStatusProto {
+  required int64 id = 1;
+  required string path = 2;
+  required ReencryptionStateProto state = 3;
+  required string ezKeyVersionName = 4;
+  required int64 submissionTime = 5;
+  required bool canceled = 6;
+  required int64 numReencrypted = 7;
+  required int64 numFailures = 8;
+  optional int64 completionTime = 9;
+  optional string lastFile = 10;
+}
+
+message ListReencryptionStatusResponseProto {
+  repeated ZoneReencryptionStatusProto statuses = 1;
+  required bool hasMore = 2;
+}
+
 message GetEZForPathRequestProto {
     required string src = 1;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 7109980..59381bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -313,6 +313,20 @@ message ZoneEncryptionInfoProto {
   required CipherSuiteProto suite = 1;
   required CryptoProtocolVersionProto cryptoProtocolVersion = 2;
   required string keyName = 3;
+  optional ReencryptionInfoProto reencryptionProto = 4;
+}
+
+/**
+ * Re-encryption information for an encryption zone
+ */
+message ReencryptionInfoProto {
+  required string ezKeyVersionName = 1;
+  required uint64 submissionTime = 2;
+  required bool canceled = 3;
+  required int64 numReencrypted = 4;
+  required int64 numFailures = 5;
+  optional uint64 completionTime = 6;
+  optional string lastFile = 7;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f4c383e..7f60000 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -881,6 +881,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
+  public static final int    DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_DEFAULT = 100;
+  public static final String DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY = "dfs.namenode.list.reencryption.status.num.responses";
   public static final String DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES =
       "dfs.namenode.list.openfiles.num.responses";
   public static final int    DFS_NAMENODE_LIST_OPENFILES_NUM_RESPONSES_DEFAULT =
@@ -889,6 +891,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;
   public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms";
   public static final int DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 3000;
+  public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY = "dfs.namenode.reencrypt.sleep.interval";
+  public static final String DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_DEFAULT = "1m";
+  public static final String DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY = "dfs.namenode.reencrypt.batch.size";
+  public static final int DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT = 1000;
+  public static final String DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY = "dfs.namenode.reencrypt.throttle.limit.handler.ratio";
+  public static final double DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT = 1.0;
+  public static final String DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_KEY = "dfs.namenode.reencrypt.throttle.limit.updater.ratio";
+  public static final double DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_UPDATER_RATIO_DEFAULT = 1.0;
+  public static final String DFS_NAMENODE_REENCRYPT_EDEK_THREADS_KEY = "dfs.namenode.reencrypt.edek.threads";
+  public static final int DFS_NAMENODE_REENCRYPT_EDEK_THREADS_DEFAULT = 10;
 
   // Journal-node related configs. These are read on the JN side.
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

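A rough, illustrative sketch of how the new keys might be read off a Configuration (the interval as a time duration, the batch size as an int, a throttle ratio as a double); not taken from the patch itself.

  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  class ReencryptConfDemo {
    static void dump(Configuration conf) {
      long intervalMs = conf.getTimeDuration(
          DFSConfigKeys.DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY,
          TimeUnit.MINUTES.toMillis(1),       // matches the "1m" default above
          TimeUnit.MILLISECONDS);
      int batchSize = conf.getInt(
          DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_KEY,
          DFSConfigKeys.DFS_NAMENODE_REENCRYPT_BATCH_SIZE_DEFAULT);
      double handlerRatio = conf.getDouble(
          DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_KEY,
          DFSConfigKeys.DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_HANDLER_RATIO_DEFAULT);
      System.out.println("interval=" + intervalMs + "ms, batch=" + batchSize
          + ", handlerRatio=" + handlerRatio);
    }
  }
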
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index a446276..44d5216 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.ModifyAclEntriesRequestProto;
@@ -221,8 +222,12 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListReencryptionStatusResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.AddErasureCodingPoliciesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ReencryptEncryptionZoneResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPolicyRequestProto;
@@ -1483,6 +1488,37 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   }
 
   @Override
+  public ReencryptEncryptionZoneResponseProto reencryptEncryptionZone(
+      RpcController controller, ReencryptEncryptionZoneRequestProto req)
+      throws ServiceException {
+    try {
+      server.reencryptEncryptionZone(req.getZone(),
+          PBHelperClient.convert(req.getAction()));
+      return ReencryptEncryptionZoneResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public ListReencryptionStatusResponseProto listReencryptionStatus(
+      RpcController controller, ListReencryptionStatusRequestProto req)
+      throws ServiceException {
+    try {
+      BatchedEntries<ZoneReencryptionStatus> entries = server
+          .listReencryptionStatus(req.getId());
+      ListReencryptionStatusResponseProto.Builder builder =
+          ListReencryptionStatusResponseProto.newBuilder();
+      builder.setHasMore(entries.hasMore());
+      for (int i = 0; i < entries.size(); i++) {
+        builder.addStatuses(PBHelperClient.convert(entries.get(i)));
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public SetErasureCodingPolicyResponseProto setErasureCodingPolicy(
       RpcController controller, SetErasureCodingPolicyRequestProto req)
       throws ServiceException {

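The two translator methods above are the server-side endpoints for the client additions listed in the file summary (DFSClient, DistributedFileSystem, HdfsAdmin). A rough sketch of how an admin application might drive them, assuming the HdfsAdmin additions expose reencryptEncryptionZone(Path, ReencryptAction) and listReencryptionStatus() mirroring the ClientProtocol changes; treat the exact signatures as assumptions rather than documentation from this diff:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;

public class ReencryptZoneExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://nn:8020"), conf);

    // Ask the NameNode to re-encrypt all EDEKs under the zone root.
    admin.reencryptEncryptionZone(new Path("/zones/secure"),
        ReencryptAction.START);

    // Walk the batched status listing and print each zone's state.
    RemoteIterator<ZoneReencryptionStatus> it = admin.listReencryptionStatus();
    while (it.hasNext()) {
      ZoneReencryptionStatus zs = it.next();
      System.out.println(zs.getZoneName() + " -> " + zs.getState());
    }
  }
}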
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
index 104d8c3..e4a035e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionFaultInjector.java
@@ -42,4 +42,13 @@ public class EncryptionFaultInjector {
 
   @VisibleForTesting
   public void startFileAfterGenerateKey() throws IOException {}
+
+  @VisibleForTesting
+  public void reencryptEncryptedKeys() throws IOException {}
+
+  @VisibleForTesting
+  public void reencryptUpdaterProcessOneTask() throws IOException {}
+
+  @VisibleForTesting
+  public void reencryptUpdaterProcessCheckpoint() throws IOException {}
 }

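The three hooks added above follow the existing EncryptionFaultInjector pattern: the NameNode calls the no-op defaults, and tests substitute a subclass to fail or pause re-encryption at precise points. A hedged sketch of such a test-only subclass (the installation mechanism in the trailing comment is an assumption about how tests wire in the injector):

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.server.namenode.EncryptionFaultInjector;

// Test-only injector: fails the first EDEK re-encryption call and lets a
// test block until the updater has processed at least one task.
class ReencryptFaultInjector extends EncryptionFaultInjector {
  private final CountDownLatch oneTaskProcessed = new CountDownLatch(1);
  private volatile boolean failedOnce = false;

  @Override
  public void reencryptEncryptedKeys() throws IOException {
    if (!failedOnce) {
      failedOnce = true;
      throw new IOException("injected KMS failure");
    }
  }

  @Override
  public void reencryptUpdaterProcessOneTask() throws IOException {
    oneTaskProcessed.countDown();
  }

  void awaitOneTask() throws InterruptedException {
    oneTaskProcessed.await();
  }
}

// In a test, this subclass would replace the default injector, e.g.
// EncryptionFaultInjector.instance = new ReencryptFaultInjector();
// (assumed installation mechanism, shown only as a comment).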
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1000a2af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
index 96e189b..d6302ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java
@@ -19,32 +19,45 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.security.AccessControlException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants
     .CRYPTO_XATTR_ENCRYPTION_ZONE;
 
@@ -57,7 +70,7 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants
  */
 public class EncryptionZoneManager {
 
-  public static Logger LOG = LoggerFactory.getLogger(EncryptionZoneManager
+  public static final Logger LOG = LoggerFactory.getLogger(EncryptionZoneManager
       .class);
 
   /**
@@ -99,6 +112,91 @@ public class EncryptionZoneManager {
   private TreeMap<Long, EncryptionZoneInt> encryptionZones = null;
   private final FSDirectory dir;
   private final int maxListEncryptionZonesResponses;
+  private final int maxListReencryptionStatusResponses;
+
+  private ThreadFactory reencryptionThreadFactory;
+  private ExecutorService reencryptHandlerExecutor;
+  private ReencryptionHandler reencryptionHandler;
+  // Reencryption status is kept here to decouple status listing (which should
+  // work as long as the NN is up) from the actual handler (which only exists
+  // if a key provider is configured).
+  private final ReencryptionStatus reencryptionStatus;
+
+  public static final BatchedListEntries<ZoneReencryptionStatus> EMPTY_LIST =
+      new BatchedListEntries<>(new ArrayList<ZoneReencryptionStatus>(), false);
+
+  @VisibleForTesting
+  public void pauseReencryptForTesting() {
+    reencryptionHandler.pauseForTesting();
+  }
+
+  @VisibleForTesting
+  public void resumeReencryptForTesting() {
+    reencryptionHandler.resumeForTesting();
+  }
+
+  @VisibleForTesting
+  public void pauseForTestingAfterNthSubmission(final int count) {
+    reencryptionHandler.pauseForTestingAfterNthSubmission(count);
+  }
+
+  @VisibleForTesting
+  public void pauseReencryptUpdaterForTesting() {
+    reencryptionHandler.pauseUpdaterForTesting();
+  }
+
+  @VisibleForTesting
+  public void resumeReencryptUpdaterForTesting() {
+    reencryptionHandler.resumeUpdaterForTesting();
+  }
+
+  @VisibleForTesting
+  public void pauseForTestingAfterNthCheckpoint(final String zone,
+      final int count) throws IOException {
+    INodesInPath iip;
+    dir.readLock();
+    try {
+      iip = dir.resolvePath(dir.getPermissionChecker(), zone, DirOp.READ);
+    } finally {
+      dir.readUnlock();
+    }
+    reencryptionHandler
+        .pauseForTestingAfterNthCheckpoint(iip.getLastINode().getId(), count);
+  }
+
+  @VisibleForTesting
+  public void resetMetricsForTesting() {
+    reencryptionStatus.resetMetrics();
+  }
+
+  @VisibleForTesting
+  public ReencryptionStatus getReencryptionStatus() {
+    return reencryptionStatus;
+  }
+
+  @VisibleForTesting
+  public ZoneReencryptionStatus getZoneStatus(final String zone)
+      throws IOException {
+    final FSPermissionChecker pc = dir.getPermissionChecker();
+    final INode inode;
+    dir.getFSNamesystem().readLock();
+    dir.readLock();
+    try {
+      final INodesInPath iip = dir.resolvePath(pc, zone, DirOp.READ);
+      inode = iip.getLastINode();
+      if (inode == null) {
+        return null;
+      }
+      return getReencryptionStatus().getZoneStatus(inode.getId());
+    } finally {
+      dir.readUnlock();
+      dir.getFSNamesystem().readUnlock();
+    }
+  }
+
+  FSDirectory getFSDirectory() {
+    return dir;
+  }
 
   /**
    * Construct a new EncryptionZoneManager.
@@ -115,6 +213,50 @@ public class EncryptionZoneManager {
         DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES + " " +
             "must be a positive integer."
     );
+    if (getProvider() != null) {
+      reencryptionHandler = new ReencryptionHandler(this, conf);
+      reencryptionThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("reencryptionHandlerThread #%d").build();
+    }
+    maxListReencryptionStatusResponses =
+        conf.getInt(DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY,
+            DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_DEFAULT);
+    Preconditions.checkArgument(maxListReencryptionStatusResponses >= 0,
+        DFS_NAMENODE_LIST_REENCRYPTION_STATUS_NUM_RESPONSES_KEY +
+            " must be a non-negative integer."
+    );
+    reencryptionStatus = new ReencryptionStatus();
+  }
+
+  KeyProviderCryptoExtension getProvider() {
+    return dir.getProvider();
+  }
+
+  void startReencryptThreads() {
+    if (getProvider() == null) {
+      return;
+    }
+    Preconditions.checkNotNull(reencryptionHandler);
+    reencryptHandlerExecutor =
+        Executors.newSingleThreadExecutor(reencryptionThreadFactory);
+    reencryptHandlerExecutor.execute(reencryptionHandler);
+    reencryptionHandler.startUpdaterThread();
+  }
+
+  void stopReencryptThread() {
+    if (getProvider() == null || reencryptionHandler == null) {
+      return;
+    }
+    dir.writeLock();
+    try {
+      reencryptionHandler.stopThreads();
+    } finally {
+      dir.writeUnlock();
+    }
+    if (reencryptHandlerExecutor != null) {
+      reencryptHandlerExecutor.shutdownNow();
+      reencryptHandlerExecutor = null;
+    }
   }
 
   /**
@@ -157,7 +299,13 @@ public class EncryptionZoneManager {
   void removeEncryptionZone(Long inodeId) {
     assert dir.hasWriteLock();
     if (hasCreatedEncryptionZone()) {
-      encryptionZones.remove(inodeId);
+      if (encryptionZones.remove(inodeId) == null
+          || !getReencryptionStatus().hasRunningZone(inodeId)) {
+        return;
+      }
+      if (reencryptionHandler != null) {
+        reencryptionHandler.removeZone(inodeId);
+      }
     }
   }
 
@@ -173,13 +321,17 @@ public class EncryptionZoneManager {
   }
 
   /**
-   * Returns the path of the EncryptionZoneInt.
+   * Returns the full path from an INode id.
    * <p/>
    * Called while holding the FSDirectory lock.
    */
-  private String getFullPathName(EncryptionZoneInt ezi) {
+  String getFullPathName(Long nodeId) {
     assert dir.hasReadLock();
-    return dir.getInode(ezi.getINodeId()).getFullPathName();
+    INode inode = dir.getInode(nodeId);
+    if (inode == null) {
+      return null;
+    }
+    return inode.getFullPathName();
   }
 
   /**
@@ -247,7 +399,8 @@ public class EncryptionZoneManager {
     if (ezi == null) {
       return null;
     } else {
-      return new EncryptionZone(ezi.getINodeId(), getFullPathName(ezi),
+      return new EncryptionZone(ezi.getINodeId(),
+          getFullPathName(ezi.getINodeId()),
           ezi.getSuite(), ezi.getVersion(), ezi.getKeyName());
     }
   }
@@ -284,8 +437,8 @@ public class EncryptionZoneManager {
 
     if (srcInEZ) {
       if (srcParentEZI != dstParentEZI) {
-        final String srcEZPath = getFullPathName(srcParentEZI);
-        final String dstEZPath = getFullPathName(dstParentEZI);
+        final String srcEZPath = getFullPathName(srcParentEZI.getINodeId());
+        final String dstEZPath = getFullPathName(dstParentEZI.getINodeId());
         final StringBuilder sb = new StringBuilder(srcIIP.getPath());
         sb.append(" can't be moved from encryption zone ");
         sb.append(srcEZPath);
@@ -294,6 +447,24 @@ public class EncryptionZoneManager {
         sb.append(".");
         throw new IOException(sb.toString());
       }
+      checkMoveValidityForReencryption(srcIIP.getPath(),
+          srcParentEZI.getINodeId());
+    } else if (dstInEZ) {
+      checkMoveValidityForReencryption(dstIIP.getPath(),
+          dstParentEZI.getINodeId());
+    }
+  }
+
+  private void checkMoveValidityForReencryption(final String pathName,
+      final long zoneId) throws IOException {
+    assert dir.hasReadLock();
+    final ZoneReencryptionStatus zs = reencryptionStatus.getZoneStatus(zoneId);
+    if (zs != null && zs.getState() != ZoneReencryptionStatus.State.Completed) {
+      final StringBuilder sb = new StringBuilder(pathName);
+      sb.append(" can't be moved because encryption zone ");
+      sb.append(getFullPathName(zoneId));
+      sb.append(" is currently under re-encryption");
+      throw new IOException(sb.toString());
     }
   }
 
@@ -364,19 +535,13 @@ public class EncryptionZoneManager {
       /*
        Skip EZs that are only present in snapshots. Re-resolve the path to 
        see if the path's current inode ID matches EZ map's INode ID.
-       
+
        INode#getFullPathName simply calls getParent recursively, so will return
-       the INode's parents at the time it was snapshotted. It will not 
+       the INode's parents at the time it was snapshotted. It will not
        contain a reference INode.
       */
-      final String pathName = getFullPathName(ezi);
-      INode inode = dir.getInode(ezi.getINodeId());
-      INode lastINode = null;
-      if (inode.getParent() != null || inode.isRoot()) {
-        INodesInPath iip = dir.getINodesInPath(pathName, DirOp.READ_LINK);
-        lastINode = iip.getLastINode();
-      }
-      if (lastINode == null || lastINode.getId() != ezi.getINodeId()) {
+      final String pathName = getFullPathName(ezi.getINodeId());
+      if (!pathResolvesToId(ezi.getINodeId(), pathName)) {
         continue;
       }
       // Add the EZ to the result list
@@ -392,6 +557,156 @@ public class EncryptionZoneManager {
   }
 
   /**
+   * Resolves the path to an inode id, then checks whether it matches the
+   * inode id passed in. This is necessary to filter out zones that only
+   * exist in snapshots.
+   * @param zoneId the inode id of the zone root
+   * @param zonePath the full path name of the zone root
+   * @return true if the path resolves to the given id, false otherwise.
+   * @throws UnresolvedLinkException
+   */
+  private boolean pathResolvesToId(final long zoneId, final String zonePath)
+      throws UnresolvedLinkException, AccessControlException,
+      ParentNotDirectoryException {
+    assert dir.hasReadLock();
+    INode inode = dir.getInode(zoneId);
+    if (inode == null) {
+      return false;
+    }
+    INode lastINode = null;
+    if (inode.getParent() != null || inode.isRoot()) {
+      INodesInPath iip = dir.getINodesInPath(zonePath, DirOp.READ_LINK);
+      lastINode = iip.getLastINode();
+    }
+    if (lastINode == null || lastINode.getId() != zoneId) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Re-encrypts the given encryption zone path. If the given path is not the
+   * root of an encryption zone, an exception is thrown.
+   */
+  XAttr reencryptEncryptionZone(final INodesInPath zoneIIP,
+      final String keyVersionName) throws IOException {
+    assert dir.hasWriteLock();
+    if (reencryptionHandler == null) {
+      throw new IOException("No key provider configured, re-encryption "
+          + "operation is rejected");
+    }
+    final INode inode = zoneIIP.getLastINode();
+    final String zoneName = zoneIIP.getPath();
+    checkEncryptionZoneRoot(inode, zoneName);
+    if (getReencryptionStatus().hasRunningZone(inode.getId())) {
+      throw new IOException("Zone " + zoneName
+          + " is already submitted for re-encryption.");
+    }
+    LOG.info("Zone {}({}) is submitted for re-encryption.", zoneName,
+        inode.getId());
+    XAttr ret = FSDirEncryptionZoneOp
+        .updateReencryptionSubmitted(dir, zoneIIP, keyVersionName);
+    reencryptionHandler.notifyNewSubmission();
+    return ret;
+  }
+
+  /**
+   * Cancels the currently-running re-encryption of the given encryption zone.
+   * If the given path is not the root of an encryption zone,
+   * * an exception is thrown.
+   */
+  List<XAttr> cancelReencryptEncryptionZone(final INodesInPath zoneIIP)
+      throws IOException {
+    assert dir.hasWriteLock();
+    if (reencryptionHandler == null) {
+      throw new IOException("No key provider configured, re-encryption "
+          + "operation is rejected");
+    }
+    final long zoneId = zoneIIP.getLastINode().getId();
+    final String zoneName = zoneIIP.getPath();
+    checkEncryptionZoneRoot(zoneIIP.getLastINode(), zoneName);
+    reencryptionHandler.cancelZone(zoneId, zoneName);
+    LOG.info("Cancelled zone {}({}) for re-encryption.", zoneName, zoneId);
+    return FSDirEncryptionZoneOp.updateReencryptionFinish(dir, zoneIIP,
+        reencryptionStatus.getZoneStatus(zoneId));
+  }
+
+  /**
+   * Cursor-based listing of zone re-encryption status.
+   * <p/>
+   * Called while holding the FSDirectory lock.
+   */
+  BatchedListEntries<ZoneReencryptionStatus> listReencryptionStatus(
+      final long prevId) throws IOException {
+    assert dir.hasReadLock();
+    if (!hasCreatedEncryptionZone()) {
+      return ReencryptionStatus.EMPTY_LIST;
+    }
+
+    NavigableMap<Long, ZoneReencryptionStatus> stats =
+        reencryptionStatus.getZoneStatuses();
+
+    if (stats.isEmpty()) {
+      return EMPTY_LIST;
+    }
+
+    NavigableMap<Long, ZoneReencryptionStatus> tailMap =
+        stats.tailMap(prevId, false);
+    final int numResp =
+        Math.min(maxListReencryptionStatusResponses, tailMap.size());
+    final List<ZoneReencryptionStatus> ret =
+        Lists.newArrayListWithExpectedSize(numResp);
+    int count = 0;
+    for (ZoneReencryptionStatus zs : tailMap.values()) {
+      final String name = getFullPathName(zs.getId());
+      if (name == null || !pathResolvesToId(zs.getId(), name)) {
+        continue;
+      }
+      zs.setZoneName(name);
+      ret.add(zs);
+      ++count;
+      if (count >= numResp) {
+        break;
+      }
+    }
+    final boolean hasMore = (numResp < tailMap.size());
+    return new BatchedListEntries<>(ret, hasMore);
+  }
+
+  /**
+   * Return whether an INode is an encryption zone root.
+   */
+  boolean isEncryptionZoneRoot(final INode inode, final String name)
+      throws FileNotFoundException {
+    assert dir.hasReadLock();
+    if (inode == null) {
+      throw new FileNotFoundException("INode does not exist for " + name);
+    }
+    if (!inode.isDirectory()) {
+      return false;
+    }
+    if (!hasCreatedEncryptionZone()
+        || !encryptionZones.containsKey(inode.getId())) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check that an INode is the root of an encryption zone.
+   *
+   * @param inode the zone inode
+   * @throws IOException if the inode is not a directory,
+   *                     or is a directory but not the root of an EZ.
+   */
+  void checkEncryptionZoneRoot(final INode inode, final String name)
+      throws IOException {
+    if (!isEncryptionZoneRoot(inode, name)) {
+      throw new IOException("Path " + name + " is not the root of an"
+          + " encryption zone.");
+    }
+  }
+
+  /**
    * @return number of encryption zones.
    */
   public int getNumEncryptionZones() {


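To summarize the paging contract implemented by EncryptionZoneManager#listReencryptionStatus above: each call returns at most maxListReencryptionStatusResponses zones whose id is strictly greater than the prevId cursor, plus a hasMore flag. The ReencryptionStatusIterator added by this commit wraps that into a RemoteIterator; the underlying loop is roughly the following sketch (the wrapper class, the initial cursor value of 0, and the getZoneName() accessor are assumptions; the listReencryptionStatus call and the other accessors mirror the server-side code above):

import java.io.IOException;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;

// Drains the cursor-based listing: each round trip passes the id of the last
// zone seen, and the loop stops once the server reports no more entries.
final class ReencryptionStatusDump {
  static void printAll(ClientProtocol namenode) throws IOException {
    long prevId = 0; // assumed starting cursor: list from the beginning
    boolean hasMore = true;
    while (hasMore) {
      BatchedEntries<ZoneReencryptionStatus> batch =
          namenode.listReencryptionStatus(prevId);
      for (int i = 0; i < batch.size(); i++) {
        ZoneReencryptionStatus zs = batch.get(i);
        System.out.println(zs.getZoneName() + " -> " + zs.getState());
        prevId = zs.getId();
      }
      // Guard against an empty batch so the cursor cannot stall.
      hasMore = batch.hasMore() && batch.size() > 0;
    }
  }
}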
