Return-Path:
X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org
Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id 96A4818934
for ;
Sat, 26 Sep 2015 18:17:13 +0000 (UTC)
Received: (qmail 41081 invoked by uid 500); 26 Sep 2015 18:17:11 -0000
Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org
Received: (qmail 40898 invoked by uid 500); 26 Sep 2015 18:17:11 -0000
Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: common-dev@hadoop.apache.org
Delivered-To: mailing list common-commits@hadoop.apache.org
Received: (qmail 40453 invoked by uid 99); 26 Sep 2015 18:17:11 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org)
(140.211.11.23)
by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 26 Sep 2015 18:17:11 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at
git1-us-west.apache.org, from userid 33)
id C342AE0419; Sat, 26 Sep 2015 18:17:10 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: wheat9@apache.org
To: common-commits@hadoop.apache.org
Date: Sat, 26 Sep 2015 18:17:16 -0000
Message-Id:
In-Reply-To: <65e820562a684ebe8d4ff7edaac40cdd@git.apache.org>
References: <65e820562a684ebe8d4ff7edaac40cdd@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [07/12] hadoop git commit: HDFS-8053. Move DFSIn/OutputStream and
related classes to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
new file mode 100644
index 0000000..745ca7e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The Hdfs implementation of {@link FSDataOutputStream}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class HdfsDataOutputStream extends FSDataOutputStream {
+  /**
+   * Wrap a {@link DFSOutputStream}.
+   *
+   * @param out the stream to wrap
+   * @param stats statistics object passed through to the superclass
+   * @param startPosition start position passed through to the superclass
+   * @throws IOException if the superclass constructor throws it
+   */
+  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats,
+      long startPosition) throws IOException {
+    super(out, stats, startPosition);
+  }
+
+  /**
+   * Wrap a {@link DFSOutputStream} with a start position of 0.
+   *
+   * @param out the stream to wrap
+   * @param stats statistics object passed through to the superclass
+   * @throws IOException if the superclass constructor throws it
+   */
+  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats
+      ) throws IOException {
+    this(out, stats, 0L);
+  }
+
+  /**
+   * Wrap a {@link CryptoOutputStream} that itself wraps a
+   * {@link DFSOutputStream}.
+   *
+   * @param out the crypto stream to wrap
+   * @param stats statistics object passed through to the superclass
+   * @param startPosition start position passed through to the superclass
+   * @throws IOException if the superclass constructor throws it
+   * @throws IllegalArgumentException if {@code out} does not wrap a
+   *           {@link DFSOutputStream}
+   */
+  public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats,
+      long startPosition) throws IOException {
+    super(out, stats, startPosition);
+    // The accessors below cast the inner stream to DFSOutputStream, so
+    // reject anything else up front.
+    Preconditions.checkArgument(out.getWrappedStream() instanceof DFSOutputStream,
+        "CryptoOutputStream should wrap a DFSOutputStream");
+  }
+
+  /**
+   * Wrap a {@link CryptoOutputStream} with a start position of 0.
+   *
+   * @param out the crypto stream to wrap
+   * @param stats statistics object passed through to the superclass
+   * @throws IOException if the superclass constructor throws it
+   */
+  public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats)
+      throws IOException {
+    this(out, stats, 0L);
+  }
+
+  /**
+   * Return the {@link DFSOutputStream} behind the given stream, looking
+   * through one {@link CryptoOutputStream} layer if present.
+   */
+  private static DFSOutputStream unwrapDfsStream(OutputStream stream) {
+    if (stream instanceof CryptoOutputStream) {
+      stream = ((CryptoOutputStream) stream).getWrappedStream();
+    }
+    return (DFSOutputStream) stream;
+  }
+
+  /**
+   * Get the actual number of replicas of the current block.
+   *
+   * This can be different from the designated replication factor of the file
+   * because the namenode does not maintain replication for the blocks which are
+   * currently being written to. Depending on the configuration, the client may
+   * continue to write to a block even if a few datanodes in the write pipeline
+   * have failed, or the client may add new datanodes once a datanode has
+   * failed.
+   *
+   * @return the number of valid replicas of the current block
+   * @throws IOException if the underlying {@link DFSOutputStream} throws it
+   */
+  public synchronized int getCurrentBlockReplication() throws IOException {
+    return unwrapDfsStream(getWrappedStream()).getCurrentBlockReplication();
+  }
+
+  /**
+   * Sync buffered data to DataNodes (flush to disk devices).
+   *
+   * @param syncFlags
+   *          Indicate the detailed semantic and actions of the hsync.
+   * @throws IOException if flushing or syncing the underlying stream fails
+   * @see FSDataOutputStream#hsync()
+   */
+  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
+    final OutputStream wrappedStream = getWrappedStream();
+    if (wrappedStream instanceof CryptoOutputStream) {
+      // Flush the crypto layer first, then sync the stream underneath it.
+      wrappedStream.flush();
+    }
+    unwrapDfsStream(wrappedStream).hsync(syncFlags);
+  }
+
+  /** Flags accepted by {@link #hsync(EnumSet)}. */
+  public enum SyncFlag {
+
+    /**
+     * When doing sync to DataNodes, also update the metadata (block length) in
+     * the NameNode.
+     */
+    UPDATE_LENGTH,
+
+    /**
+     * Sync the data to DataNode, close the current block, and allocate a new
+     * block
+     */
+    END_BLOCK;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
new file mode 100644
index 0000000..c3d2cfc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases
+ * on the namenode.
+ * When a file is opened for write (create or append),
+ * namenode stores a file lease for recording the identity of the writer.
+ * The writer (i.e. the DFSClient) is required to renew the lease periodically.
+ * When the lease is not renewed before it expires,
+ * the namenode considers the writer as failed and then it may either let
+ * another writer obtain the lease or close the file.
+ * </p>
+ * <p>
+ * This class also provides the following functionality:
+ * <ul>
+ * <li>
+ * It maintains a map from (namenode, user) pairs to lease renewers.
+ * The same {@link LeaseRenewer} instance is used for renewing lease
+ * for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and the same user.
+ * </li>
+ * <li>
+ * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}.
+ * Periodically the leases for all the clients are renewed.
+ * A client is removed from the list when the client is closed.
+ * </li>
+ * <li>
+ * A thread per namenode per user is used by the {@link LeaseRenewer}
+ * to renew the leases.
+ * </li>
+ * </ul>
+ * </p>
+ */
+@InterfaceAudience.Private
+public class LeaseRenewer {
+  static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
+
+  /** Grace period, in milliseconds, applied by the constructor. */
+  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+  /** Upper bound, in milliseconds, on the per-iteration sleep period. */
+  static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
+
+  /**
+   * Get a {@link LeaseRenewer} instance.
+   * The renewer is looked up (or created) by authority and user, and the
+   * given client is added to it.
+   */
+  public static LeaseRenewer getInstance(final String authority,
+      final UserGroupInformation ugi, final DFSClient dfsc) throws IOException {
+    final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi);
+    r.addClient(dfsc);
+    return r;
+  }
+
+  /**
+   * A factory for sharing {@link LeaseRenewer} objects
+   * among {@link DFSClient} instances
+   * so that there is only one renewer per authority per user.
+   */
+  private static class Factory {
+    private static final Factory INSTANCE = new Factory();
+
+    /** Map key: the (authority, user) pair served by one renewer. */
+    private static class Key {
+      /** Namenode info */
+      final String authority;
+      /** User info */
+      final UserGroupInformation ugi;
+
+      private Key(final String authority, final UserGroupInformation ugi) {
+        if (authority == null) {
+          throw new HadoopIllegalArgumentException("authority == null");
+        } else if (ugi == null) {
+          throw new HadoopIllegalArgumentException("ugi == null");
+        }
+
+        this.authority = authority;
+        this.ugi = ugi;
+      }
+
+      @Override
+      public int hashCode() {
+        return authority.hashCode() ^ ugi.hashCode();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (obj == this) {
+          return true;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (obj instanceof Key) {
+          final Key that = (Key)obj;
+          return this.authority.equals(that.authority)
+                 && this.ugi.equals(that.ugi);
+        }
+        return false;
+      }
+
+      @Override
+      public String toString() {
+        return ugi.getShortUserName() + "@" + authority;
+      }
+    }
+
+    /** A map for per user per namenode renewers. */
+    private final Map<Key, LeaseRenewer> renewers =
+        new HashMap<Key, LeaseRenewer>();
+
+    /** Get a renewer, creating and registering one if none is stored. */
+    private synchronized LeaseRenewer get(final String authority,
+        final UserGroupInformation ugi) {
+      final Key k = new Key(authority, ugi);
+      LeaseRenewer r = renewers.get(k);
+      if (r == null) {
+        r = new LeaseRenewer(k);
+        renewers.put(k, r);
+      }
+      return r;
+    }
+
+    /** Remove the given renewer. */
+    private synchronized void remove(final LeaseRenewer r) {
+      final LeaseRenewer stored = renewers.get(r.factorykey);
+      //Since a renewer may expire, the stored renewer can be different.
+      if (r == stored) {
+        if (!r.clientsRunning()) {
+          renewers.remove(r.factorykey);
+        }
+      }
+    }
+  }
+
+  /** The time in milliseconds that the map became empty. */
+  private long emptyTime = Long.MAX_VALUE;
+  /** A fixed lease renewal time period in milliseconds */
+  private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD / 2;
+
+  /** A daemon for renewing lease */
+  private Daemon daemon = null;
+  /** Only the daemon with currentId should run. */
+  private int currentId = 0;
+
+  /**
+   * A period in milliseconds that the lease renewer thread should run
+   * after the map became empty.
+   * In other words,
+   * if the map is empty for a time period longer than the grace period,
+   * the renewer should terminate.
+   */
+  private long gracePeriod;
+  /**
+   * The time period in milliseconds
+   * that the renewer sleeps for each iteration.
+   */
+  private long sleepPeriod;
+
+  private final Factory.Key factorykey;
+
+  /**
+   * A list of clients corresponding to this renewer.
+   * Accessed while holding the monitor of this renewer.
+   */
+  private final List<DFSClient> dfsclients = new ArrayList<DFSClient>();
+
+  /**
+   * A stringified stack trace of the call stack when the Lease Renewer
+   * was instantiated. This is only generated if trace-level logging is
+   * enabled on this class.
+   */
+  private final String instantiationTrace;
+
+  /** Create a renewer for the given key with the default grace period. */
+  private LeaseRenewer(Factory.Key factorykey) {
+    this.factorykey = factorykey;
+    unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+
+    if (LOG.isTraceEnabled()) {
+      instantiationTrace = StringUtils.stringifyException(
+        new Throwable("TRACE"));
+    } else {
+      instantiationTrace = null;
+    }
+  }
+
+  /** @return the renewal time in milliseconds. */
+  private synchronized long getRenewalTime() {
+    return renewal;
+  }
+
+  /** Used for testing only. */
+  @VisibleForTesting
+  public synchronized void setRenewalTime(final long renewal) {
+    this.renewal = renewal;
+  }
+
+  /**
+   * Add a client.
+   * If the client is new and half of its positive hdfs timeout is smaller
+   * than the current renewal time, the renewal time is lowered to that half.
+   */
+  private synchronized void addClient(final DFSClient dfsc) {
+    for(DFSClient c : dfsclients) {
+      if (c == dfsc) {
+        //client already exists, nothing to do.
+        return;
+      }
+    }
+    //client not found, add it
+    dfsclients.add(dfsc);
+
+    //update renewal time
+    final int hdfsTimeout = dfsc.getConf().getHdfsTimeout();
+    if (hdfsTimeout > 0) {
+      final long half = hdfsTimeout/2;
+      if (half < renewal) {
+        this.renewal = half;
+      }
+    }
+  }
+
+  /**
+   * Drop every client that is no longer running.
+   *
+   * @return true if at least one client remains in the list
+   */
+  private synchronized boolean clientsRunning() {
+    for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) {
+      if (!i.next().isClientRunning()) {
+        i.remove();
+      }
+    }
+    return !dfsclients.isEmpty();
+  }
+
+  /** @return the sleep period in milliseconds. */
+  private synchronized long getSleepPeriod() {
+    return sleepPeriod;
+  }
+
+  /** Set the grace period and adjust the sleep period accordingly. */
+  synchronized void setGraceSleepPeriod(final long gracePeriod) {
+    unsyncSetGraceSleepPeriod(gracePeriod);
+  }
+
+  /**
+   * Unsynchronized worker for {@link #setGraceSleepPeriod(long)}; also
+   * called from the constructor.
+   * The sleep period becomes the smaller of half the grace period and
+   * {@link #LEASE_RENEWER_SLEEP_DEFAULT}.
+   *
+   * @throws HadoopIllegalArgumentException if gracePeriod is below 100ms
+   */
+  private void unsyncSetGraceSleepPeriod(final long gracePeriod) {
+    if (gracePeriod < 100L) {
+      throw new HadoopIllegalArgumentException(gracePeriod
+          + " = gracePeriod < 100ms is too small.");
+    }
+    this.gracePeriod = gracePeriod;
+    final long half = gracePeriod/2;
+    this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+        half: LEASE_RENEWER_SLEEP_DEFAULT;
+  }
+
+  /** Is the daemon running? */
+  synchronized boolean isRunning() {
+    return daemon != null && daemon.isAlive();
+  }
+
+  /**
+   * Does this renewer have nothing to renew?
+   * Synchronized like every other access to the client list.
+   */
+  public synchronized boolean isEmpty() {
+    return dfsclients.isEmpty();
+  }
+
+  /** Used only by tests */
+  synchronized String getDaemonName() {
+    return daemon.getName();
+  }
+
+  /** Is the empty period longer than the grace period? */
+  private synchronized boolean isRenewerExpired() {
+    return emptyTime != Long.MAX_VALUE
+        && Time.monotonicNow() - emptyTime > gracePeriod;
+  }
+
+  /**
+   * Register a file being written by the given client.
+   * Nothing happens unless the client is running. A new daemon is started
+   * first when none is alive or the renewer has expired.
+   */
+  public synchronized void put(final long inodeId, final DFSOutputStream out,
+      final DFSClient dfsc) {
+    if (dfsc.isClientRunning()) {
+      if (!isRunning() || isRenewerExpired()) {
+        //start a new daemon with a new id.
+        final int id = ++currentId;
+        daemon = new Daemon(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Lease renewer daemon for " + clientsString()
+                    + " with renew id " + id + " started");
+              }
+              LeaseRenewer.this.run(id);
+            } catch(InterruptedException e) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(LeaseRenewer.this.getClass().getSimpleName()
+                    + " is interrupted.", e);
+              }
+            } finally {
+              synchronized(LeaseRenewer.this) {
+                Factory.INSTANCE.remove(LeaseRenewer.this);
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Lease renewer daemon for " + clientsString()
+                    + " with renew id " + id + " exited");
+              }
+            }
+          }
+
+          @Override
+          public String toString() {
+            return String.valueOf(LeaseRenewer.this);
+          }
+        });
+        daemon.start();
+      }
+      dfsc.putFileBeingWritten(inodeId, out);
+      emptyTime = Long.MAX_VALUE;
+    }
+  }
+
+  /** Override the recorded empty time. */
+  @VisibleForTesting
+  synchronized void setEmptyTime(long time) {
+    emptyTime = time;
+  }
+
+  /** Close a file. */
+  public void closeFile(final long inodeId, final DFSClient dfsc) {
+    dfsc.removeFileBeingWritten(inodeId);
+
+    synchronized(this) {
+      if (dfsc.isFilesBeingWrittenEmpty()) {
+        dfsclients.remove(dfsc);
+      }
+      //update emptyTime if necessary
+      if (emptyTime == Long.MAX_VALUE) {
+        for(DFSClient c : dfsclients) {
+          if (!c.isFilesBeingWrittenEmpty()) {
+            //found a non-empty file-being-written map
+            return;
+          }
+        }
+        //discover the first time that all file-being-written maps are empty.
+        emptyTime = Time.monotonicNow();
+      }
+    }
+  }
+
+  /** Close the given client. */
+  public synchronized void closeClient(final DFSClient dfsc) {
+    dfsclients.remove(dfsc);
+    if (dfsclients.isEmpty()) {
+      if (!isRunning() || isRenewerExpired()) {
+        Factory.INSTANCE.remove(LeaseRenewer.this);
+        return;
+      }
+      if (emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the client list is empty.
+        emptyTime = Time.monotonicNow();
+      }
+    }
+
+    //update renewal time
+    // Recompute only when the closed client's half-timeout matches the
+    // current renewal time.
+    if (renewal == dfsc.getConf().getHdfsTimeout()/2) {
+      long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+      for(DFSClient c : dfsclients) {
+        final int timeout = c.getConf().getHdfsTimeout();
+        if (timeout > 0 && timeout < min) {
+          min = timeout;
+        }
+      }
+      renewal = min/2;
+    }
+  }
+
+  /**
+   * Interrupt the daemon if it is running and wait for it to terminate.
+   *
+   * @throws InterruptedException if the calling thread is interrupted
+   *           while waiting
+   */
+  public void interruptAndJoin() throws InterruptedException {
+    Daemon daemonCopy = null;
+    synchronized (this) {
+      if (isRunning()) {
+        daemon.interrupt();
+        daemonCopy = daemon;
+      }
+    }
+
+    // Join outside the lock: the daemon's exit path synchronizes on this
+    // renewer before it finishes.
+    if (daemonCopy != null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Wait for lease checker to terminate");
+      }
+      daemonCopy.join();
+    }
+  }
+
+  /**
+   * Renew the lease once for each distinct client name.
+   * The client list is copied under the lock; the renewLease() calls are
+   * made on the copy without holding it.
+   */
+  private void renew() throws IOException {
+    final List<DFSClient> copies;
+    synchronized(this) {
+      copies = new ArrayList<DFSClient>(dfsclients);
+    }
+    //sort the client names for finding out repeated names.
+    Collections.sort(copies, new Comparator<DFSClient>() {
+      @Override
+      public int compare(final DFSClient left, final DFSClient right) {
+        return left.getClientName().compareTo(right.getClientName());
+      }
+    });
+    String previousName = "";
+    for (final DFSClient c : copies) {
+      //skip if current client name is the same as the previous name.
+      if (!c.getClientName().equals(previousName)) {
+        if (!c.renewLease()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Did not renew lease for client " +
+                c);
+          }
+          // previousName is left unchanged, so a later client with the
+          // same name is still tried.
+          continue;
+        }
+        previousName = c.getClientName();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Lease renewed for client " + previousName);
+        }
+      }
+    }
+  }
+
+  /**
+   * Periodically check in with the namenode and renew all the leases
+   * when the lease period is half over.
+   */
+  private void run(final int id) throws InterruptedException {
+    for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
+        Thread.sleep(getSleepPeriod())) {
+      final long elapsed = Time.monotonicNow() - lastRenewed;
+      if (elapsed >= getRenewalTime()) {
+        try {
+          renew();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Lease renewer daemon for " + clientsString()
+                + " with renew id " + id + " executed");
+          }
+          lastRenewed = Time.monotonicNow();
+        } catch (SocketTimeoutException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (elapsed/1000) + " seconds.  Aborting ...", ie);
+          synchronized (this) {
+            while (!dfsclients.isEmpty()) {
+              DFSClient dfsClient = dfsclients.get(0);
+              dfsClient.closeAllFilesBeingWritten(true);
+              closeClient(dfsClient);
+            }
+            //Expire the current LeaseRenewer thread.
+            emptyTime = 0;
+          }
+          break;
+        } catch (IOException ie) {
+          LOG.warn("Failed to renew lease for " + clientsString() + " for "
+              + (elapsed/1000) + " seconds.  Will retry shortly ...", ie);
+        }
+      }
+
+      synchronized(this) {
+        if (id != currentId || isRenewerExpired()) {
+          if (LOG.isDebugEnabled()) {
+            if (id != currentId) {
+              LOG.debug("Lease renewer daemon for " + clientsString()
+                  + " with renew id " + id + " is not current");
+            } else {
+               LOG.debug("Lease renewer daemon for " + clientsString()
+                  + " with renew id " + id + " expired");
+            }
+          }
+          //no longer the current daemon or expired
+          return;
+        }
+
+        // if no clients are in running state or there is no more clients
+        // registered with this renewer, stop the daemon after the grace
+        // period.
+        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
+          emptyTime = Time.monotonicNow();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return the simple class name and factory key; when trace logging is
+   *         enabled, also the client names and the instantiation trace.
+   */
+  @Override
+  public String toString() {
+    String s = getClass().getSimpleName() + ":" + factorykey;
+    if (LOG.isTraceEnabled()) {
+      return s + ", clients=" +  clientsString()
+        + ", created at " + instantiationTrace;
+    }
+    return s;
+  }
+
+  /** Get the names of all clients */
+  private synchronized String clientsString() {
+    if (dfsclients.isEmpty()) {
+      return "[]";
+    } else {
+      final StringBuilder b = new StringBuilder("[").append(
+          dfsclients.get(0).getClientName());
+      for(int i = 1; i < dfsclients.size(); i++) {
+        b.append(", ").append(dfsclients.get(i).getClientName());
+      }
+      return b.append("]").toString();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java
new file mode 100644
index 0000000..e4b51c5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MissingEventsException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /** Transaction ID the next batch of events was expected to start with. */
+  private long expectedTxid;
+  /** Transaction ID the next batch of events actually started with. */
+  private long actualTxid;
+
+  /** Creates an exception whose two transaction IDs are both 0. */
+  public MissingEventsException() {
+    this(0L, 0L);
+  }
+
+  /**
+   * Creates an exception recording the expected and the actual transaction
+   * ID.
+   *
+   * @param expectedTxid the transaction ID that was expected
+   * @param actualTxid the transaction ID that was seen instead
+   */
+  public MissingEventsException(long expectedTxid, long actualTxid) {
+    this.expectedTxid = expectedTxid;
+    this.actualTxid = actualTxid;
+  }
+
+  /** @return the transaction ID that was expected */
+  public long getExpectedTxid() {
+    return expectedTxid;
+  }
+
+  /** @return the transaction ID that was seen instead */
+  public long getActualTxid() {
+    return actualTxid;
+  }
+
+  /** @return a description naming both transaction IDs */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder(
+        "We expected the next batch of events to start with transaction ID ");
+    text.append(expectedTxid);
+    text.append(", but it instead started with transaction ID ");
+    text.append(actualTxid);
+    text.append(". Most likely the intervening transactions were cleaned ");
+    text.append("up as part of checkpointing.");
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java
new file mode 100644
index 0000000..1210999
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Indicates a failure manipulating an ACL.
+ */
+@InterfaceAudience.Private
+public class AclException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Builds an AclException carrying the given detail message.
+   *
+   * @param message detail message handed to {@link IOException}
+   */
+  public AclException(final String message) {
+    super(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
new file mode 100644
index 0000000..923cdb4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * CacheDirectiveIterator is a remote iterator that iterates cache directives.
+ * It supports retrying in case of namenode failover.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class CacheDirectiveIterator
+ extends BatchedRemoteIterator {
+
+ /** Filter sent with each listing request; not final because makeRequest may replace it. */
+ private CacheDirectiveInfo filter;
+ /** Namenode the listing requests are issued against. */
+ private final ClientProtocol namenode;
+ /** Sampler handed to Trace.startSpan for each request. */
+ private final Sampler<?> traceSampler;
+
+ /**
+  * Creates an iterator over the cache directives of the given namenode.
+  * The superclass is initialised with 0L.
+  *
+  * @param namenode namenode the listing requests are sent to
+  * @param filter filter sent with each listing request
+  * @param traceSampler sampler used when starting the trace span of a
+  *          request
+  */
+ public CacheDirectiveIterator(ClientProtocol namenode,
+     CacheDirectiveInfo filter, Sampler<?> traceSampler) {
+   super(0L);
+   this.namenode = namenode;
+   this.filter = filter;
+   this.traceSampler = traceSampler;
+ }
+
+ private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) {
+ CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder(filter);
+ builder.setId(null);
+ return builder.build();
+ }
+
+ /**
+ * Used for compatibility when communicating with a server version that
+ * does not support filtering directives by ID.
+ */
+ private static class SingleEntry implements
+ BatchedEntries {
+
+ private final CacheDirectiveEntry entry;
+
+ public SingleEntry(final CacheDirectiveEntry entry) {
+ this.entry = entry;
+ }
+
+ @Override
+ public CacheDirectiveEntry get(int i) {
+ if (i > 0) {
+ return null;
+ }
+ return entry;
+ }
+
+ @Override
+ public int size() {
+ return 1;
+ }
+
+ @Override
+ public boolean hasMore() {
+ return false;
+ }
+ }
+
+ @Override
+ public BatchedEntries makeRequest(Long prevKey)
+ throws IOException {
+ BatchedEntries entries = null;
+ TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler);
+ try {
+ entries = namenode.listCacheDirectives(prevKey, filter);
+ } catch (IOException e) {
+ if (e.getMessage().contains("Filtering by ID is unsupported")) {
+ // Retry case for old servers, do the filtering client-side
+ long id = filter.getId();
+ filter = removeIdFromFilter(filter);
+ // Using id - 1 as prevId should get us a window containing the id
+ // This is somewhat brittle, since it depends on directives being
+ // returned in order of ascending ID.
+ entries = namenode.listCacheDirectives(id - 1, filter);
+ for (int i=0; i {
+
+ private final ClientProtocol namenode;
+ private final Sampler traceSampler;
+
+ /**
+  * Creates an iterator over the cache pools of the given namenode.
+  * The superclass is initialised with the empty string.
+  *
+  * @param namenode namenode the listing requests are sent to
+  * @param traceSampler sampler used when starting the trace span of a
+  *          request
+  */
+ public CachePoolIterator(ClientProtocol namenode, Sampler<?> traceSampler) {
+   super("");
+   this.namenode = namenode;
+   this.traceSampler = traceSampler;
+ }
+
+ /**
+  * Lists cache pools after the given key inside a "listCachePools" trace
+  * span; the span is closed whether or not the call succeeds.
+  *
+  * @param prevKey key passed to the namenode's listCachePools call
+  * @return the batch returned by the namenode
+  * @throws IOException if the namenode call fails
+  */
+ @Override
+ public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
+     throws IOException {
+   TraceScope scope = Trace.startSpan("listCachePools", traceSampler);
+   try {
+     return namenode.listCachePools(prevKey);
+   } finally {
+     scope.close();
+   }
+ }
+
+ /**
+  * @param entry a cache pool entry
+  * @return the pool name taken from the entry's info
+  */
+ @Override
+ public String elementToPrevKey(CachePoolEntry entry) {
+   final String poolName = entry.getInfo().getPoolName();
+   return poolName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
new file mode 100644
index 0000000..0141215
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * EncryptionZoneIterator is a remote iterator that iterates over encryption
+ * zones. It supports retrying in case of namenode failover.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class EncryptionZoneIterator
+ extends BatchedRemoteIterator {
+
+ private final ClientProtocol namenode;
+ private final Sampler> traceSampler;
+
+ public EncryptionZoneIterator(ClientProtocol namenode,
+ Sampler> traceSampler) {
+ super(Long.valueOf(0));
+ this.namenode = namenode;
+ this.traceSampler = traceSampler;
+ }
+
+ @Override
+ public BatchedEntries makeRequest(Long prevId)
+ throws IOException {
+ TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler);
+ try {
+ return namenode.listEncryptionZones(prevId);
+ } finally {
+ scope.close();
+ }
+ }
+
+ @Override
+ public Long elementToPrevKey(EncryptionZone entry) {
+ return entry.getId();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java
new file mode 100644
index 0000000..25084c7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+
+import static org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix.long2String;
+
+/**
+ * Quota exception that additionally records the storage type whose quota
+ * was exceeded.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaByStorageTypeExceededException extends QuotaExceededException {
+  protected static final long serialVersionUID = 1L;
+  /** Storage type the quota applies to; null unless set by a constructor. */
+  protected StorageType type;
+
+  /** Creates an exception with no message and no storage type. */
+  public QuotaByStorageTypeExceededException() {}
+
+  /**
+   * Creates an exception with an explicit message.
+   *
+   * @param msg message returned verbatim by {@link #getMessage()} when it is
+   *        not null
+   */
+  public QuotaByStorageTypeExceededException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates an exception from the quota figures.
+   *
+   * @param quota the quota, forwarded to the superclass
+   * @param count the consumed amount, forwarded to the superclass
+   * @param type storage type the quota applies to
+   */
+  public QuotaByStorageTypeExceededException(long quota, long count, StorageType type) {
+    super(quota, count);
+    this.type = type;
+  }
+
+  /**
+   * Returns the superclass message if there is one, otherwise a message
+   * built from the storage type, path, quota and consumed space.
+   *
+   * @return the exception message, never null
+   */
+  @Override
+  public String getMessage() {
+    String msg = super.getMessage();
+    if (msg != null) {
+      return msg;
+    }
+    // Plain concatenation instead of type.toString(): an instance created by
+    // the no-arg constructor has a null type, and getMessage() must not
+    // throw NullPointerException for it.
+    return "Quota by storage type : " + type +
+        " on path : " + (pathName==null ? "": pathName) +
+        " is exceeded. quota = " + long2String(quota, "B", 2) +
+        " but space consumed = " + long2String(count, "B", 2);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java
new file mode 100644
index 0000000..03fb704
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Thrown when a symbolic link is encountered in a path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class UnresolvedPathException extends UnresolvedLinkException {
+ private static final long serialVersionUID = 1L;
+ private String path; // The path containing the link
+ private String preceding; // The path part preceding the link
+ private String remainder; // The path part following the link
+ private String linkTarget; // The link's target
+
+ /**
+ * Used by RemoteException to instantiate an UnresolvedPathException.
+ */
+ public UnresolvedPathException(String msg) {
+ super(msg);
+ }
+
+ public UnresolvedPathException(String path, String preceding,
+ String remainder, String linkTarget) {
+ this.path = path;
+ this.preceding = preceding;
+ this.remainder = remainder;
+ this.linkTarget = linkTarget;
+ }
+
+ /**
+ * Return a path with the link resolved with the target.
+ */
+ public Path getResolvedPath() throws IOException {
+ // If the path is absolute we cam throw out the preceding part and
+ // just append the remainder to the target, otherwise append each
+ // piece to resolve the link in path.
+ boolean noRemainder = (remainder == null || "".equals(remainder));
+ Path target = new Path(linkTarget);
+ if (target.isUriPathAbsolute()) {
+ return noRemainder ? target : new Path(target, remainder);
+ } else {
+ return noRemainder
+ ? new Path(preceding, target)
+ : new Path(new Path(preceding, linkTarget), remainder);
+ }
+ }
+
+ @Override
+ public String getMessage() {
+ String msg = super.getMessage();
+ if (msg != null) {
+ return msg;
+ }
+ String myMsg = "Unresolved path " + path;
+ try {
+ return getResolvedPath().toString();
+ } catch (IOException e) {
+ // Ignore
+ }
+ return myMsg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
new file mode 100644
index 0000000..c69986a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * The setting of replace-datanode-on-failure feature.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplaceDatanodeOnFailure {
+  /** The replacement policies */
+  public enum Policy {
+    /** The feature is disabled in the entire site. */
+    DISABLE(Condition.FALSE),
+    /** Never add a new datanode. */
+    NEVER(Condition.FALSE),
+    /** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */
+    DEFAULT(Condition.DEFAULT),
+    /** Always add a new datanode when an existing datanode is removed. */
+    ALWAYS(Condition.TRUE);
+
+    /** Rule evaluated when this policy is asked whether to replace. */
+    private final Condition condition;
+
+    Policy(Condition condition) {
+      this.condition = condition;
+    }
+
+    Condition getCondition() {
+      return condition;
+    }
+  }
+
+  /** Datanode replacement condition */
+  private interface Condition {
+    /** Return true unconditionally. */
+    Condition TRUE = new Condition() {
+      @Override
+      public boolean satisfy(short replication, DatanodeInfo[] existings,
+          int nExistings, boolean isAppend, boolean isHflushed) {
+        return true;
+      }
+    };
+
+    /** Return false unconditionally. */
+    Condition FALSE = new Condition() {
+      @Override
+      public boolean satisfy(short replication, DatanodeInfo[] existings,
+          int nExistings, boolean isAppend, boolean isHflushed) {
+        return false;
+      }
+    };
+
+    /**
+     * DEFAULT condition:
+     *   Let r be the replication number.
+     *   Let n be the number of existing datanodes.
+     *   Add a new datanode only if r >= 3 and either
+     *   (1) floor(r/2) >= n; or
+     *   (2) r > n and the block is hflushed/appended.
+     *
+     * The "r > n" part of (2) is not re-checked here: the enclosing
+     * class's satisfy method already returns false when n >= r.
+     */
+    Condition DEFAULT = new Condition() {
+      @Override
+      public boolean satisfy(final short replication,
+          final DatanodeInfo[] existings, final int n, final boolean isAppend,
+          final boolean isHflushed) {
+        if (replication < 3) {
+          return false;
+        }
+        return n <= (replication/2) || isAppend || isHflushed;
+      }
+    };
+
+    /** Is the condition satisfied? */
+    boolean satisfy(short replication, DatanodeInfo[] existings,
+        int nExistings, boolean isAppend, boolean isHflushed);
+  }
+
+  private final Policy policy;
+  private final boolean bestEffort;
+
+  /**
+   * @param policy the replacement policy to apply
+   * @param bestEffort whether a write may continue when the replacement
+   *        itself fails, see {@link #isBestEffort()}
+   */
+  public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) {
+    this.policy = policy;
+    this.bestEffort = bestEffort;
+  }
+
+  /**
+   * Check if the feature is enabled.
+   *
+   * @throws UnsupportedOperationException if the policy is
+   *         {@link Policy#DISABLE}
+   */
+  public void checkEnabled() {
+    if (policy != Policy.DISABLE) {
+      return;
+    }
+    throw new UnsupportedOperationException(
+        "This feature is disabled. Please refer to "
+        + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY
+        + " configuration property.");
+  }
+
+  /**
+   * Best effort means that the client will try to replace the failed datanode
+   * (provided that the policy is satisfied), however, it will continue the
+   * write operation in case that the datanode replacement also fails.
+   *
+   * @return Suppose the datanode replacement fails.
+   *     false: An exception should be thrown so that the write will fail.
+   *     true : The write should be resumed with the remaining datanodes.
+   */
+  public boolean isBestEffort() {
+    return bestEffort;
+  }
+
+  /**
+   * Does it need a replacement according to the policy?
+   *
+   * No policy is consulted when there are no existing datanodes or when
+   * there are already at least {@code replication} of them.
+   */
+  public boolean satisfy(
+      final short replication, final DatanodeInfo[] existings,
+      final boolean isAppend, final boolean isHflushed) {
+    final int n = existings == null? 0: existings.length;
+    final boolean policyApplies = n != 0 && n < replication;
+    return policyApplies
+        && policy.getCondition().satisfy(
+            replication, existings, n, isAppend, isHflushed);
+  }
+
+  @Override
+  public String toString() {
+    return policy.toString();
+  }
+
+  /** Get the setting from configuration. */
+  public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+    final Policy policy = getPolicy(conf);
+    final boolean bestEffort = conf.getBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT);
+
+    return new ReplaceDatanodeOnFailure(policy, bestEffort);
+  }
+
+  /**
+   * Reads the policy from configuration: DISABLE when the enable flag is
+   * false, otherwise the non-DISABLE policy whose name matches the policy
+   * key, ignoring case.
+   *
+   * @throws HadoopIllegalArgumentException if no such policy matches
+   */
+  private static Policy getPolicy(final Configuration conf) {
+    final boolean enabled = conf.getBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_DEFAULT);
+    if (!enabled) {
+      return Policy.DISABLE;
+    }
+
+    final String policy = conf.get(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_DEFAULT);
+    for (final Policy candidate : Policy.values()) {
+      // DISABLE can only be selected through the enable flag, never by name.
+      if (candidate == Policy.DISABLE) {
+        continue;
+      }
+      if (candidate.name().equalsIgnoreCase(policy)) {
+        return candidate;
+      }
+    }
+    throw new HadoopIllegalArgumentException("Illegal configuration value for "
+        + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY
+        + ": " + policy);
+  }
+
+  /** Write the setting to configuration. */
+  public static void write(final Policy policy,
+      final boolean bestEffort, final Configuration conf) {
+    final boolean enabled = policy != Policy.DISABLE;
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        enabled);
+    conf.set(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
+        policy.name());
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY,
+        bestEffort);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
new file mode 100644
index 0000000..b159d3a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+/**
+ * Exception indicating that DataNode does not have a replica
+ * that matches the target block.
+ */
+public class ReplicaNotFoundException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  // Message prefixes; each ends with a space so that callers can append
+  // further detail.
+  public static final String NON_RBW_REPLICA =
+      "Cannot recover a non-RBW replica ";
+  public static final String UNFINALIZED_REPLICA =
+      "Cannot append to an unfinalized replica ";
+  public static final String UNFINALIZED_AND_NONRBW_REPLICA =
+      "Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
+  public static final String NON_EXISTENT_REPLICA =
+      "Cannot append to a non-existent replica ";
+  public static final String UNEXPECTED_GS_REPLICA =
+      "Cannot append to a replica with unexpected generation stamp ";
+
+  /** Creates an exception without a message. */
+  public ReplicaNotFoundException() {
+  }
+
+  /**
+   * Creates an exception with the given message.
+   *
+   * @param msg the detail message
+   */
+  public ReplicaNotFoundException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates an exception whose message names the block.
+   *
+   * @param b the block for which no replica was found
+   */
+  public ReplicaNotFoundException(ExtendedBlock b) {
+    super("Replica not found for " + b);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java
new file mode 100644
index 0000000..0bdd2a5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * IOException whose default message reports that the preconditions for
+ * creating a file failed because of a transient error and that the create
+ * should be retried later.
+ */
+@InterfaceAudience.Private
+public class RetryStartFileException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /** Message used by the no-argument constructor. */
+  private static final String DEFAULT_MESSAGE =
+      "Preconditions for creating a file failed because of a transient error,"
+      + " retry create later.";
+
+  /** Creates an exception carrying the default message. */
+  public RetryStartFileException() {
+    super(DEFAULT_MESSAGE);
+  }
+
+  /**
+   * Creates an exception with a caller-supplied message.
+   *
+   * @param s the detail message
+   */
+  public RetryStartFileException(String s) {
+    super(s);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c3a1668..38af55b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -629,6 +629,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton
via Colin P. McCabe)
+ HDFS-8053. Move DFSIn/OutputStream and related classes to
+ hadoop-hdfs-client. (Mingliang Liu via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 60029e0..c88c4c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -74,15 +74,6 @@
-
-
-
-
-
-
-
@@ -196,14 +187,4 @@
-
-
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
deleted file mode 100644
index 2200994..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/BlockStorageLocation.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Wrapper for {@link BlockLocation} that also adds {@link VolumeId} volume
- * location information for each replica.
- */
-@InterfaceStability.Unstable
-@InterfaceAudience.Public
-@Deprecated
-public class BlockStorageLocation extends BlockLocation {
-
- private final VolumeId[] volumeIds;
-
- public BlockStorageLocation(BlockLocation loc, VolumeId[] volumeIds)
- throws IOException {
- // Initialize with data from passed in BlockLocation
- super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), loc
- .getOffset(), loc.getLength(), loc.isCorrupt());
- this.volumeIds = volumeIds;
- }
-
- /**
- * Gets the list of {@link VolumeId} corresponding to the block's replicas.
- *
- * @return volumeIds list of VolumeId for the block's replicas
- */
- public VolumeId[] getVolumeIds() {
- return volumeIds;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
deleted file mode 100644
index 0ccacda..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-
-/**
- * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock},
- * allowing more detailed queries to the datanode about a block.
- *
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class HdfsBlockLocation extends BlockLocation {
-
- private final LocatedBlock block;
-
- public HdfsBlockLocation(BlockLocation loc, LocatedBlock block)
- throws IOException {
- // Initialize with data from passed in BlockLocation
- super(loc);
- this.block = block;
- }
-
- public LocatedBlock getLocatedBlock() {
- return block;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
deleted file mode 100644
index 6e9d3d7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Preconditions;
-
-/**
- * HDFS-specific volume identifier which implements {@link VolumeId}. Can be
- * used to differentiate between the data directories on a single datanode. This
- * identifier is only unique on a per-datanode basis.
- */
-@InterfaceStability.Unstable
-@InterfaceAudience.Public
-public class HdfsVolumeId implements VolumeId {
-
- private final byte[] id;
-
- public HdfsVolumeId(byte[] id) {
- Preconditions.checkNotNull(id, "id cannot be null");
- this.id = id;
- }
-
- @Override
- public int compareTo(VolumeId arg0) {
- if (arg0 == null) {
- return 1;
- }
- return hashCode() - arg0.hashCode();
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(id).toHashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null || obj.getClass() != getClass()) {
- return false;
- }
- if (obj == this) {
- return true;
- }
- HdfsVolumeId that = (HdfsVolumeId) obj;
- return new EqualsBuilder().append(this.id, that.id).isEquals();
- }
-
- @Override
- public String toString() {
- return StringUtils.byteToHexString(id);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java
deleted file mode 100644
index e56e304..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/VolumeId.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Opaque interface that identifies a disk location. Subclasses
- * should implement {@link Comparable} and override both equals and hashCode.
- */
-@InterfaceStability.Unstable
-@InterfaceAudience.Public
-public interface VolumeId extends Comparable {
-
- @Override
- abstract public int compareTo(VolumeId arg0);
-
- @Override
- abstract public int hashCode();
-
- @Override
- abstract public boolean equals(Object obj);
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
deleted file mode 100644
index 7bba8a4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This exception is thrown when a read encounters a block that has no locations
- * associated with it.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class BlockMissingException extends IOException {
-
- private static final long serialVersionUID = 1L;
-
- private final String filename;
- private final long offset;
-
- /**
- * An exception that indicates that file was corrupted.
- * @param filename name of corrupted file
- * @param description a description of the corruption details
- */
- public BlockMissingException(String filename, String description, long offset) {
- super(description);
- this.filename = filename;
- this.offset = offset;
- }
-
- /**
- * Returns the name of the corrupted file.
- * @return name of corrupted file
- */
- public String getFile() {
- return filename;
- }
-
- /**
- * Returns the offset at which this file is corrupted
- * @return offset of corrupted file
- */
- public long getOffset() {
- return offset;
- }
-}