Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75B469C83 for ; Sun, 26 Feb 2012 23:32:53 +0000 (UTC) Received: (qmail 85949 invoked by uid 500); 26 Feb 2012 23:32:53 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 85910 invoked by uid 500); 26 Feb 2012 23:32:53 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 85901 invoked by uid 99); 26 Feb 2012 23:32:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 26 Feb 2012 23:32:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 26 Feb 2012 23:32:44 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7947E2388A6E; Sun, 26 Feb 2012 23:32:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1293964 [2/11] - in /hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/... Date: Sun, 26 Feb 2012 23:32:14 -0000 To: hdfs-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120226233222.7947E2388A6E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolPB.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.TokenInfo; + +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY) +@TokenInfo(BlockTokenSelector.class) +@ProtocolInfo(protocolName = + "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol", + protocolVersion = 1) +@InterfaceAudience.Private +public interface ClientDatanodeProtocolPB extends + ClientDatanodeProtocolService.BlockingInterface, VersionedProtocol { + + /** + * This method is defined to get the protocol signature using + * ProtocolSignatureWritable - suffix of 2 to the method name + * avoids conflict. + */ + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException; +} Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.VersionedProtocol; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * Implementation for protobuf service that forwards requests + * received on {@link ClientDatanodeProtocolPB} to the + * {@link ClientDatanodeProtocol} server implementation. + */ +@InterfaceAudience.Private +public class ClientDatanodeProtocolServerSideTranslatorPB implements + ClientDatanodeProtocolPB { + private final static RefreshNamenodesResponseProto REFRESH_NAMENODE_RESP = + RefreshNamenodesResponseProto.newBuilder().build(); + private final static DeleteBlockPoolResponseProto DELETE_BLOCKPOOL_RESP = + DeleteBlockPoolResponseProto.newBuilder().build(); + + private final ClientDatanodeProtocol impl; + + public ClientDatanodeProtocolServerSideTranslatorPB( + ClientDatanodeProtocol impl) { + this.impl = impl; + } + + @Override + public GetReplicaVisibleLengthResponseProto getReplicaVisibleLength( + RpcController unused, GetReplicaVisibleLengthRequestProto request) + throws ServiceException { + long len; + try { + len = impl.getReplicaVisibleLength(PBHelper.convert(request.getBlock())); + } catch (IOException e) { + throw new ServiceException(e); + } + return GetReplicaVisibleLengthResponseProto.newBuilder().setLength(len) + .build(); + } + + @Override + public RefreshNamenodesResponseProto refreshNamenodes( + RpcController unused, RefreshNamenodesRequestProto request) + throws ServiceException { + try { + impl.refreshNamenodes(); + } catch (IOException e) { + throw new ServiceException(e); + } + return REFRESH_NAMENODE_RESP; + } + + @Override + public DeleteBlockPoolResponseProto deleteBlockPool(RpcController unused, + DeleteBlockPoolRequestProto request) throws ServiceException { + try { + impl.deleteBlockPool(request.getBlockPool(), request.getForce()); + } catch (IOException e) { + throw new ServiceException(e); + } + return DELETE_BLOCKPOOL_RESP; + } + + @Override + public GetBlockLocalPathInfoResponseProto getBlockLocalPathInfo( + RpcController unused, GetBlockLocalPathInfoRequestProto request) + throws ServiceException { + BlockLocalPathInfo resp; + try { + resp = impl.getBlockLocalPathInfo(PBHelper.convert(request.getBlock()), PBHelper.convert(request.getToken())); + } catch (IOException e) { + throw new ServiceException(e); + } + return GetBlockLocalPathInfoResponseProto.newBuilder() + .setBlock(PBHelper.convert(resp.getBlock())) + .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) + .build(); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return RPC.getProtocolVersion(ClientDatanodeProtocolPB.class); + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client will never call this method. + * + * @see VersionedProtocol#getProtocolVersion + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link ClientDatanodeProtocol} + */ + if (!protocol.equals(RPC.getProtocolName(ClientDatanodeProtocol.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(ClientDatanodeProtocol.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), + ClientDatanodeProtocolPB.class); + } + + + @Override + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link ClientDatanodeProtocol} + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } +} Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolMetaInterface; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RpcClientUtil; +import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is the client side translator to translate the requests made on + * {@link ClientDatanodeProtocol} interfaces to the RPC server implementing + * {@link ClientDatanodeProtocolPB}. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientDatanodeProtocolTranslatorPB implements + ProtocolMetaInterface, ClientDatanodeProtocol, Closeable { + public static final Log LOG = LogFactory + .getLog(ClientDatanodeProtocolTranslatorPB.class); + + /** RpcController is not used and hence is set to null */ + private final static RpcController NULL_CONTROLLER = null; + private final ClientDatanodeProtocolPB rpcProxy; + private final static RefreshNamenodesRequestProto REFRESH_NAMENODES = + RefreshNamenodesRequestProto.newBuilder().build(); + + public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, + Configuration conf, int socketTimeout, LocatedBlock locatedBlock) + throws IOException { + rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf, + socketTimeout, locatedBlock); + } + + public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr, + UserGroupInformation ticket, Configuration conf, SocketFactory factory) + throws IOException { + rpcProxy = createClientDatanodeProtocolProxy(addr, ticket, conf, factory, 0); + } + + /** + * Constructor. + * @param datanodeid Datanode to connect to. + * @param conf Configuration. + * @param socketTimeout Socket timeout to use. + * @throws IOException + */ + public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, + Configuration conf, int socketTimeout) throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost() + + ":" + datanodeid.getIpcPort()); + rpcProxy = createClientDatanodeProtocolProxy(addr, + UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } + + static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( + DatanodeID datanodeid, Configuration conf, int socketTimeout, + LocatedBlock locatedBlock) throws IOException { + InetSocketAddress addr = NetUtils.createSocketAddr( + datanodeid.getHost() + ":" + datanodeid.getIpcPort()); + if (LOG.isDebugEnabled()) { + LOG.debug("ClientDatanodeProtocol addr=" + addr); + } + + // Since we're creating a new UserGroupInformation here, we know that no + // future RPC proxies will be able to re-use the same connection. And + // usages of this proxy tend to be one-off calls. + // + // This is a temporary fix: callers should really achieve this by using + // RPC.stopProxy() on the resulting object, but this is currently not + // working in trunk. See the discussion on HDFS-1965. + Configuration confWithNoIpcIdle = new Configuration(conf); + confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic + .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0); + + UserGroupInformation ticket = UserGroupInformation + .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString()); + ticket.addToken(locatedBlock.getBlockToken()); + return createClientDatanodeProtocolProxy(addr, ticket, confWithNoIpcIdle, + NetUtils.getDefaultSocketFactory(conf), socketTimeout); + } + + static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int socketTimeout) throws IOException { + RPC.setProtocolEngine(conf, ClientDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + return RPC.getProxy(ClientDatanodeProtocolPB.class, + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), addr, ticket, + conf, factory, socketTimeout); + } + + @Override + public void close() { + RPC.stopProxy(rpcProxy); + } + + @Override + public long getProtocolVersion(String protocolName, long clientVersion) + throws IOException { + return rpcProxy.getProtocolVersion(protocolName, clientVersion); + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return ProtocolSignatureWritable.convert(rpcProxy.getProtocolSignature2( + protocol, clientVersion, clientMethodsHash)); + } + + @Override + public long getReplicaVisibleLength(ExtendedBlock b) throws IOException { + GetReplicaVisibleLengthRequestProto req = GetReplicaVisibleLengthRequestProto + .newBuilder().setBlock(PBHelper.convert(b)).build(); + try { + return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void refreshNamenodes() throws IOException { + try { + rpcProxy.refreshNamenodes(NULL_CONTROLLER, REFRESH_NAMENODES); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void deleteBlockPool(String bpid, boolean force) throws IOException { + DeleteBlockPoolRequestProto req = DeleteBlockPoolRequestProto.newBuilder() + .setBlockPool(bpid).setForce(force).build(); + try { + rpcProxy.deleteBlockPool(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, + Token token) throws IOException { + GetBlockLocalPathInfoRequestProto req = + GetBlockLocalPathInfoRequestProto.newBuilder() + .setBlock(PBHelper.convert(block)) + .setToken(PBHelper.convert(token)).build(); + GetBlockLocalPathInfoResponseProto resp; + try { + resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return new BlockLocalPathInfo(PBHelper.convert(resp.getBlock()), + resp.getLocalPath(), resp.getLocalMetaPath()); + } + + @Override + public boolean isMethodSupported(String methodName) throws IOException { + return RpcClientUtil.isMethodSupported(rpcProxy, + ClientDatanodeProtocolPB.class, RpcKind.RPC_PROTOCOL_BUFFER, + RPC.getProtocolVersion(ClientDatanodeProtocolPB.class), methodName); + } +} \ No newline at end of file Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolPB.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.hadoop.security.token.TokenInfo; + + +@InterfaceAudience.Private +@InterfaceStability.Stable +@KerberosInfo( + serverPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY) +@TokenInfo(DelegationTokenSelector.class) +@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, + protocolVersion = 1) +/** + * Protocol that a clients use to communicate with the NameNode. + * + * Note: This extends the protocolbuffer service based interface to + * add annotations required for security. + */ +public interface ClientNamenodeProtocolPB extends + ClientNamenodeProtocol.BlockingInterface, VersionedProtocol { + + /** + * This method is defined to get the protocol signature using + * the R23 protocol - hence we have added the suffix of 2 the method name + * to avoid conflict. + */ + public ProtocolSignatureWritable getProtocolSignature2(String protocol, + long clientVersion, int clientMethodsHash) throws IOException; +} Added: hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1293964&view=auto ============================================================================== --- hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (added) +++ hadoop/common/branches/branch-0.23-PB-merge/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Sun Feb 26 23:32:06 2012 @@ -0,0 +1,887 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolPB; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CancelDelegationTokenResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetListingResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetPreferredBlockSizeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSaveResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2ResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewDelegationTokenRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewDelegationTokenResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPermissionResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable; +import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.ipc.RPC; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +/** + * This class is used on the server side. Calls come across the wire for the + * for protocol {@link ClientNamenodeProtocolPB}. + * This class translates the PB data types + * to the native data types used inside the NN as specified in the generic + * ClientProtocol. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class ClientNamenodeProtocolServerSideTranslatorPB implements + ClientNamenodeProtocolPB { + final private ClientProtocol server; + + /** + * Constructor + * + * @param server - the NN server + * @throws IOException + */ + public ClientNamenodeProtocolServerSideTranslatorPB(ClientProtocol server) + throws IOException { + this.server = server; + } + + /** + * The client side will redirect getProtocolSignature to + * getProtocolSignature2. + * + * However the RPC layer below on the Server side will call getProtocolVersion + * and possibly in the future getProtocolSignature. Hence we still implement + * it even though the end client's call will never reach here. + */ + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link ClientNamenodeProtocol} + * + */ + if (!protocol.equals(RPC.getProtocolName( + ClientNamenodeProtocolPB.class))) { + throw new IOException("Namenode Serverside implements " + + RPC.getProtocolName(ClientNamenodeProtocolPB.class) + + ". The following requested protocol is unknown: " + protocol); + } + + return ProtocolSignature.getProtocolSignature(clientMethodsHash, + RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), + ClientNamenodeProtocolPB.class); + } + + @Override + public ProtocolSignatureWritable + getProtocolSignature2( + String protocol, long clientVersion, int clientMethodsHash) + throws IOException { + /** + * Don't forward this to the server. The protocol version and signature is + * that of {@link ClientNamenodeProtocol} + * + */ + return ProtocolSignatureWritable.convert( + this.getProtocolSignature(protocol, clientVersion, clientMethodsHash)); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return RPC.getProtocolVersion(InterDatanodeProtocolPB.class); + } + + @Override + public GetBlockLocationsResponseProto getBlockLocations( + RpcController controller, GetBlockLocationsRequestProto req) + throws ServiceException { + try { + LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(), + req.getLength()); + Builder builder = GetBlockLocationsResponseProto + .newBuilder(); + if (b != null) { + builder.setLocations(PBHelper.convert(b)).build(); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetServerDefaultsResponseProto getServerDefaults( + RpcController controller, GetServerDefaultsRequestProto req) + throws ServiceException { + try { + FsServerDefaults result = server.getServerDefaults(); + return GetServerDefaultsResponseProto.newBuilder() + .setServerDefaults(PBHelper.convert(result)) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + + static final CreateResponseProto VOID_CREATE_RESPONSE = + CreateResponseProto.newBuilder().build(); + + @Override + public CreateResponseProto create(RpcController controller, + CreateRequestProto req) throws ServiceException { + try { + server.create(req.getSrc(), PBHelper.convert(req.getMasked()), + req.getClientName(), PBHelper.convert(req.getCreateFlag()), + req.getCreateParent(), (short) req.getReplication(), + req.getBlockSize()); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_CREATE_RESPONSE; + } + + static final AppendResponseProto NULL_APPEND_RESPONSE = + AppendResponseProto.newBuilder().build(); + + @Override + public AppendResponseProto append(RpcController controller, + AppendRequestProto req) throws ServiceException { + try { + LocatedBlock result = server.append(req.getSrc(), req.getClientName()); + if (result != null) { + return AppendResponseProto.newBuilder() + .setBlock(PBHelper.convert(result)).build(); + } + return NULL_APPEND_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SetReplicationResponseProto setReplication(RpcController controller, + SetReplicationRequestProto req) throws ServiceException { + try { + boolean result = + server.setReplication(req.getSrc(), (short) req.getReplication()); + return SetReplicationResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + + static final SetPermissionResponseProto VOID_SET_PERM_RESPONSE = + SetPermissionResponseProto.newBuilder().build(); + + @Override + public SetPermissionResponseProto setPermission(RpcController controller, + SetPermissionRequestProto req) throws ServiceException { + try { + server.setPermission(req.getSrc(), PBHelper.convert(req.getPermission())); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_SET_PERM_RESPONSE; + } + + static final SetOwnerResponseProto VOID_SET_OWNER_RESPONSE = + SetOwnerResponseProto.newBuilder().build(); + + @Override + public SetOwnerResponseProto setOwner(RpcController controller, + SetOwnerRequestProto req) throws ServiceException { + try { + server.setOwner(req.getSrc(), + req.hasUsername() ? req.getUsername() : null, + req.hasGroupname() ? req.getGroupname() : null); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_SET_OWNER_RESPONSE; + } + + static final AbandonBlockResponseProto VOID_ADD_BLOCK_RESPONSE = + AbandonBlockResponseProto.newBuilder().build(); + + @Override + public AbandonBlockResponseProto abandonBlock(RpcController controller, + AbandonBlockRequestProto req) throws ServiceException { + try { + server.abandonBlock(PBHelper.convert(req.getB()), req.getSrc(), + req.getHolder()); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_ADD_BLOCK_RESPONSE; + } + + @Override + public AddBlockResponseProto addBlock(RpcController controller, + AddBlockRequestProto req) throws ServiceException { + + try { + List excl = req.getExcludeNodesList(); + LocatedBlock result = server.addBlock(req.getSrc(), req.getClientName(), + req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, + (excl == null || + excl.size() == 0) ? null : + PBHelper.convert(excl.toArray(new DatanodeInfoProto[excl.size()]))); + return AddBlockResponseProto.newBuilder().setBlock( + PBHelper.convert(result)).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetAdditionalDatanodeResponseProto getAdditionalDatanode( + RpcController controller, GetAdditionalDatanodeRequestProto req) + throws ServiceException { + try { + List existingList = req.getExistingsList(); + List excludesList = req.getExcludesList(); + LocatedBlock result = server.getAdditionalDatanode( + req.getSrc(), PBHelper.convert(req.getBlk()), + PBHelper.convert(existingList.toArray( + new DatanodeInfoProto[existingList.size()])), + PBHelper.convert(excludesList.toArray( + new DatanodeInfoProto[excludesList.size()])), + req.getNumAdditionalNodes(), req.getClientName()); + return GetAdditionalDatanodeResponseProto.newBuilder().setBlock( + PBHelper.convert(result)) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public CompleteResponseProto complete(RpcController controller, + CompleteRequestProto req) throws ServiceException { + try { + boolean result = + server.complete(req.getSrc(), req.getClientName(), + req.hasLast() ? PBHelper.convert(req.getLast()) : null); + return CompleteResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final ReportBadBlocksResponseProto VOID_REP_BAD_BLOCK_RESPONSE = + ReportBadBlocksResponseProto.newBuilder().build(); + + @Override + public ReportBadBlocksResponseProto reportBadBlocks(RpcController controller, + ReportBadBlocksRequestProto req) throws ServiceException { + try { + List bl = req.getBlocksList(); + server.reportBadBlocks(PBHelper.convertLocatedBlock( + bl.toArray(new LocatedBlockProto[bl.size()]))); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_REP_BAD_BLOCK_RESPONSE; + } + + static final ConcatResponseProto VOID_CONCAT_RESPONSE = + ConcatResponseProto.newBuilder().build(); + + @Override + public ConcatResponseProto concat(RpcController controller, + ConcatRequestProto req) throws ServiceException { + try { + List srcs = req.getSrcsList(); + server.concat(req.getTrg(), srcs.toArray(new String[srcs.size()])); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_CONCAT_RESPONSE; + } + + @Override + public RenameResponseProto rename(RpcController controller, + RenameRequestProto req) throws ServiceException { + try { + boolean result = server.rename(req.getSrc(), req.getDst()); + return RenameResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final Rename2ResponseProto VOID_RENAME2_RESPONSE = + Rename2ResponseProto.newBuilder().build(); + + @Override + public Rename2ResponseProto rename2(RpcController controller, + Rename2RequestProto req) throws ServiceException { + + try { + server.rename2(req.getSrc(), req.getDst(), + req.getOverwriteDest() ? Rename.OVERWRITE : Rename.NONE); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_RENAME2_RESPONSE; + } + + @Override + public DeleteResponseProto delete(RpcController controller, + DeleteRequestProto req) throws ServiceException { + try { + boolean result = server.delete(req.getSrc(), req.getRecursive()); + return DeleteResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public MkdirsResponseProto mkdirs(RpcController controller, + MkdirsRequestProto req) throws ServiceException { + try { + boolean result = server.mkdirs(req.getSrc(), + PBHelper.convert(req.getMasked()), req.getCreateParent()); + return MkdirsResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final GetListingResponseProto NULL_GETLISTING_RESPONSE = + GetListingResponseProto.newBuilder().build(); + @Override + public GetListingResponseProto getListing(RpcController controller, + GetListingRequestProto req) throws ServiceException { + try { + DirectoryListing result = server.getListing( + req.getSrc(), req.getStartAfter().toByteArray(), + req.getNeedLocation()); + if (result !=null) { + return GetListingResponseProto.newBuilder().setDirList( + PBHelper.convert(result)).build(); + } else { + return NULL_GETLISTING_RESPONSE; + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final RenewLeaseResponseProto VOID_RENEWLEASE_RESPONSE = + RenewLeaseResponseProto.newBuilder().build(); + + @Override + public RenewLeaseResponseProto renewLease(RpcController controller, + RenewLeaseRequestProto req) throws ServiceException { + try { + server.renewLease(req.getClientName()); + return VOID_RENEWLEASE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RecoverLeaseResponseProto recoverLease(RpcController controller, + RecoverLeaseRequestProto req) throws ServiceException { + try { + boolean result = server.recoverLease(req.getSrc(), req.getClientName()); + return RecoverLeaseResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RestoreFailedStorageResponseProto restoreFailedStorage( + RpcController controller, RestoreFailedStorageRequestProto req) + throws ServiceException { + try { + boolean result = server.restoreFailedStorage(req.getArg()); + return RestoreFailedStorageResponseProto.newBuilder().setResult(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetFsStatsResponseProto getFsStats(RpcController controller, + GetFsStatusRequestProto req) throws ServiceException { + try { + return PBHelper.convert(server.getStats()); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetDatanodeReportResponseProto getDatanodeReport( + RpcController controller, GetDatanodeReportRequestProto req) + throws ServiceException { + try { + DatanodeInfoProto[] result = PBHelper.convert(server + .getDatanodeReport(PBHelper.convert(req.getType()))); + return GetDatanodeReportResponseProto.newBuilder() + .addAllDi(Arrays.asList(result)).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetPreferredBlockSizeResponseProto getPreferredBlockSize( + RpcController controller, GetPreferredBlockSizeRequestProto req) + throws ServiceException { + try { + long result = server.getPreferredBlockSize(req.getFilename()); + return GetPreferredBlockSizeResponseProto.newBuilder().setBsize(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SetSafeModeResponseProto setSafeMode(RpcController controller, + SetSafeModeRequestProto req) throws ServiceException { + try { + boolean result = server.setSafeMode(PBHelper.convert(req.getAction())); + return SetSafeModeResponseProto.newBuilder().setResult(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SaveNamespaceResponseProto VOID_SAVENAMESPACE_RESPONSE = + SaveNamespaceResponseProto.newBuilder().build(); + + @Override + public SaveNamespaceResponseProto saveNamespace(RpcController controller, + SaveNamespaceRequestProto req) throws ServiceException { + try { + server.saveNamespace(); + return VOID_SAVENAMESPACE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + + } + + static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = + RefreshNodesResponseProto.newBuilder().build(); + + @Override + public RefreshNodesResponseProto refreshNodes(RpcController controller, + RefreshNodesRequestProto req) throws ServiceException { + try { + server.refreshNodes(); + return VOID_REFRESHNODES_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + + } + + static final FinalizeUpgradeResponseProto VOID_FINALIZEUPGRADE_RESPONSE = + FinalizeUpgradeResponseProto.newBuilder().build(); + + @Override + public FinalizeUpgradeResponseProto finalizeUpgrade(RpcController controller, + FinalizeUpgradeRequestProto req) throws ServiceException { + try { + server.finalizeUpgrade(); + return VOID_FINALIZEUPGRADE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public DistributedUpgradeProgressResponseProto distributedUpgradeProgress( + RpcController controller, DistributedUpgradeProgressRequestProto req) + throws ServiceException { + try { + UpgradeStatusReport result = server.distributedUpgradeProgress(PBHelper + .convert(req.getAction())); + DistributedUpgradeProgressResponseProto.Builder builder = + DistributedUpgradeProgressResponseProto.newBuilder(); + if (result != null) { + builder.setReport(PBHelper.convert(result)); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ListCorruptFileBlocksResponseProto listCorruptFileBlocks( + RpcController controller, ListCorruptFileBlocksRequestProto req) + throws ServiceException { + try { + CorruptFileBlocks result = server.listCorruptFileBlocks( + req.getPath(), req.hasCookie() ? req.getCookie(): null); + return ListCorruptFileBlocksResponseProto.newBuilder() + .setCorrupt(PBHelper.convert(result)) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final MetaSaveResponseProto VOID_METASAVE_RESPONSE = + MetaSaveResponseProto.newBuilder().build(); + + @Override + public MetaSaveResponseProto metaSave(RpcController controller, + MetaSaveRequestProto req) throws ServiceException { + try { + server.metaSave(req.getFilename()); + return VOID_METASAVE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + + } + + static final GetFileInfoResponseProto NULL_GETFILEINFO_RESPONSE = + GetFileInfoResponseProto.newBuilder().build(); + @Override + public GetFileInfoResponseProto getFileInfo(RpcController controller, + GetFileInfoRequestProto req) throws ServiceException { + try { + HdfsFileStatus result = server.getFileInfo(req.getSrc()); + + if (result != null) { + return GetFileInfoResponseProto.newBuilder().setFs( + PBHelper.convert(result)).build(); + } + return NULL_GETFILEINFO_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final GetFileLinkInfoResponseProto NULL_GETFILELINKINFO_RESPONSE = + GetFileLinkInfoResponseProto.newBuilder().build(); + @Override + public GetFileLinkInfoResponseProto getFileLinkInfo(RpcController controller, + GetFileLinkInfoRequestProto req) throws ServiceException { + try { + HdfsFileStatus result = server.getFileLinkInfo(req.getSrc()); + if (result != null) { + System.out.println("got non null result for getFileLinkInfo for " + req.getSrc()); + return GetFileLinkInfoResponseProto.newBuilder().setFs( + PBHelper.convert(result)).build(); + } else { + System.out.println("got null result for getFileLinkInfo for " + req.getSrc()); + return NULL_GETFILELINKINFO_RESPONSE; + } + + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetContentSummaryResponseProto getContentSummary( + RpcController controller, GetContentSummaryRequestProto req) + throws ServiceException { + try { + ContentSummary result = server.getContentSummary(req.getPath()); + return GetContentSummaryResponseProto.newBuilder() + .setSummary(PBHelper.convert(result)).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SetQuotaResponseProto VOID_SETQUOTA_RESPONSE = + SetQuotaResponseProto.newBuilder().build(); + + @Override + public SetQuotaResponseProto setQuota(RpcController controller, + SetQuotaRequestProto req) throws ServiceException { + try { + server.setQuota(req.getPath(), req.getNamespaceQuota(), + req.getDiskspaceQuota()); + return VOID_SETQUOTA_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final FsyncResponseProto VOID_FSYNC_RESPONSE = + FsyncResponseProto.newBuilder().build(); + + @Override + public FsyncResponseProto fsync(RpcController controller, + FsyncRequestProto req) throws ServiceException { + try { + server.fsync(req.getSrc(), req.getClient()); + return VOID_FSYNC_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SetTimesResponseProto VOID_SETTIMES_RESPONSE = + SetTimesResponseProto.newBuilder().build(); + + @Override + public SetTimesResponseProto setTimes(RpcController controller, + SetTimesRequestProto req) throws ServiceException { + try { + server.setTimes(req.getSrc(), req.getMtime(), req.getAtime()); + return VOID_SETTIMES_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final CreateSymlinkResponseProto VOID_CREATESYMLINK_RESPONSE = + CreateSymlinkResponseProto.newBuilder().build(); + + @Override + public CreateSymlinkResponseProto createSymlink(RpcController controller, + CreateSymlinkRequestProto req) throws ServiceException { + try { + server.createSymlink(req.getTarget(), req.getLink(), + PBHelper.convert(req.getDirPerm()), req.getCreateParent()); + return VOID_CREATESYMLINK_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetLinkTargetResponseProto getLinkTarget(RpcController controller, + GetLinkTargetRequestProto req) throws ServiceException { + try { + String result = server.getLinkTarget(req.getPath()); + return GetLinkTargetResponseProto.newBuilder().setTargetPath(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public UpdateBlockForPipelineResponseProto updateBlockForPipeline( + RpcController controller, UpdateBlockForPipelineRequestProto req) + throws ServiceException { + try { + LocatedBlockProto result = PBHelper.convert(server + .updateBlockForPipeline(PBHelper.convert(req.getBlock()), + req.getClientName())); + return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final UpdatePipelineResponseProto VOID_UPDATEPIPELINE_RESPONSE = + UpdatePipelineResponseProto.newBuilder().build(); + + @Override + public UpdatePipelineResponseProto updatePipeline(RpcController controller, + UpdatePipelineRequestProto req) throws ServiceException { + try { + List newNodes = req.getNewNodesList(); + server + .updatePipeline(req.getClientName(), PBHelper.convert(req + .getOldBlock()), PBHelper.convert(req.getNewBlock()), PBHelper + .convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()]))); + return VOID_UPDATEPIPELINE_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetDelegationTokenResponseProto getDelegationToken( + RpcController controller, GetDelegationTokenRequestProto req) + throws ServiceException { + try { + BlockTokenIdentifierProto result = PBHelper.convert(server + .getDelegationToken(new Text(req.getRenewer()))); + return GetDelegationTokenResponseProto.newBuilder().setToken(result) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RenewDelegationTokenResponseProto renewDelegationToken( + RpcController controller, RenewDelegationTokenRequestProto req) + throws ServiceException { + try { + long result = server.renewDelegationToken(PBHelper + .convertDelegationToken(req.getToken())); + return RenewDelegationTokenResponseProto.newBuilder() + .setNewExireTime(result).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final CancelDelegationTokenResponseProto + VOID_CANCELDELEGATIONTOKEN_RESPONSE = + CancelDelegationTokenResponseProto.newBuilder().build(); + + @Override + public CancelDelegationTokenResponseProto cancelDelegationToken( + RpcController controller, CancelDelegationTokenRequestProto req) + throws ServiceException { + try { + server.cancelDelegationToken(PBHelper.convertDelegationToken(req + .getToken())); + return VOID_CANCELDELEGATIONTOKEN_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } + + static final SetBalancerBandwidthResponseProto + VOID_SETBALANCERBANDWIDTH_RESPONSE = + SetBalancerBandwidthResponseProto.newBuilder().build(); + + @Override + public SetBalancerBandwidthResponseProto setBalancerBandwidth( + RpcController controller, SetBalancerBandwidthRequestProto req) + throws ServiceException { + try { + server.setBalancerBandwidth(req.getBandwidth()); + return VOID_SETBALANCERBANDWIDTH_RESPONSE; + } catch (IOException e) { + throw new ServiceException(e); + } + } +}