From: arp@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 29 Apr 2016 20:20:18 -0000
Subject: [22/50] [abbrv] hadoop git commit: HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou

HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc94810d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc94810d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc94810d

Branch: refs/heads/HDFS-7240
Commit: fc94810d3f537e51e826fc21ade7867892b9d8dc
Parents: 959a28d
Author: Tsz-Wo Nicholas Sze
Authored: Tue Apr 26 17:10:13 2016 -0700
Committer: Tsz-Wo Nicholas Sze
Committed: Tue Apr 26 17:10:13 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileSystem.java   |   1 -
 .../main/java/org/apache/hadoop/ipc/Client.java |  11 +-
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |  34 ++-
 .../org/apache/hadoop/ipc/TestAsyncIPC.java     |   2 +-
 .../hadoop/hdfs/AsyncDistributedFileSystem.java | 110 ++++++++
 .../hadoop/hdfs/DistributedFileSystem.java      |  21 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  45 +++-
 .../apache/hadoop/hdfs/TestAsyncDFSRename.java  | 258 +++++++++++++++++++
 8 files changed, 462 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 58b37c2..b376763 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1244,7 +1244,6 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Renames Path src to Path dst
    * <ul>
-   * <li
    * <li>Fails if src is a file and dst is a directory.
    * <li>Fails if src is a directory and dst is a file.
    * <li>Fails if the parent of dst does not exist or is a file.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index f206861..d59aeb89 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -119,7 +119,8 @@ public class Client implements AutoCloseable {
 
   private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
   private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
-  private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
+  private static final ThreadLocal<Future<?>>
+      RETURN_RPC_RESPONSE = new ThreadLocal<>();
   private static final ThreadLocal<Boolean> asynchronousMode =
       new ThreadLocal<Boolean>() {
         @Override
@@ -130,8 +131,8 @@ public class Client implements AutoCloseable {
 
   @SuppressWarnings("unchecked")
   @Unstable
-  public static <T> Future<T> getReturnValue() {
-    return (Future<T>) returnValue.get();
+  public static <T> Future<T> getReturnRpcResponse() {
+    return (Future<T>) RETURN_RPC_RESPONSE.get();
   }
 
   /** Set call id and retry count for the next call. */
@@ -1396,7 +1397,7 @@ public class Client implements AutoCloseable {
         }
       };
 
-      returnValue.set(returnFuture);
+      RETURN_RPC_RESPONSE.set(returnFuture);
       return null;
     } else {
       return getRpcResponse(call, connection);
@@ -1410,7 +1411,7 @@ public class Client implements AutoCloseable {
    * synchronous mode.
    */
   @Unstable
-  static boolean isAsynchronousMode() {
+  public static boolean isAsynchronousMode() {
     return asynchronousMode.get();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 071e2e8..8fcdb78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -26,7 +26,9 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
@@ -35,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputOutputStream;
 import org.apache.hadoop.io.Writable;
@@ -67,7 +70,9 @@ import com.google.protobuf.TextFormat;
 @InterfaceStability.Evolving
 public class ProtobufRpcEngine implements RpcEngine {
   public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
-
+  private static final ThreadLocal<Callable<?>>
+      RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
+
   static { // Register the rpcRequest deserializer for WritableRpcEngine
     org.apache.hadoop.ipc.Server.registerProtocolEngine(
         RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@@ -76,6 +81,12 @@ public class ProtobufRpcEngine implements RpcEngine {
 
   private static final ClientCache CLIENTS = new ClientCache();
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> Callable<T> getReturnMessageCallback() {
+    return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
+  }
+
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout) throws IOException {
@@ -189,7 +200,7 @@ public class ProtobufRpcEngine implements RpcEngine {
      * the server.
      */
     @Override
-    public Object invoke(Object proxy, Method method, Object[] args)
+    public Object invoke(Object proxy, final Method method, Object[] args)
         throws ServiceException {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
@@ -251,6 +262,23 @@ public class ProtobufRpcEngine implements RpcEngine {
         LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
       }
 
+      if (Client.isAsynchronousMode()) {
+        final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
+        Callable<Message> callback = new Callable<Message>() {
+          @Override
+          public Message call() throws Exception {
+            return getReturnMessage(method, frrw.get());
+          }
+        };
+        RETURN_MESSAGE_CALLBACK.set(callback);
+        return null;
+      } else {
+        return getReturnMessage(method, val);
+      }
+    }
+
+    private Message getReturnMessage(final Method method,
+        final RpcResponseWrapper rrw) throws ServiceException {
       Message prototype = null;
       try {
         prototype = getReturnProtoType(method);
@@ -260,7 +288,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       Message returnMessage;
       try {
         returnMessage = prototype.newBuilderForType()
-            .mergeFrom(val.theResponseRead).build();
+            .mergeFrom(rrw.theResponseRead).build();
 
         if (LOG.isTraceEnabled()) {
           LOG.trace(Thread.currentThread().getId() + ": Response <- " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index de4395e..6cf75c7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -84,7 +84,7 @@ public class TestAsyncIPC {
       try {
         final long param = TestIPC.RANDOM.nextLong();
         TestIPC.call(client, param, server, conf);
-        Future<LongWritable> returnFuture = Client.getReturnValue();
+        Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
         returnFutures.put(i, returnFuture);
         expectedValues.put(i, param);
       } catch (Exception e) {
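For context, the caller-side contract these Client/ProtobufRpcEngine changes establish is: flip the thread-local asynchronous flag, make the proxy call (which now returns null immediately), then pick up the response Future parked in the thread-local. A minimal sketch follows; it is not part of the patch, and AsyncCallSketch/doSomeRpc are hypothetical names standing in for any protobuf-based RPC proxy call:

    import java.util.concurrent.Future;
    import org.apache.hadoop.ipc.Client;

    public class AsyncCallSketch {
      // Runs one RPC asynchronously and returns the parked response Future.
      static Future<?> callAsync(Runnable doSomeRpc) {
        final boolean wasAsync = Client.isAsynchronousMode();
        Client.setAsynchronousMode(true);        // request async behavior
        try {
          doSomeRpc.run();                       // proxy call returns null at once
          return Client.getReturnRpcResponse(); // pick up the thread-local Future
        } finally {
          Client.setAsynchronousMode(wasAsync);  // restore the previous mode
        }
      }
    }

This is the same save-flag/set/restore dance that AsyncDistributedFileSystem.rename performs in the next file.
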
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
new file mode 100644
index 0000000..37899aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
+import org.apache.hadoop.ipc.Client;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+/****************************************************************
+ * Implementation of the asynchronous distributed file system.
+ * This instance of this class is the way end-user code interacts
+ * with a Hadoop DistributedFileSystem in an asynchronous manner.
+ *
+ *****************************************************************/
+@Unstable
+public class AsyncDistributedFileSystem {
+
+  private final DistributedFileSystem dfs;
+
+  AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
+    this.dfs = dfs;
+  }
+
+  static <T> Future<T> getReturnValue() {
+    final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
+        .getReturnValueCallback();
+    Future<T> returnFuture = new AbstractFuture<T>() {
+      public T get() throws InterruptedException, ExecutionException {
+        try {
+          set(returnValueCallback.call());
+        } catch (Exception e) {
+          setException(e);
+        }
+        return super.get();
+      }
+    };
+    return returnFuture;
+  }
+
+  /**
+   * Renames Path src to Path dst
+   * <ul>
+   * <li>Fails if src is a file and dst is a directory.
+   * <li>Fails if src is a directory and dst is a file.
+   * <li>Fails if the parent of dst does not exist or is a file.
+   * </ul>
+   * <p>
+   * If OVERWRITE option is not passed as an argument, rename fails if the dst
+   * already exists.
+   * <p>
+   * If OVERWRITE option is passed as an argument, rename overwrites the dst if
+   * it is a file or an empty directory. Rename fails if dst is a non-empty
+   * directory.
+   * <p>
+   * Note that atomicity of rename is dependent on the file system
+   * implementation. Please refer to the file system documentation for details.
+   * This default implementation is non atomic.
+   *
+   * @param src
+   *          path to be renamed
+   * @param dst
+   *          new path after rename
+   * @throws IOException
+   *           on failure
+   * @return an instance of Future, #get of which is invoked to wait for
+   *         asynchronous call being finished.
+   */
+  public Future<Void> rename(Path src, Path dst,
+      final Options.Rename... options) throws IOException {
+    dfs.getFsStatistics().incrementWriteOps(1);
+
+    final Path absSrc = dfs.fixRelativePart(src);
+    final Path absDst = dfs.fixRelativePart(dst);
+
+    final boolean isAsync = Client.isAsynchronousMode();
+    Client.setAsynchronousMode(true);
+    try {
+      dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
+          options);
+      return getReturnValue();
+    } finally {
+      Client.setAsynchronousMode(isAsync);
+    }
+  }
+}
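From the user's point of view, the new API boils down to: get the async facade from a DistributedFileSystem, fire off renames, and call get() on the returned Futures only when results are needed. A condensed usage sketch, distilled from TestAsyncDFSRename further down (class name and paths are illustrative):

    import java.util.concurrent.Future;
    import org.apache.hadoop.fs.Options.Rename;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.AsyncDistributedFileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class AsyncRenameUsage {
      // Issues two renames concurrently, then waits for both.
      static void renameBoth(DistributedFileSystem dfs) throws Exception {
        AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
        Future<Void> f1 =
            adfs.rename(new Path("/foo/src1"), new Path("/foo/dst1"), Rename.OVERWRITE);
        Future<Void> f2 =
            adfs.rename(new Path("/foo/src2"), new Path("/foo/dst2"), Rename.OVERWRITE);
        f1.get();  // block until the first rename finishes
        f2.get();  // failures surface here as ExecutionException
      }
    }
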
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 188c1da..a3a8ba0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStoragePolicySpi;
@@ -190,7 +191,7 @@ public class DistributedFileSystem extends FileSystem {
    * @return path component of {file}
    * @throws IllegalArgumentException if URI does not belong to this DFS
    */
-  private String getPathName(Path file) {
+  String getPathName(Path file) {
     checkPath(file);
     String result = file.toUri().getPath();
     if (!DFSUtilClient.isValidName(result)) {
@@ -2428,4 +2429,22 @@ public class DistributedFileSystem extends FileSystem {
     }
     return ret;
   }
+
+  private final AsyncDistributedFileSystem adfs =
+      new AsyncDistributedFileSystem(this);
+
+  /** @return an {@link AsyncDistributedFileSystem} object. */
+  @Unstable
+  public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
+    return adfs;
+  }
+
+  @Override
+  protected Path fixRelativePart(Path p) {
+    return super.fixRelativePart(p);
+  }
+
+  Statistics getFsStatistics() {
+    return statistics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index adcc507..e325597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -24,11 +24,14 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.Lists;
+import java.util.concurrent.Callable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@@ -134,7 +138,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Recove
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@@ -152,13 +155,15 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.*;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@@ -176,8 +181,9 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -189,12 +195,9 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
 
-import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
-import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
-    .EncryptionZoneProto;
-
 /**
  * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
  * while translating from the parameter types used in ClientProtocol to the
@@ -205,6 +208,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
 public class ClientNamenodeProtocolTranslatorPB implements
     ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
   final private ClientNamenodeProtocolPB rpcProxy;
+  private static final ThreadLocal<Callable<?>>
+      RETURN_VALUE_CALLBACK = new ThreadLocal<>();
 
   static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
       GetServerDefaultsRequestProto.newBuilder().build();
@@ -238,6 +243,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
     rpcProxy = proxy;
   }
 
+  @SuppressWarnings("unchecked")
+  @Unstable
+  public static <T> Callable<T> getReturnValueCallback() {
+    return (Callable<T>) RETURN_VALUE_CALLBACK.get();
+  }
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);
@@ -469,6 +480,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     RenameRequestProto req = RenameRequestProto.newBuilder()
         .setSrc(src)
         .setDst(dst).build();
+
     try {
       return rpcProxy.rename(null, req).getResult();
     } catch (ServiceException e) {
@@ -493,7 +505,22 @@ public class ClientNamenodeProtocolTranslatorPB implements
         setDst(dst).setOverwriteDest(overwrite).
         build();
     try {
-      rpcProxy.rename2(null, req);
+      if (Client.isAsynchronousMode()) {
+        rpcProxy.rename2(null, req);
+
+        final Callable<Message> returnMessageCallback = ProtobufRpcEngine
+            .getReturnMessageCallback();
+        Callable<Void> callBack = new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            returnMessageCallback.call();
+            return null;
+          }
+        };
+        RETURN_VALUE_CALLBACK.set(callBack);
+      } else {
+        rpcProxy.rename2(null, req);
+      }
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
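The Future the end user eventually holds is a thin adapter over this chain of thread-local Callables: the engine's callback decodes the raw RPC response into a Message, the translator's callback discards the Message and yields Void, and AsyncDistributedFileSystem.getReturnValue wraps the final Callable in a Guava AbstractFuture whose get() runs the chain on demand. A self-contained sketch of that wrapping pattern, independent of HDFS (CallbackFutureDemo and wrap are hypothetical names, not part of the patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import com.google.common.util.concurrent.AbstractFuture;

    public class CallbackFutureDemo {
      // Defers a Callable behind a Future whose get() runs it on first use,
      // mirroring AsyncDistributedFileSystem.getReturnValue() above.
      static <T> Future<T> wrap(final Callable<T> callback) {
        return new AbstractFuture<T>() {
          @Override
          public T get() throws InterruptedException, ExecutionException {
            try {
              set(callback.call());      // run the deferred work now
            } catch (Exception e) {
              setException(e);           // failures surface as ExecutionException
            }
            return super.get();
          }
        };
      }

      public static void main(String[] args) throws Exception {
        Future<String> f = wrap(new Callable<String>() {
          @Override
          public String call() {
            return "done";
          }
        });
        System.out.println(f.get());     // prints "done"
      }
    }
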
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc94810d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
new file mode 100644
index 0000000..9322e1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAsyncDFSRename {
+  final Path asyncRenameDir = new Path("/test/async_rename/");
+  public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
+  final private static Configuration CONF = new HdfsConfiguration();
+
+  final private static String GROUP1_NAME = "group1";
+  final private static String GROUP2_NAME = "group2";
+  final private static String USER1_NAME = "user1";
+  private static final UserGroupInformation USER1;
+
+  private MiniDFSCluster gCluster;
+
+  static {
+    // explicitly turn on permission checking
+    CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+    // create fake mapping for the groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
+    DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
+
+    // Initiate all four users
+    USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
+        GROUP1_NAME, GROUP2_NAME });
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
+    gCluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (gCluster != null) {
+      gCluster.shutdown();
+      gCluster = null;
+    }
+  }
+
+  static int countLease(MiniDFSCluster cluster) {
+    return TestDFSRename.countLease(cluster);
+  }
+
+  void list(DistributedFileSystem dfs, String name) throws IOException {
+    FileSystem.LOG.info("\n\n" + name);
+    for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
+      FileSystem.LOG.info("" + s.getPath());
+    }
+  }
+
+  static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
+    DataOutputStream a_out = dfs.create(f);
+    a_out.writeBytes("something");
+    a_out.close();
+  }
+
+  /**
+   * Check the blocks of dst file are cleaned after rename with overwrite
+   * Restart NN to check the rename successfully
+   */
+  @Test
+  public void testAsyncRenameWithOverwrite() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+        replFactor).build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+
+    try {
+
+      long fileLen = blockSize * 3;
+      String src = "/foo/src";
+      String dst = "/foo/dst";
+      String src2 = "/foo/src2";
+      String dst2 = "/foo/dst2";
+      Path srcPath = new Path(src);
+      Path dstPath = new Path(dst);
+      Path srcPath2 = new Path(src2);
+      Path dstPath2 = new Path(dst2);
+
+      DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
+      DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
+
+      LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst, 0, fileLen);
+      LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), dst2, 0, fileLen);
+      BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
+          .getBlockManager();
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) != null);
+
+      Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
+      Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
+      retVal1.get();
+      retVal2.get();
+
+      assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+      assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
+          .getLocalBlock()) == null);
+
+      // Restart NN and check the rename successfully
+      cluster.restartNameNodes();
+      assertFalse(dfs.exists(srcPath));
+      assertTrue(dfs.exists(dstPath));
+      assertFalse(dfs.exists(srcPath2));
+      assertTrue(dfs.exists(dstPath2));
+    } finally {
+      if (dfs != null) {
+        dfs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+    final short replFactor = 2;
+    final long blockSize = 512;
+    final Path renameDir = new Path(
+        "/test/concurrent_reanme_with_overwrite_dir/");
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+    int count = 1000;
+
+    try {
+      long fileLen = blockSize * 3;
+      assertTrue(dfs.mkdirs(renameDir));
+
+      Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
+
+      // concurrently invoking many rename
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+        DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+        Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+        returnFutures.put(i, returnFuture);
+      }
+
+      // wait for completing the calls
+      for (int i = 0; i < count; i++) {
+        returnFutures.get(i).get();
+      }
+
+      // Restart NN and check the rename successfully
+      cluster.restartNameNodes();
+
+      // very the src dir should not exist, dst should
+      for (int i = 0; i < count; i++) {
+        Path src = new Path(renameDir, "src" + i);
+        Path dst = new Path(renameDir, "dst" + i);
+        assertFalse(dfs.exists(src));
+        assertTrue(dfs.exists(dst));
+      }
+    } finally {
+      dfs.delete(renameDir, true);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  @Test
+  public void testAsyncRenameWithException() throws Exception {
+    FileSystem rootFs = FileSystem.get(CONF);
+    final Path renameDir = new Path("/test/async_rename_exception/");
+    final Path src = new Path(renameDir, "src");
+    final Path dst = new Path(renameDir, "dst");
+    rootFs.mkdirs(src);
+
+    AsyncDistributedFileSystem adfs = USER1
+        .doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
+          @Override
+          public AsyncDistributedFileSystem run() throws Exception {
+            return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+          }
+        });
+
+    try {
+      Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+      returnFuture.get();
+    } catch (ExecutionException e) {
+      checkPermissionDenied(e, src);
+    }
+  }
+
+  private void checkPermissionDenied(final Exception e, final Path dir) {
+    assertTrue(e.getCause() instanceof ExecutionException);
+    assertTrue("Permission denied messages must carry AccessControlException",
+        e.getMessage().contains("AccessControlException"));
+    assertTrue("Permission denied messages must carry the username", e
+        .getMessage().contains(USER1_NAME));
+    assertTrue("Permission denied messages must carry the path parent", e
+        .getMessage().contains(dir.getParent().toUri().getPath()));
+  }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org