hadoop-common-commits mailing list archives

From z..@apache.org
Subject [07/58] [abbrv] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
Date Wed, 30 Sep 2015 15:41:07 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
new file mode 100644
index 0000000..745ca7e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The Hdfs implementation of {@link FSDataOutputStream}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HdfsDataOutputStream extends FSDataOutputStream {
+  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats,
+      long startPosition) throws IOException {
+    super(out, stats, startPosition);
+  }
+
+  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats
+      ) throws IOException {
+    this(out, stats, 0L);
+  }
+
+  public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats,
+      long startPosition) throws IOException {
+    super(out, stats, startPosition);
+    Preconditions.checkArgument(out.getWrappedStream() instanceof DFSOutputStream,
+        "CryptoOutputStream should wrap a DFSOutputStream");
+  }
+
+  public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats)
+      throws IOException {
+    this(out, stats, 0L);
+  }
+
+  /**
+   * Get the actual number of replicas of the current block.
+   * 
+   * This can be different from the designated replication factor of the file
+   * because the namenode does not maintain replication for the blocks which are
+   * currently being written to. Depending on the configuration, the client may
+   * continue to write to a block even if a few datanodes in the write pipeline
+ * have failed, or the client may add a new datanode once a datanode has
+   * failed.
+   * 
+   * @return the number of valid replicas of the current block
+   */
+  public synchronized int getCurrentBlockReplication() throws IOException {
+    OutputStream wrappedStream = getWrappedStream();
+    if (wrappedStream instanceof CryptoOutputStream) {
+      wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
+    }
+    return ((DFSOutputStream) wrappedStream).getCurrentBlockReplication();
+  }
+  
+  /**
+   * Sync buffered data to DataNodes (flush to disk devices).
+   * 
+   * @param syncFlags
+   *          Indicates the detailed semantics and actions of the hsync.
+   * @throws IOException
+   * @see FSDataOutputStream#hsync()
+   */
+  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+    OutputStream wrappedStream = getWrappedStream();
+    if (wrappedStream instanceof CryptoOutputStream) {
+      ((CryptoOutputStream) wrappedStream).flush();
+      wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
+    }
+    ((DFSOutputStream) wrappedStream).hsync(syncFlags);
+  }
+  
+  public static enum SyncFlag {
+
+    /**
+     * When doing sync to DataNodes, also update the metadata (block length) in
+     * the NameNode.
+     */
+    UPDATE_LENGTH,
+
+    /**
+     * Sync the data to the DataNodes, close the current block, and allocate
+     * a new block.
+     */
+    END_BLOCK;
+  }
+}
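
For context, a minimal usage sketch of the SyncFlag API above (not part of
this commit; the path and payload are illustrative, and a configured HDFS
cluster is assumed):

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

    public class HsyncDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // fs.defaultFS must point at HDFS
        try (FSDataOutputStream out =
            FileSystem.get(conf).create(new Path("/tmp/hsync-demo"))) {
          out.write("hello".getBytes("UTF-8"));
          if (out instanceof HdfsDataOutputStream) {
            // Flush the data to disk on the datanodes and also update the
            // block length recorded at the namenode.
            ((HdfsDataOutputStream) out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
          } else {
            out.hsync(); // non-HDFS file system: plain hsync without flags
          }
        }
      }
    }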

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
new file mode 100644
index 0000000..c3d2cfc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases
+ * on the namenode.
+ * When a file is opened for write (create or append),
+ * namenode stores a file lease for recording the identity of the writer.
+ * The writer (i.e. the DFSClient) is required to renew the lease periodically.
+ * If the lease is not renewed before it expires,
+ * the namenode considers the writer to have failed and may either let
+ * another writer obtain the lease or close the file.
+ * </p>
+ * <p>
+ * This class also provides the following functionality:
+ * <ul>
+ * <li>
+ * It maintains a map from (namenode, user) pairs to lease renewers.
+ * The same {@link LeaseRenewer} instance is used for renewing leases
+ * for all the {@link org.apache.hadoop.hdfs.DFSClient}s that connect
+ * to the same namenode as the same user.
+ * </li>
+ * <li>
+ * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}s.
+ * Periodically, the leases for all the clients are renewed.
+ * A client is removed from the list when it is closed.
+ * </li>
+ * <li>
+ * A thread per namenode per user is used by the {@link LeaseRenewer}
+ * to renew the leases.
+ * </li>
+ * </ul>
+ * </p>
+ */
+@InterfaceAudience.Private
+public class LeaseRenewer {
+  static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
+
+  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
+
+  /** Get a {@link LeaseRenewer} instance */
+  public static LeaseRenewer getInstance(final String authority,
+      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
+    final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
+    r.addClient(dfsc);
+    return r;
+  }
+
+  /**
+   * A factory for sharing {@link LeaseRenewer} objects
+   * among {@link DFSClient} instances
+   * so that there is only one renewer per authority per user.
+   */
+  private static class Factory {
+    private static final Factory INSTANCE = new Factory();
+
+    private static class Key {
+      /** Namenode info */
+      final String authority;
+      /** User info */
+      final UserGroupInformation ugi;
+
+      private Key(final String authority, final UserGroupInformation ugi) {
+        if (authority == null) {
+          throw new HadoopIllegalArgumentException("authority == null");
+        } else if (ugi == null) {
+          throw new HadoopIllegalArgumentException("ugi == null");
+        }
+
+        this.authority = authority;
+        this.ugi = ugi;
+      }
+
+      @Override
+      public int hashCode() {
+        return authority.hashCode() ^ ugi.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        if (obj instanceof Key) {
+          final Key that = (Key)obj;
+          return this.authority.equals(that.authority)
+                 && this.ugi.equals(that.ugi);
+        }
+        return false;
+      }
+
+      @Override
+      public String toString() {
+        return ugi.getShortUserName() + "@" + authority;
+      }
+    }
+
+    /** A map for per user per namenode renewers. */
+    private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>();
+
+    /** Get a renewer. */
+    private synchronized LeaseRenewer get(final String authority,
+        final UserGroupInformation ugi) {
+      final Key k = new Key(authority, ugi);
+      LeaseRenewer r = renewers.get(k);
+      if (r == null) {
+        r = new LeaseRenewer(k);
+        renewers.put(k, r);
+      }
+      return r;
+    }
+
+    /** Remove the given renewer. */
+    private synchronized void remove(final LeaseRenewer r) {
+      final LeaseRenewer stored = renewers.get(r.factorykey);
+      //Since a renewer may expire, the stored renewer can be different.
+      if (r == stored) {
+        if (!r.clientsRunning()) {
+          renewers.remove(r.factorykey);
+        }
+      }
+    }
+  }
+
+  /** The time in milliseconds at which the map became empty. */
+  private long emptyTime = Long.MAX_VALUE;
+  /** The lease renewal period in milliseconds. It starts at half the
+   *  soft-limit period and is adjusted as clients are added and removed. */
+  private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD / 2;
+
+  /** A daemon for renewing leases. */
+  private Daemon daemon = null;
+  /** Only the daemon with currentId should run. */
+  private int currentId = 0;
+
+  /**
+   * The period in milliseconds for which the lease renewer thread should
+   * keep running after the map becomes empty.
+   * In other words,
+   * if the map stays empty for longer than the grace period,
+   * the renewer should terminate.
+   */
+  private long gracePeriod;
+  /**
+   * The time period in milliseconds
+   * that the renewer sleeps for each iteration.
+   */
+  private long sleepPeriod;
+
+  private final Factory.Key factorykey;
+
+  /** A list of clients corresponding to this renewer. */
+  private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
+
+  /**
+   * A stringified stack trace of the call stack when the Lease Renewer
+   * was instantiated. This is only generated if trace-level logging is
+   * enabled on this class.
+   */
+  private final String instantiationTrace;
+
+  private LeaseRenewer(Factory.Key factorykey) {
+    this.factorykey = factorykey;
+    unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+
+    if (LOG.isTraceEnabled()) {
+      instantiationTrace = StringUtils.stringifyException(
+        new Throwable("TRACE"));
+    } else {
+      instantiationTrace = null;
+    }
+  }
+
+  /** @return the renewal time in milliseconds. */
+  private synchronized long getRenewalTime() {
+    return renewal;
+  }
+
+  /** Used for testing only. */
+  @VisibleForTesting
+  public synchronized void setRenewalTime(final long renewal) {
+    this.renewal = renewal;
+  }
+
+  /** Add a client. */
+  private synchronized void addClient(final DFSClient dfsc) {
+    for(DFSClient c : dfsclients) {
+      if (c == dfsc) {
+        //client already exists, nothing to do.
+        return;
+      }
+    }
+    //client not found, add it
+    dfsclients.add(dfsc);
+
+    //update renewal time
+    final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
+    if (hdfsTimeout > 0) {
+      final long half = hdfsTimeout/2;
+      if (half < renewal) {
+        this.renewal = half;
+      }
+    }
+  }
+
+  private synchronized boolean clientsRunning() {
+    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
+      if (!i.next().isClientRunning()) {
+        i.remove();
+      }
+    }
+    return !dfsclients.isEmpty();
+  }
+
+  private synchronized long getSleepPeriod() {
+    return sleepPeriod;
+  }
+
+  /** Set the grace period and adjust the sleep period accordingly. */
+  synchronized void setGraceSleepPeriod(final long gracePeriod) {
+    unsyncSetGraceSleepPeriod(gracePeriod);
+  }
+
+  private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
+    if (gracePeriod < 100L) {
+      throw new HadoopIllegalArgumentException(gracePeriod
+          + " = gracePeriod < 100ms is too small.");
+    }
+    this.gracePeriod = gracePeriod;
+    final long half = gracePeriod/2;
+    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+        half: LEASE_RENEWER_SLEEP_DEFAULT;
+  }
+
+  /** Is the daemon running? */
+  synchronized boolean isRunning() {
+    return daemon != null && daemon.isAlive();
+  }
+
+  /** Does this renewer have nothing to renew? */
+  public boolean isEmpty() {
+    return dfsclients.isEmpty();
+  }
+
+  /** Used only by tests */
+  synchronized String getDaemonName() {
+    return daemon.getName();
+  }
+
+  /** Is the empty period longer than the grace period? */
+  private synchronized boolean isRenewerExpired() {
+    return emptyTime != Long.MAX_VALUE
+        && Time.monotonicNow() - emptyTime > gracePeriod;
+  }
+
+  public synchronized void put(final long inodeId, final DFSOutputStream out,
+      final DFSClient dfsc) {
+    if (dfsc.isClientRunning()) {
+      if (!isRunning() || isRenewerExpired()) {
+        //start a new daemon with a new id.
+        final int id = ++currentId;
+        daemon = new Daemon(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Lease renewer daemon for " + clientsString()
+                    + " with renew id " + id + " started");
+              }
+              LeaseRenewer.this.run(id);
+            } catch(InterruptedException e) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+                    + " is interrupted.", e);
+              }
+            } finally {
+              synchronized(LeaseRenewer.this) {
+                Factory.INSTANCE.remove(LeaseRenewer.this);
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Lease renewer daemon for " + clientsString()
+                    + " with renew id " + id + " exited");
+              }
+            }
+          }
+
+          @Override
+          public String toString() {
+            return String.valueOf(LeaseRenewer.this);
+          }
+        });
+        daemon.start();
+      }
+      dfsc.putFileBeingWritten(inodeId, out);
+      emptyTime = Long.MAX_VALUE;
+    }
+  }
+
+  @VisibleForTesting
+  synchronized void setEmptyTime(long time) {
+    emptyTime = time;
+  }
+
+  /** Close a file. */
+  public void closeFile(final long inodeId, final DFSClient dfsc) {
+    dfsc.removeFileBeingWritten(inodeId);
+
+    synchronized(this) {
+      if (dfsc.isFilesBeingWrittenEmpty()) {
+        dfsclients.remove(dfsc);
+      }
+      //update emptyTime if necessary
+      if (emptyTime == Long.MAX_VALUE) {
+        for(DFSClient c : dfsclients) {
+          if (!c.isFilesBeingWrittenEmpty()) {
+            //found a non-empty file-being-written map
+            return;
+          }
+        }
+        //discover the first time that all file-being-written maps are empty.
+        emptyTime = Time.monotonicNow();
+      }
+    }
+  }
+
+  /** Close the given client. */
+  public synchronized void closeClient(final DFSClient dfsc) {
+    dfsclients.remove(dfsc);
+    if (dfsclients.isEmpty()) {
+      if (!isRunning() || isRenewerExpired()) {
+        Factory.INSTANCE.remove(LeaseRenewer.this);
+        return;
+      }
+      if (emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the client list is empty.
+        emptyTime = Time.monotonicNow();
+      }
+    }
+
+    //update renewal time
+    if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
+      long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+      for(DFSClient c : dfsclients) {
+        final int timeout = c.getConf().getHdfsTimeout();
+        if (timeout > 0 && timeout < min) {
+          min = timeout;
+        }
+      }
+      renewal = min/2;
+    }
+  }
+
+  public void interruptAndJoin() throws InterruptedException {
+    Daemon daemonCopy = null;
+    synchronized (this) {
+      if (isRunning()) {
+        daemon.interrupt();
+        daemonCopy = daemon;
+      }
+    }
+
+    if (daemonCopy != null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Wait for lease checker to terminate");
+      }
+      daemonCopy.join();
+    }
+  }
+
+  private void renew() throws IOException {
+    final List<DFSClient> copies;
+    synchronized(this) {
+      copies = new ArrayList<DFSClient>(dfsclients);
+    }
+    //sort the clients by name in order to detect repeated names.
+    Collections.sort(copies, new Comparator<DFSClient>() {
+      @Override
+      public int compare(final DFSClient left, final DFSClient right) {
+        return left.getClientName().compareTo(right.getClientName());
+      }
+    });
+    String previousName = "";
+    for(int i = 0; i < copies.size(); i++) {
+      final DFSClient c = copies.get(i);
+      //skip if current client name is the same as the previous name.
+      if (!c.getClientName().equals(previousName)) {
+        if (!c.renewLease()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Did not renew lease for client " +
+                c);
+          }
+          continue;
+        }
+        previousName = c.getClientName();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Lease renewed for client " + previousName);
+        }
+      }
+    }
+  }
+
+  /**
+   * Periodically check in with the namenode and renew all the leases
+   * when the lease period is half over.
+   */
+  private void run(final int id) throws InterruptedException {
+    for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
+        Thread.sleep(getSleepPeriod())) {
+      final long elapsed = Time.monotonicNow() - lastRenewed;
+      if (elapsed >= getRenewalTime()) {
+        try {
+          renew();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Lease renewer daemon for " + clientsString()
+                + " with renew id " + id + " executed");
+          }
+          lastRenewed = Time.monotonicNow();
+        } catch (SocketTimeoutException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (elapsed/1000) + " seconds.  Aborting ...", ie);
+          synchronized (this) {
+            while (!dfsclients.isEmpty()) {
+              DFSClient dfsClient = dfsclients.get(0);
+              dfsClient.closeAllFilesBeingWritten(true);
+              closeClient(dfsClient);
+            }
+            //Expire the current LeaseRenewer thread.
+            emptyTime = 0;
+          }
+          break;
+        } catch (IOException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (elapsed/1000) + " seconds.  Will retry shortly ...", ie);
+        }
+      }
+
+      synchronized(this) {
+        if (id != currentId || isRenewerExpired()) {
+          if (LOG.isDebugEnabled()) {
+            if (id != currentId) {
+              LOG.debug("Lease renewer daemon for " + clientsString()
+                  + " with renew id " + id + " is not current");
+            } else {
+               LOG.debug("Lease renewer daemon for " + clientsString()
+                  + " with renew id " + id + " expired");
+            }
+          }
+          //no longer the current daemon or expired
+          return;
+        }
+
+        // If no clients are in a running state, or no more clients are
+        // registered with this renewer, stop the daemon after the grace
+        // period.
+        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
+          emptyTime = Time.monotonicNow();
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    String s = getClass().getSimpleName() + ":" + factorykey;
+    if (LOG.isTraceEnabled()) {
+      return s + ", clients=" +  clientsString()
+        + ", created at " + instantiationTrace;
+    }
+    return s;
+  }
+
+  /** Get the names of all clients */
+  private synchronized String clientsString() {
+    if (dfsclients.isEmpty()) {
+      return "[]";
+    } else {
+      final StringBuilder b = new StringBuilder("[").append(
+          dfsclients.get(0).getClientName());
+      for(int i = 1; i < dfsclients.size(); i++) {
+        b.append(", ").append(dfsclients.get(i).getClientName());
+      }
+      return b.append("]").toString();
+    }
+  }
+}
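
To make the renewal-period arithmetic in addClient() above concrete, a small
standalone sketch (a hypothetical helper, not part of the commit):

    /**
     * Hypothetical helper mirroring the renewal-time update in addClient():
     * a client with a positive dfs timeout can only shorten the period, so
     * leases get renewed at least twice per timeout window.
     */
    static long updatedRenewalMs(long currentRenewalMs, int hdfsTimeoutMs) {
      if (hdfsTimeoutMs > 0 && hdfsTimeoutMs / 2 < currentRenewalMs) {
        return hdfsTimeoutMs / 2;
      }
      return currentRenewalMs; // otherwise stay at LEASE_SOFTLIMIT_PERIOD / 2
    }

    // e.g. updatedRenewalMs(30000, 40000) == 20000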

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java
new file mode 100644
index 0000000..e4b51c5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MissingEventsException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  private long expectedTxid;
+  private long actualTxid;
+
+  public MissingEventsException() {}
+
+  public MissingEventsException(long expectedTxid, long actualTxid) {
+    this.expectedTxid = expectedTxid;
+    this.actualTxid = actualTxid;
+  }
+
+  public long getExpectedTxid() {
+    return expectedTxid;
+  }
+
+  public long getActualTxid() {
+    return actualTxid;
+  }
+
+  @Override
+  public String toString() {
+    return "We expected the next batch of events to start with transaction ID "
+        + expectedTxid + ", but it instead started with transaction ID " +
+        actualTxid + ". Most likely the intervening transactions were cleaned "
+        + "up as part of checkpointing.";
+  }
+}
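
For context, a sketch of where this exception typically surfaces: reading the
HDFS inotify event stream. The HdfsAdmin-based access shown here reflects
typical usage rather than this commit, and the URI is illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;
    import org.apache.hadoop.hdfs.inotify.EventBatch;
    import org.apache.hadoop.hdfs.inotify.MissingEventsException;

    public class InotifyDemo {
      public static void main(String[] args) throws Exception {
        HdfsAdmin admin =
            new HdfsAdmin(URI.create("hdfs://nn:8020"), new Configuration());
        DFSInotifyEventInputStream events = admin.getInotifyEventStream();
        while (true) {
          try {
            EventBatch batch = events.take(); // blocks until a batch arrives
            System.out.println("batch at txid " + batch.getTxid());
          } catch (MissingEventsException e) {
            // The gap between the expected and actual txid was purged by a
            // checkpoint; resynchronize (e.g. rescan the namespace), then
            // continue reading.
            System.err.println(e);
          }
        }
      }
    }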

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java
new file mode 100644
index 0000000..1210999
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Indicates a failure manipulating an ACL.
+ */
+@InterfaceAudience.Private
+public class AclException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates a new AclException.
+   *
+   * @param message String message
+   */
+  public AclException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
new file mode 100644
index 0000000..923cdb4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * CacheDirectiveIterator is a remote iterator that iterates over cache
+ * directives. It supports retrying in case of namenode failover.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class CacheDirectiveIterator
+    extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
+
+  private CacheDirectiveInfo filter;
+  private final ClientProtocol namenode;
+  private final Sampler<?> traceSampler;
+
+  public CacheDirectiveIterator(ClientProtocol namenode,
+      CacheDirectiveInfo filter, Sampler<?> traceSampler) {
+    super(0L);
+    this.namenode = namenode;
+    this.filter = filter;
+    this.traceSampler = traceSampler;
+  }
+
+  private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) {
+    CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder(filter);
+    builder.setId(null);
+    return builder.build();
+  }
+
+  /**
+   * Used for compatibility when communicating with a server version that
+   * does not support filtering directives by ID.
+   */
+  private static class SingleEntry implements
+      BatchedEntries<CacheDirectiveEntry> {
+
+    private final CacheDirectiveEntry entry;
+
+    public SingleEntry(final CacheDirectiveEntry entry) {
+      this.entry = entry;
+    }
+
+    @Override
+    public CacheDirectiveEntry get(int i) {
+      if (i > 0) {
+        return null;
+      }
+      return entry;
+    }
+
+    @Override
+    public int size() {
+      return 1;
+    }
+
+    @Override
+    public boolean hasMore() {
+      return false;
+    }
+  }
+
+  @Override
+  public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
+      throws IOException {
+    BatchedEntries<CacheDirectiveEntry> entries = null;
+    TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler);
+    try {
+      entries = namenode.listCacheDirectives(prevKey, filter);
+    } catch (IOException e) {
+      if (e.getMessage().contains("Filtering by ID is unsupported")) {
+        // Retry case for old servers, do the filtering client-side
+        long id = filter.getId();
+        filter = removeIdFromFilter(filter);
+        // Using id - 1 as prevId should get us a window containing the id
+        // This is somewhat brittle, since it depends on directives being
+        // returned in order of ascending ID.
+        entries = namenode.listCacheDirectives(id - 1, filter);
+        for (int i=0; i<entries.size(); i++) {
+          CacheDirectiveEntry entry = entries.get(i);
+          if (entry.getInfo().getId().equals((Long)id)) {
+            return new SingleEntry(entry);
+          }
+        }
+        throw new RemoteException(InvalidRequestException.class.getName(),
+            "Did not find requested id " + id);
+      }
+      throw e;
+    } finally {
+      scope.close();
+    }
+    Preconditions.checkNotNull(entries);
+    return entries;
+  }
+
+  @Override
+  public Long elementToPrevKey(CacheDirectiveEntry entry) {
+    return entry.getInfo().getId();
+  }
+}
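
For context, a sketch of how this iterator is typically consumed through
DistributedFileSystem (not part of this commit; the pool name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

    public class ListDirectivesDemo {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        CacheDirectiveInfo filter =
            new CacheDirectiveInfo.Builder().setPool("demo-pool").build();
        RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(filter);
        while (it.hasNext()) { // each next() may fetch another batch
          CacheDirectiveEntry e = it.next();
          System.out.println(e.getInfo().getId() + " " + e.getInfo().getPath());
        }
      }
    }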

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java
new file mode 100644
index 0000000..e9481f7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * CachePoolIterator is a remote iterator that iterates over cache pools.
+ * It supports retrying in case of namenode failover.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class CachePoolIterator
+    extends BatchedRemoteIterator<String, CachePoolEntry> {
+
+  private final ClientProtocol namenode;
+  private final Sampler<?> traceSampler;
+
+  public CachePoolIterator(ClientProtocol namenode, Sampler<?> traceSampler) {
+    super("");
+    this.namenode = namenode;
+    this.traceSampler = traceSampler;
+  }
+
+  @Override
+  public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
+      throws IOException {
+    TraceScope scope = Trace.startSpan("listCachePools", traceSampler);
+    try {
+      return namenode.listCachePools(prevKey);
+    } finally {
+      scope.close();
+    }
+  }
+
+  @Override
+  public String elementToPrevKey(CachePoolEntry entry) {
+    return entry.getInfo().getPoolName();
+  }
+}
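
Similarly, a sketch of listing cache pools via DistributedFileSystem (not part
of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.CachePoolEntry;

    public class ListPoolsDemo {
      public static void main(String[] args) throws Exception {
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(new Configuration());
        RemoteIterator<CachePoolEntry> it = dfs.listCachePools();
        while (it.hasNext()) {
          CachePoolEntry e = it.next();
          // getPoolName() is also what elementToPrevKey() above uses to
          // resume listing after a namenode failover.
          System.out.println(e.getInfo().getPoolName()
              + " cached=" + e.getStats().getBytesCached());
        }
      }
    }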

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
new file mode 100644
index 0000000..0141215
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * EncryptionZoneIterator is a remote iterator that iterates over encryption
+ * zones. It supports retrying in case of namenode failover.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class EncryptionZoneIterator
+    extends BatchedRemoteIterator<Long, EncryptionZone> {
+
+  private final ClientProtocol namenode;
+  private final Sampler<?> traceSampler;
+
+  public EncryptionZoneIterator(ClientProtocol namenode,
+                                Sampler<?> traceSampler) {
+    super(Long.valueOf(0));
+    this.namenode = namenode;
+    this.traceSampler = traceSampler;
+  }
+
+  @Override
+  public BatchedEntries<EncryptionZone> makeRequest(Long prevId)
+      throws IOException {
+    TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler);
+    try {
+      return namenode.listEncryptionZones(prevId);
+    } finally {
+      scope.close();
+    }
+  }
+
+  @Override
+  public Long elementToPrevKey(EncryptionZone entry) {
+    return entry.getId();
+  }
+}
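
And a sketch of listing encryption zones through HdfsAdmin (not part of this
commit; the URI is illustrative):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;
    import org.apache.hadoop.hdfs.protocol.EncryptionZone;

    public class ListZonesDemo {
      public static void main(String[] args) throws Exception {
        HdfsAdmin admin =
            new HdfsAdmin(URI.create("hdfs://nn:8020"), new Configuration());
        RemoteIterator<EncryptionZone> it = admin.listEncryptionZones();
        while (it.hasNext()) {
          EncryptionZone zone = it.next();
          System.out.println(zone.getPath() + " key=" + zone.getKeyName());
        }
      }
    }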

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java
new file mode 100644
index 0000000..25084c7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+
+import static org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix.long2String;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaByStorageTypeExceededException extends QuotaExceededException {
+  protected static final long serialVersionUID = 1L;
+  protected StorageType type;
+
+  public QuotaByStorageTypeExceededException() {}
+
+  public QuotaByStorageTypeExceededException(String msg) {
+    super(msg);
+  }
+
+  public QuotaByStorageTypeExceededException(long quota, long count, StorageType type) {
+    super(quota, count);
+    this.type = type;
+  }
+
+  @Override
+  public String getMessage() {
+    String msg = super.getMessage();
+    if (msg == null) {
+      return "Quota by storage type : " + type.toString() +
+          " on path : " + (pathName==null ? "": pathName) +
+          " is exceeded. quota = "  + long2String(quota, "B", 2) +
+          " but space consumed = " + long2String(count, "B", 2);
+    } else {
+      return msg;
+    }
+  }
+}
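
For context, a sketch of a setting under which this exception can arise. The
setQuotaByStorageType call on HdfsAdmin is an assumption about the admin API
rather than part of this commit, and the URI, path, and size are illustrative:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    public class SsdQuotaDemo {
      public static void main(String[] args) throws Exception {
        HdfsAdmin admin =
            new HdfsAdmin(URI.create("hdfs://nn:8020"), new Configuration());
        // Allow at most 10 MB of SSD storage under /ssd-data; writes that
        // would exceed it fail with QuotaByStorageTypeExceededException.
        admin.setQuotaByStorageType(
            new Path("/ssd-data"), StorageType.SSD, 10L * 1024 * 1024);
      }
    }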

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java
new file mode 100644
index 0000000..03fb704
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.Path;
+
+/** 
+ * Thrown when a symbolic link is encountered in a path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class UnresolvedPathException extends UnresolvedLinkException {
+  private static final long serialVersionUID = 1L;
+  private String path;        // The path containing the link
+  private String preceding;   // The path part preceding the link
+  private String remainder;   // The path part following the link
+  private String linkTarget;  // The link's target
+
+  /**
+   * Used by RemoteException to instantiate an UnresolvedPathException.
+   */
+  public UnresolvedPathException(String msg) {
+    super(msg);
+  }
+  
+  public UnresolvedPathException(String path, String preceding,
+      String remainder, String linkTarget) {
+    this.path = path;
+    this.preceding = preceding;
+    this.remainder = remainder;
+    this.linkTarget = linkTarget;
+  }
+
+  /**
+   * Return the path with the link resolved against its target.
+   */
+  public Path getResolvedPath() throws IOException {
+    // If the link target is absolute we can throw out the preceding part
+    // and just append the remainder to the target; otherwise append each
+    // piece to resolve the link in the path.
+    boolean noRemainder = (remainder == null || "".equals(remainder));
+    Path target = new Path(linkTarget);
+    if (target.isUriPathAbsolute()) {
+      return noRemainder ? target : new Path(target, remainder);
+    } else {
+      return noRemainder
+        ? new Path(preceding, target)
+        : new Path(new Path(preceding, linkTarget), remainder);
+    }
+  }
+
+  @Override
+  public String getMessage() {
+    String msg = super.getMessage();
+    if (msg != null) {
+      return msg;
+    }
+    String myMsg = "Unresolved path " + path;
+    try {
+      return getResolvedPath().toString();
+    } catch (IOException e) {
+      // Ignore
+    }
+    return myMsg;
+  }
+}
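
To illustrate the resolution rule in getResolvedPath() with concrete
(hypothetical) values:

    import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;

    public class ResolveDemo {
      public static void main(String[] args) throws Exception {
        // Opening /a/link/b where /a/link is a symlink:
        // path = "/a/link/b", preceding = "/a", remainder = "b".
        // An absolute target discards the preceding part...
        System.out.println(new UnresolvedPathException(
            "/a/link/b", "/a", "b", "/x/y").getResolvedPath()); // /x/y/b
        // ...while a relative target is resolved against it.
        System.out.println(new UnresolvedPathException(
            "/a/link/b", "/a", "b", "x/y").getResolvedPath());  // /a/x/y/b
      }
    }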

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
new file mode 100644
index 0000000..c69986a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * The setting of replace-datanode-on-failure feature.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplaceDatanodeOnFailure {
+  /** The replacement policies */
+  public enum Policy {
+    /** The feature is disabled in the entire site. */
+    DISABLE(Condition.FALSE),
+    /** Never add a new datanode. */
+    NEVER(Condition.FALSE),
+    /** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */
+    DEFAULT(Condition.DEFAULT),
+    /** Always add a new datanode when an existing datanode is removed. */
+    ALWAYS(Condition.TRUE);
+
+    private final Condition condition;
+
+    private Policy(Condition condition) {
+      this.condition = condition;
+    }
+    
+    Condition getCondition() {
+      return condition;
+    }
+  }
+
+  /** Datanode replacement condition */
+  private static interface Condition {
+    /** Return true unconditionally. */
+    static final Condition TRUE = new Condition() {
+      @Override
+      public boolean satisfy(short replication, DatanodeInfo[] existings,
+          int nExistings, boolean isAppend, boolean isHflushed) {
+        return true;
+      }
+    };
+
+    /** Return false unconditionally. */
+    static final Condition FALSE = new Condition() {
+      @Override
+      public boolean satisfy(short replication, DatanodeInfo[] existings,
+          int nExistings, boolean isAppend, boolean isHflushed) {
+        return false;
+      }
+    };
+
+    /**
+     * DEFAULT condition:
+     *   Let r be the replication number.
+     *   Let n be the number of existing datanodes.
+     *   Add a new datanode only if r >= 3 and either
+     *   (1) floor(r/2) >= n; or
+     *   (2) r > n and the block is hflushed/appended.
+     */
+    static final Condition DEFAULT = new Condition() {
+      @Override
+      public boolean satisfy(final short replication,
+          final DatanodeInfo[] existings, final int n, final boolean isAppend,
+          final boolean isHflushed) {
+        if (replication < 3) {
+          return false;
+        } else {
+          if (n <= (replication/2)) {
+            return true;
+          } else {
+            return isAppend || isHflushed;
+          }
+        }
+      }
+    };
+
+    /** Is the condition satisfied? */
+    public boolean satisfy(short replication, DatanodeInfo[] existings,
+        int nExistings, boolean isAppend, boolean isHflushed);
+  }
+
+  private final Policy policy;
+  private final boolean bestEffort;
+  
+  public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) {
+    this.policy = policy;
+    this.bestEffort = bestEffort;
+  }
+
+  /** Check if the feature is enabled. */
+  public void checkEnabled() {
+    if (policy == Policy.DISABLE) {
+      throw new UnsupportedOperationException(
+          "This feature is disabled.  Please refer to "
+          + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY
+          + " configuration property.");
+    }
+  }
+
+  /**
+   * Best effort means that the client will try to replace the failed datanode
+   * (provided that the policy is satisfied); however, it will continue the
+   * write operation even if the datanode replacement also fails.
+   * 
+   * @return Suppose the datanode replacement fails.
+   *     false: An exception should be thrown so that the write will fail.
+   *     true : The write should be resumed with the remaining datanodes.
+   */
+  public boolean isBestEffort() {
+    return bestEffort;
+  }
+
+  /** Does it need a replacement according to the policy? */
+  public boolean satisfy(
+      final short replication, final DatanodeInfo[] existings,
+      final boolean isAppend, final boolean isHflushed) {
+    final int n = existings == null? 0: existings.length;
+    if (n == 0 || n >= replication) {
+      //don't need to add datanode for any policy.
+      return false;
+    } else {
+      return policy.getCondition().satisfy(
+          replication, existings, n, isAppend, isHflushed);
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return policy.toString();
+  }
+
+  /** Get the setting from configuration. */
+  public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+    final Policy policy = getPolicy(conf);
+    final boolean bestEffort = conf.getBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT);
+    
+    return new ReplaceDatanodeOnFailure(policy, bestEffort);
+  }
+
+  private static Policy getPolicy(final Configuration conf) {
+    final boolean enabled = conf.getBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_DEFAULT);
+    if (!enabled) {
+      return Policy.DISABLE;
+    }
+
+    final String policy = conf.get(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_DEFAULT);
+    for(int i = 1; i < Policy.values().length; i++) {
+      final Policy p = Policy.values()[i];
+      if (p.name().equalsIgnoreCase(policy)) {
+        return p;
+      }
+    }
+    throw new HadoopIllegalArgumentException("Illegal configuration value for "
+        + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY
+        + ": " + policy);
+  }
+
+  /** Write the setting to configuration. */
+  public static void write(final Policy policy,
+      final boolean bestEffort, final Configuration conf) {
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        policy != Policy.DISABLE);
+    conf.set(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
+        policy.name());
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
+        bestEffort);
+  }
+}
\ No newline at end of file
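
A sketch exercising the DEFAULT policy above (the arrays carry no real
DatanodeInfo objects because only their length is inspected by the DEFAULT
condition; not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
    import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;

    public class RdofDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        ReplaceDatanodeOnFailure.write(Policy.DEFAULT, true, conf);
        ReplaceDatanodeOnFailure rdof = ReplaceDatanodeOnFailure.get(conf);

        // r = 3, one datanode left: floor(3/2) >= 1, so always replace.
        System.out.println(rdof.satisfy((short) 3, new DatanodeInfo[1],
            false, false)); // true
        // r = 3, two left: replace only for appended or hflushed blocks.
        System.out.println(rdof.satisfy((short) 3, new DatanodeInfo[2],
            false, false)); // false
        System.out.println(rdof.satisfy((short) 3, new DatanodeInfo[2],
            false, true));  // true
      }
    }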

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
new file mode 100644
index 0000000..b159d3a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * Exception indicating that the DataNode does not have a replica
+ * that matches the target block.
+ */
+public class ReplicaNotFoundException extends IOException {
+  private static final long serialVersionUID = 1L;
+  public final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica ";
+  public final static String UNFINALIZED_REPLICA = 
+    "Cannot append to an unfinalized replica ";
+  public final static String UNFINALIZED_AND_NONRBW_REPLICA = 
+    "Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
+  public final static String NON_EXISTENT_REPLICA =
+    "Cannot append to a non-existent replica ";
+  public final static String UNEXPECTED_GS_REPLICA =
+    "Cannot append to a replica with unexpected generation stamp ";
+
+  public ReplicaNotFoundException() {
+    super();
+  }
+
+  public ReplicaNotFoundException(ExtendedBlock b) {
+    super("Replica not found for " + b);
+  }
+  
+  public ReplicaNotFoundException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java
new file mode 100644
index 0000000..0bdd2a5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
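+/**
+ * Thrown when the preconditions for creating a file are temporarily not
+ * met; callers are expected to retry the create operation.
+ */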
+@InterfaceAudience.Private
+public class RetryStartFileException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public RetryStartFileException() {
+    super("Preconditions for creating a file failed because of a " +
+        "transient error, retry create later.");
+  }
+
+  public RetryStartFileException(String s) {
+    super(s);
+  }
+}
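
[Editor's note] This exception is typically retried by the HDFS client itself. Purely as an illustration, here is a minimal bounded-retry sketch around create(), under the simplifying assumption that the exception reaches the caller unwrapped (in practice it may arrive wrapped in a RemoteException):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;

    public class CreateRetrySketch {
      /** Retry create() a bounded number of times on transient failures. */
      static FSDataOutputStream createWithRetry(FileSystem fs, Path path,
          int maxAttempts) throws IOException, InterruptedException {
        for (int attempt = 1; ; attempt++) {
          try {
            return fs.create(path);
          } catch (RetryStartFileException e) {
            if (attempt >= maxAttempts) {
              throw e;  // give up: the error is not clearing
            }
            Thread.sleep(100L * attempt);  // simple linear backoff
          }
        }
      }
    }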

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b3940b5..4ebf437 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -977,6 +977,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton
     via Colin P. McCabe)
 
+    HDFS-8053. Move DFSIn/OutputStream and related classes to
+    hadoop-hdfs-client. (Mingliang Liu via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 60029e0..c88c4c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -74,15 +74,6 @@
      </Match>
 
      <!--
-      ResponseProcessor is a thread that is designed to catch RuntimeException.
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" />
-       <Method name="run" />
-       <Bug pattern="REC_CATCH_EXCEPTION" />
-     </Match>
-
-     <!--
       lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots.
       See the comments in BackupImage for justification.
      -->
@@ -196,14 +187,4 @@
       <Method name="assertAllResultsEqual" />
       <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
     </Match>
-
-    <!--
-     We use a separate lock to guard cachingStrategy in order to separate
-     locks for p-reads from seek + read invocations.
-    -->
-    <Match>
-        <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
-        <Field name="cachingStrategy" />
-        <Bug pattern="IS2_INCONSISTENT_SYNC" />
-    </Match>
  </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
deleted file mode 100644
index 0ccacda..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-
-/**
- * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock},
- * allowing more detailed queries to the datanode about a block.
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class HdfsBlockLocation extends BlockLocation {
-
-  private final LocatedBlock block;
-  
-  public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) 
-      throws IOException {
-    // Initialize with data from passed in BlockLocation
-    super(loc);
-    this.block = block;
-  }
-  
-  public LocatedBlock getLocatedBlock() {
-    return block;
-  }
-}
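
[Editor's note] This wrapper is deleted here as part of the move to hadoop-hdfs-client. For context, a sketch of the downcast it enables, assuming the FileSystem is backed by HDFS (on other filesystems the instanceof check simply fails):

    import java.io.IOException;

    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.HdfsBlockLocation;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    public class BlockDetailSketch {
      /** Print the extended block behind each location of an HDFS file. */
      static void printBlocks(FileSystem fs, Path path) throws IOException {
        FileStatus stat = fs.getFileStatus(path);
        for (BlockLocation loc : fs.getFileBlockLocations(stat, 0, stat.getLen())) {
          if (loc instanceof HdfsBlockLocation) {
            LocatedBlock lb = ((HdfsBlockLocation) loc).getLocatedBlock();
            System.out.println(lb.getBlock());
          }
        }
      }
    }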

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
deleted file mode 100644
index 7bba8a4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/** 
-  * This exception is thrown when a read encounters a block that has no locations
-  * associated with it.
-  */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class BlockMissingException extends IOException {
-
-  private static final long serialVersionUID = 1L;
-
-  private final String filename;
-  private final long   offset;
-
-  /**
-   * An exception that indicates that the file was corrupted.
-   * @param filename name of the corrupted file
-   * @param description a description of the corruption details
-   */
-  public BlockMissingException(String filename, String description, long offset) {
-    super(description);
-    this.filename = filename;
-    this.offset = offset;
-  }
-
-  /**
-   * Returns the name of the corrupted file.
-   * @return name of corrupted file
-   */
-  public String getFile() {
-    return filename;
-  }
-
-  /**
-   * Returns the offset at which this file is corrupted.
-   * @return offset of the corrupted file
-   */
-  public long getOffset() {
-    return offset;
-  }
-}
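
[Editor's note] Also moved rather than removed: BlockMissingException remains the signal a client sees when a read hits a block with no live locations. A minimal sketch of surfacing the file name and offset it carries (the wrapper method here is illustrative, not part of HDFS):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.BlockMissingException;

    public class MissingBlockSketch {
      /** Read one byte, reporting file and offset if a block is missing. */
      static int readFirstByte(FileSystem fs, Path path) throws IOException {
        try (FSDataInputStream in = fs.open(path)) {
          return in.read();
        } catch (BlockMissingException e) {
          System.err.println("No locations for a block of " + e.getFile()
              + " at offset " + e.getOffset());
          throw e;
        }
      }
    }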

