Subject: svn commit: r1179877 [3/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolProtocolBuffers/ src/main/java/or...
Date: Thu, 06 Oct 2011 21:58:23 -0000
To: hdfs-commits@hadoop.apache.org
From: sradia@apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Message-Id: <20111006215825.949D82388A74@eris.apache.org>

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlockWritable.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlockWritable.java?rev=1179877&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlockWritable.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlockWritable.java Thu Oct 6 21:58:22 2011 @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.io.*; + +import java.io.*; +import java.util.ArrayList; +import java.util.List; + +/**************************************************** + * A LocatedBlock is a pair of Block, DatanodeInfo[] + * objects. It tells where to find a Block. + * + ****************************************************/ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class LocatedBlockWritable implements Writable { + + static { // register a ctor + WritableFactories.setFactory + (LocatedBlockWritable.class, + new WritableFactory() { + public Writable newInstance() { return new LocatedBlockWritable(); } + }); + } + + private ExtendedBlockWritable b; + private long offset; // offset of the first byte of the block in the file + private DatanodeInfoWritable[] locs; + // corrupt flag is true if all of the replicas of a block are corrupt. + // else false. If block has few corrupt replicas, they are filtered and + // their locations are not part of this object + private boolean corrupt; + private TokenWritable blockToken = new TokenWritable(); + + + static public org.apache.hadoop.hdfs.protocol.LocatedBlock + convertLocatedBlock(LocatedBlockWritable lb) { + if (lb == null) return null; + org.apache.hadoop.hdfs.protocol.LocatedBlock result = + new org.apache.hadoop.hdfs.protocol.LocatedBlock(ExtendedBlockWritable. + convertExtendedBlock(lb.getBlock()), + DatanodeInfoWritable.convertDatanodeInfo( + lb.getLocations()), lb.getStartOffset(), lb.isCorrupt()); + + // Fill in the token + TokenWritable tok = lb.getBlockToken(); + result.setBlockToken( + new org.apache.hadoop.security.token.Token( + tok.getIdentifier(), tok.getPassword(), tok.getKind(), + tok.getService())); + return result; + } + + public static LocatedBlockWritable + convertLocatedBlock(org.apache.hadoop.hdfs.protocol.LocatedBlock lb) { + if (lb == null) return null; + LocatedBlockWritable result = + new LocatedBlockWritable(ExtendedBlockWritable.convertExtendedBlock(lb.getBlock()), + DatanodeInfoWritable.convertDatanodeInfo(lb.getLocations()), + lb.getStartOffset(), lb.isCorrupt()); + + // Fill in the token + org.apache.hadoop.security.token.Token tok = + lb.getBlockToken(); + result.setBlockToken(new TokenWritable(tok.getIdentifier(), tok.getPassword(), + tok.getKind(), tok.getService())); + return result; + } + + static public LocatedBlockWritable[] + convertLocatedBlock(org.apache.hadoop.hdfs.protocol.LocatedBlock[] lb) { + if (lb == null) return null; + final int len = lb.length; + LocatedBlockWritable[] result = new LocatedBlockWritable[len]; + for (int i = 0; i < len; ++i) { + result[i] = new LocatedBlockWritable( + ExtendedBlockWritable.convertExtendedBlock(lb[i].getBlock()), + DatanodeInfoWritable.convertDatanodeInfo(lb[i].getLocations()), + lb[i].getStartOffset(), lb[i].isCorrupt()); + } + return result; + } + + static public org.apache.hadoop.hdfs.protocol.LocatedBlock[] + convertLocatedBlock(LocatedBlockWritable[] lb) { + if (lb == null) return null; + final int len = lb.length; + org.apache.hadoop.hdfs.protocol.LocatedBlock[] result = + new org.apache.hadoop.hdfs.protocol.LocatedBlock[len]; + for (int i = 0; i < len; ++i) { + result[i] = new org.apache.hadoop.hdfs.protocol.LocatedBlock( + ExtendedBlockWritable.convertExtendedBlock(lb[i].getBlock()), + 
DatanodeInfoWritable.convertDatanodeInfo(lb[i].getLocations()), + lb[i].getStartOffset(), lb[i].isCorrupt()); + } + return result; + } + + static public List + convertLocatedBlock( + List lb) { + if (lb == null) return null; + final int len = lb.size(); + List result = + new ArrayList(len); + for (int i = 0; i < len; ++i) { + result.add(LocatedBlockWritable.convertLocatedBlock(lb.get(i))); + } + return result; + } + + static public List + convertLocatedBlock2(List lb) { + if (lb == null) return null; + final int len = lb.size(); + List result = new ArrayList(len); + for (int i = 0; i < len; ++i) { + result.add(LocatedBlockWritable.convertLocatedBlock(lb.get(i))); + } + return result; + } + + public LocatedBlockWritable() { + this(new ExtendedBlockWritable(), new DatanodeInfoWritable[0], 0L, false); + } + + public LocatedBlockWritable(ExtendedBlockWritable eb) { + this(eb, new DatanodeInfoWritable[0], 0L, false); + } + + public LocatedBlockWritable(ExtendedBlockWritable b, DatanodeInfoWritable[] locs) { + this(b, locs, -1, false); // startOffset is unknown + } + + public LocatedBlockWritable(ExtendedBlockWritable b, DatanodeInfoWritable[] locs, long startOffset) { + this(b, locs, startOffset, false); + } + + public LocatedBlockWritable(ExtendedBlockWritable b, DatanodeInfoWritable[] locs, long startOffset, + boolean corrupt) { + this.b = b; + this.offset = startOffset; + this.corrupt = corrupt; + if (locs==null) { + this.locs = new DatanodeInfoWritable[0]; + } else { + this.locs = locs; + } + } + + public TokenWritable getBlockToken() { + return blockToken; + } + + public void setBlockToken(TokenWritable token) { + this.blockToken = token; + } + + public ExtendedBlockWritable getBlock() { + return b; + } + + public DatanodeInfoWritable[] getLocations() { + return locs; + } + + public long getStartOffset() { + return offset; + } + + public long getBlockSize() { + return b.getNumBytes(); + } + + void setStartOffset(long value) { + this.offset = value; + } + + void setCorrupt(boolean corrupt) { + this.corrupt = corrupt; + } + + public boolean isCorrupt() { + return this.corrupt; + } + + /////////////////////////////////////////// + // Writable + /////////////////////////////////////////// + @Override + public void write(DataOutput out) throws IOException { + blockToken.write(out); + out.writeBoolean(corrupt); + out.writeLong(offset); + b.write(out); + out.writeInt(locs.length); + for (int i = 0; i < locs.length; i++) { + locs[i].write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + blockToken.readFields(in); + this.corrupt = in.readBoolean(); + offset = in.readLong(); + this.b = new ExtendedBlockWritable(); + b.readFields(in); + int count = in.readInt(); + this.locs = new DatanodeInfoWritable[count]; + for (int i = 0; i < locs.length; i++) { + locs[i] = new DatanodeInfoWritable(); + locs[i].readFields(in); + } + } + + /** Read LocatedBlock from in. 
*/ + public static LocatedBlockWritable read(DataInput in) throws IOException { + final LocatedBlockWritable lb = new LocatedBlockWritable(); + lb.readFields(in); + return lb; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + b + + "; getBlockSize()=" + getBlockSize() + + "; corrupt=" + corrupt + + "; offset=" + offset + + "; locs=" + java.util.Arrays.asList(locs) + + "}"; + } +} Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlocksWritable.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlocksWritable.java?rev=1179877&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlocksWritable.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/LocatedBlocksWritable.java Thu Oct 6 21:58:22 2011 @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +import org.apache.avro.reflect.Nullable; + +/** + * Collection of blocks with their locations and the file length. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class LocatedBlocksWritable implements Writable { + private long fileLength; + private List blocks; // array of blocks with prioritized locations + private boolean underConstruction; + @Nullable + private LocatedBlockWritable lastLocatedBlock = null; + private boolean isLastBlockComplete = false; + + public static org.apache.hadoop.hdfs.protocol.LocatedBlocks convertLocatedBlocks( + LocatedBlocksWritable lb) { + if (lb == null) { + return null; + } + return new org.apache.hadoop.hdfs.protocol.LocatedBlocks( + lb.getFileLength(), lb.isUnderConstruction(), + LocatedBlockWritable.convertLocatedBlock(lb.getLocatedBlocks()), + LocatedBlockWritable.convertLocatedBlock(lb.getLastLocatedBlock()), + lb.isLastBlockComplete()); + } + + public static LocatedBlocksWritable convertLocatedBlocks( + org.apache.hadoop.hdfs.protocol.LocatedBlocks lb) { + if (lb == null) { + return null; + } + return new LocatedBlocksWritable(lb.getFileLength(), lb.isUnderConstruction(), + LocatedBlockWritable.convertLocatedBlock2(lb.getLocatedBlocks()), + LocatedBlockWritable.convertLocatedBlock(lb.getLastLocatedBlock()), + lb.isLastBlockComplete()); + } + + public LocatedBlocksWritable() { + this(0, false, null, null, false); + } + + /** public Constructor */ + public LocatedBlocksWritable(long flength, boolean isUnderConstuction, + List blks, + LocatedBlockWritable lastBlock, boolean isLastBlockCompleted) { + fileLength = flength; + blocks = blks; + underConstruction = isUnderConstuction; + this.lastLocatedBlock = lastBlock; + this.isLastBlockComplete = isLastBlockCompleted; + } + + /** + * Get located blocks. + */ + public List getLocatedBlocks() { + return blocks; + } + + /** Get the last located block. */ + public LocatedBlockWritable getLastLocatedBlock() { + return lastLocatedBlock; + } + + /** Is the last block completed? */ + public boolean isLastBlockComplete() { + return isLastBlockComplete; + } + + /** + * Get located block. + */ + public LocatedBlockWritable get(int index) { + return blocks.get(index); + } + + /** + * Get number of located blocks. + */ + public int locatedBlockCount() { + return blocks == null ? 0 : blocks.size(); + } + + /** + * Get file length + */ + public long getFileLength() { + return this.fileLength; + } + + /** + * Return ture if file was under construction when + * this LocatedBlocks was constructed, false otherwise. 
+ */ + public boolean isUnderConstruction() { + return underConstruction; + } + + ////////////////////////////////////////////////// + // Writable + ////////////////////////////////////////////////// + static { // register a ctor + WritableFactories.setFactory + (LocatedBlocksWritable.class, + new WritableFactory() { + public Writable newInstance() { return new LocatedBlocksWritable(); } + }); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(this.fileLength); + out.writeBoolean(underConstruction); + + //write the last located block + final boolean isNull = lastLocatedBlock == null; + out.writeBoolean(isNull); + if (!isNull) { + lastLocatedBlock.write(out); + } + out.writeBoolean(isLastBlockComplete); + + // write located blocks + int nrBlocks = locatedBlockCount(); + out.writeInt(nrBlocks); + if (nrBlocks == 0) { + return; + } + for (LocatedBlockWritable blk : this.blocks) { + blk.write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.fileLength = in.readLong(); + underConstruction = in.readBoolean(); + + //read the last located block + final boolean isNull = in.readBoolean(); + if (!isNull) { + lastLocatedBlock = LocatedBlockWritable.read(in); + } + isLastBlockComplete = in.readBoolean(); + + // read located blocks + int nrBlocks = in.readInt(); + this.blocks = new ArrayList(nrBlocks); + for (int idx = 0; idx < nrBlocks; idx++) { + LocatedBlockWritable blk = new LocatedBlockWritable(); + blk.readFields(in); + this.blocks.add(blk); + } + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(getClass().getSimpleName()); + b.append("{") + .append("\n fileLength=").append(fileLength) + .append("\n underConstruction=").append(underConstruction) + .append("\n blocks=").append(blocks) + .append("\n lastLocatedBlock=").append(lastLocatedBlock) + .append("\n isLastBlockComplete=").append(isLastBlockComplete) + .append("}"); + return b.toString(); + } +} Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ProtocolSignatureWritable.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ProtocolSignatureWritable.java?rev=1179877&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ProtocolSignatureWritable.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolR23Compatible/ProtocolSignatureWritable.java Thu Oct 6 21:58:22 2011 @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocolR23Compatible; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ProtocolSignatureWritable implements Writable { + static { // register a ctor + WritableFactories.setFactory + (ProtocolSignatureWritable.class, + new WritableFactory() { + public Writable newInstance() { return new ProtocolSignatureWritable(); } + }); + } + + private long version; + private int[] methods = null; // an array of method hash codes + + public static org.apache.hadoop.ipc.ProtocolSignature convert( + final ProtocolSignatureWritable ps) { + if (ps == null) return null; + return new org.apache.hadoop.ipc.ProtocolSignature( + ps.getVersion(), ps.getMethods()); + } + + public static ProtocolSignatureWritable convert( + final org.apache.hadoop.ipc.ProtocolSignature ps) { + if (ps == null) return null; + return new ProtocolSignatureWritable(ps.getVersion(), ps.getMethods()); + } + + /** + * default constructor + */ + public ProtocolSignatureWritable() { + } + + /** + * Constructor + * + * @param version server version + * @param methodHashcodes hash codes of the methods supported by server + */ + public ProtocolSignatureWritable(long version, int[] methodHashcodes) { + this.version = version; + this.methods = methodHashcodes; + } + + public long getVersion() { + return version; + } + + public int[] getMethods() { + return methods; + } + + @Override + public void readFields(DataInput in) throws IOException { + version = in.readLong(); + boolean hasMethods = in.readBoolean(); + if (hasMethods) { + int numMethods = in.readInt(); + methods = new int[numMethods]; + for (int i=0; i + + + + Namenode Client Protocols Compatible with the version + of Hadoop Release 23 + + +

+This package is for ALL versions of HDFS protocols that use writable data types
+and are compatible with the version of the protocol that was shipped with
+Release 23 of Hadoop.

+Compatibility should be maintained:
+  • Do NOT delete any methods
+  • Do NOT change the signatures of any method:
+    do not change parameters, parameter types
+    or exceptions thrown by the method.

+You can add new methods and new types. If you need to change a method's
+signature, please add a new method instead.
+When you add new methods and new types, do NOT change the version number.

+The version number is changed ONLY when compatibility is broken (which
+should be very rare and a big deal).
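
[Editor's note] To make the rule concrete, a minimal, hypothetical sketch follows; the interface and method names are illustrative only and are not part of this commit. A compatible change adds a new method alongside the old one and leaves versionID untouched:

    // Hypothetical R23-style wire protocol, shown only to illustrate the compatibility rule.
    public interface ExampleWireProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
      // versionID stays the same as long as existing methods are left untouched.
      public static final long versionID = 1L;

      // Existing method: its name, parameter types and thrown exceptions must not change.
      ExtendedBlockWritable getBlock(String poolId, long blockId) throws java.io.IOException;

      // Compatible evolution: add a NEW method instead of altering the one above.
      ExtendedBlockWritable getBlockWithHint(String poolId, long blockId, boolean useCache)
          throws java.io.IOException;
    }
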

\ No newline at end of file Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1179877&r1=1179876&r2=1179877&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Oct 6 21:58:22 2011 @@ -123,6 +123,7 @@ import org.apache.hadoop.hdfs.protocol.d import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolServerSideTranslatorR23; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -576,8 +577,13 @@ public class DataNode extends Configured InetSocketAddress ipcAddr = NetUtils.createSocketAddr( conf.get("dfs.datanode.ipc.address")); - // Add all the RPC protocols that the Datanode implements - ipcServer = RPC.getServer(ClientDatanodeProtocol.class, this, ipcAddr.getHostName(), + // Add all the RPC protocols that the Datanode implements + ClientDatanodeProtocolServerSideTranslatorR23 + clientDatanodeProtocolServerTranslator = + new ClientDatanodeProtocolServerSideTranslatorR23(this); + ipcServer = RPC.getServer( + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class, + clientDatanodeProtocolServerTranslator, ipcAddr.getHostName(), ipcAddr.getPort(), conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, DFS_DATANODE_HANDLER_COUNT_DEFAULT), Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1179877&r1=1179876&r2=1179877&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Oct 6 21:58:22 2011 @@ -56,6 +56,8 @@ import org.apache.hadoop.hdfs.protocol.U import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol; +import org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeProtocolServerSideTranslatorR23; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; @@ -143,10 +145,13 @@ class NameNodeRpcServer 
implements Namen serviceRPCAddress = null; } // Add all the RPC protocols that the namenode implements - this.server = RPC.getServer(ClientProtocol.class, this, - socAddr.getHostName(), socAddr.getPort(), - handlerCount, false, conf, - namesystem.getDelegationTokenSecretManager()); + ClientNamenodeProtocolServerSideTranslatorR23 clientProtocolServerTranslator = + new ClientNamenodeProtocolServerSideTranslatorR23(this); + this.server = RPC.getServer( + org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol.class, + clientProtocolServerTranslator, socAddr.getHostName(), + socAddr.getPort(), handlerCount, false, conf, + namesystem.getDelegationTokenSecretManager()); this.server.addProtocol(DatanodeProtocol.class, this); this.server.addProtocol(NamenodeProtocol.class, this); this.server.addProtocol(RefreshAuthorizationPolicyProtocol.class, this); @@ -210,7 +215,8 @@ class NameNodeRpcServer implements Namen public long getProtocolVersion(String protocol, long clientVersion) throws IOException { if (protocol.equals(ClientProtocol.class.getName())) { - return ClientProtocol.versionID; + throw new IOException("Old Namenode Client protocol is not supported:" + + protocol + "Switch your clientside to " + ClientNamenodeWireProtocol.class); } else if (protocol.equals(DatanodeProtocol.class.getName())){ return DatanodeProtocol.versionID; } else if (protocol.equals(NamenodeProtocol.class.getName())){ Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java?rev=1179877&r1=1179876&r2=1179877&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java Thu Oct 6 21:58:22 2011 @@ -51,12 +51,12 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.io.TestWritable; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtocolSignature; @@ -96,9 +96,9 @@ public class TestBlockToken { ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL); } - + /** Directory where we can count our open file descriptors under Linux */ - static File FD_DIR = new File("/proc/self/fd/"); + static File FD_DIR = new File("/proc/self/fd/"); long blockKeyUpdateInterval = 10 * 60 * 1000; // 10 mins long blockTokenLifetime = 2 * 60 * 1000; // 2 mins @@ -120,7 +120,8 @@ public class TestBlockToken { public Long answer(InvocationOnMock invocation) throws IOException { Object args[] = invocation.getArguments(); assertEquals(1, args.length); - 
ExtendedBlock block = (ExtendedBlock) args[0]; + org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable block = + (org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable) args[0]; Set tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); assertEquals("Only one BlockTokenIdentifier expected", 1, tokenIds.size()); @@ -129,7 +130,9 @@ public class TestBlockToken { BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; LOG.info("Got: " + id.toString()); assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id)); - sm.checkAccess(id, null, block, BlockTokenSecretManager.AccessMode.WRITE); + sm.checkAccess(id, null, org.apache.hadoop.hdfs.protocolR23Compatible. + ExtendedBlockWritable.convertExtendedBlock(block), + BlockTokenSecretManager.AccessMode.WRITE); result = id.getBlockId(); } return result; @@ -137,7 +140,8 @@ public class TestBlockToken { } private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm, - ExtendedBlock block, EnumSet accessModes) + ExtendedBlock block, + EnumSet accessModes) throws IOException { Token token = sm.generateToken(block, accessModes); BlockTokenIdentifier id = sm.createIdentifier(); @@ -151,12 +155,12 @@ public class TestBlockToken { TestWritable.testWritable(new BlockTokenIdentifier()); BlockTokenSecretManager sm = new BlockTokenSecretManager(true, blockKeyUpdateInterval, blockTokenLifetime); - TestWritable.testWritable(generateTokenId(sm, block1, EnumSet - .allOf(BlockTokenSecretManager.AccessMode.class))); - TestWritable.testWritable(generateTokenId(sm, block2, EnumSet - .of(BlockTokenSecretManager.AccessMode.WRITE))); - TestWritable.testWritable(generateTokenId(sm, block3, EnumSet - .noneOf(BlockTokenSecretManager.AccessMode.class))); + TestWritable.testWritable(generateTokenId(sm, block1, + EnumSet.allOf(BlockTokenSecretManager.AccessMode.class))); + TestWritable.testWritable(generateTokenId(sm, block2, + EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE))); + TestWritable.testWritable(generateTokenId(sm, block3, + EnumSet.noneOf(BlockTokenSecretManager.AccessMode.class))); } private void tokenGenerationAndVerification(BlockTokenSecretManager master, @@ -176,8 +180,8 @@ public class TestBlockToken { slave.checkAccess(token2, null, block2, mode); } // multi-mode tokens - Token mtoken = master.generateToken(block3, EnumSet - .allOf(BlockTokenSecretManager.AccessMode.class)); + Token mtoken = master.generateToken(block3, + EnumSet.allOf(BlockTokenSecretManager.AccessMode.class)); for (BlockTokenSecretManager.AccessMode mode : BlockTokenSecretManager.AccessMode .values()) { master.checkAccess(mtoken, null, block3, mode); @@ -202,25 +206,28 @@ public class TestBlockToken { slaveHandler.setKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler); } - + private Server createMockDatanode(BlockTokenSecretManager sm, Token token) throws IOException { - ClientDatanodeProtocol mockDN = mock(ClientDatanodeProtocol.class); + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol mockDN = + mock(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class); when(mockDN.getProtocolVersion(anyString(), anyLong())).thenReturn( - ClientDatanodeProtocol.versionID); - doReturn(ProtocolSignature.getProtocolSignature( - mockDN, ClientDatanodeProtocol.class.getName(), - ClientDatanodeProtocol.versionID, 0)) - .when(mockDN).getProtocolSignature(anyString(), anyLong(), anyInt()); + 
org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID); + doReturn( + ProtocolSignature.getProtocolSignature(mockDN, + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class.getName(), + org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.versionID, 0)).when(mockDN) + .getProtocolSignature(anyString(), anyLong(), anyInt()); BlockTokenIdentifier id = sm.createIdentifier(); id.readFields(new DataInputStream(new ByteArrayInputStream(token .getIdentifier()))); doAnswer(new getLengthAnswer(sm, id)).when(mockDN).getReplicaVisibleLength( - any(ExtendedBlock.class)); + any(org.apache.hadoop.hdfs.protocolR23Compatible.ExtendedBlockWritable.class)); - return RPC.getServer(ClientDatanodeProtocol.class, mockDN, - ADDRESS, 0, 5, true, conf, sm); + return RPC.getServer(org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class, + mockDN, ADDRESS, 0, 5, + true, conf, sm); } @Test @@ -241,9 +248,8 @@ public class TestBlockToken { ClientDatanodeProtocol proxy = null; try { - proxy = RPC.getProxy( - ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, addr, - ticket, conf, NetUtils.getDefaultSocketFactory(conf)); + proxy = DFSUtil.createClientDatanodeProtocolProxy(addr, ticket, conf, + NetUtils.getDefaultSocketFactory(conf)); assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3)); } finally { server.stop(); @@ -255,8 +261,8 @@ public class TestBlockToken { /** * Test that fast repeated invocations of createClientDatanodeProtocolProxy - * will not end up using up thousands of sockets. This is a regression test for - * HDFS-1965. + * will not end up using up thousands of sockets. This is a regression test + * for HDFS-1965. */ @Test public void testBlockTokenRpcLeak() throws Exception { @@ -270,9 +276,9 @@ public class TestBlockToken { server.start(); final InetSocketAddress addr = NetUtils.getConnectAddress(server); - DatanodeID fakeDnId = new DatanodeID( - "localhost:" + addr.getPort(), "fake-storage", 0, addr.getPort()); - + DatanodeID fakeDnId = new DatanodeID("localhost:" + addr.getPort(), + "fake-storage", 0, addr.getPort()); + ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L)); LocatedBlock fakeBlock = new LocatedBlock(b, new DatanodeInfo[0]); fakeBlock.setBlockToken(token); @@ -282,19 +288,19 @@ public class TestBlockToken { // RPC "Client" object to stay above 0 such that RPC.stopProxy doesn't // actually close the TCP connections to the real target DN. 
ClientDatanodeProtocol proxyToNoWhere = RPC.getProxy( - ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, + ClientDatanodeProtocol.class, ClientDatanodeProtocol.versionID, new InetSocketAddress("1.1.1.1", 1), - UserGroupInformation.createRemoteUser("junk"), - conf, NetUtils.getDefaultSocketFactory(conf)); - + UserGroupInformation.createRemoteUser("junk"), conf, + NetUtils.getDefaultSocketFactory(conf)); + ClientDatanodeProtocol proxy = null; int fdsAtStart = countOpenFileDescriptors(); try { long endTime = System.currentTimeMillis() + 3000; while (System.currentTimeMillis() < endTime) { - proxy = DFSUtil.createClientDatanodeProtocolProxy( - fakeDnId, conf, 1000, fakeBlock); + proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000, + fakeBlock); assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3)); if (proxy != null) { RPC.stopProxy(proxy); @@ -303,32 +309,31 @@ public class TestBlockToken { } int fdsAtEnd = countOpenFileDescriptors(); - + if (fdsAtEnd - fdsAtStart > 50) { fail("Leaked " + (fdsAtEnd - fdsAtStart) + " fds!"); } } finally { server.stop(); } - + RPC.stopProxy(proxyToNoWhere); } /** - * @return the current number of file descriptors open by this - * process. + * @return the current number of file descriptors open by this process. */ private static int countOpenFileDescriptors() throws IOException { return FD_DIR.list().length; } - /** + /** * Test {@link BlockPoolTokenSecretManager} */ @Test public void testBlockPoolTokenSecretManager() throws Exception { BlockPoolTokenSecretManager bpMgr = new BlockPoolTokenSecretManager(); - + // Test BlockPoolSecretManager with upto 10 block pools for (int i = 0; i < 10; i++) { String bpid = Integer.toString(i); @@ -337,12 +342,11 @@ public class TestBlockToken { BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(false, blockKeyUpdateInterval, blockTokenLifetime); bpMgr.addBlockPool(bpid, slaveHandler); - - + ExportedBlockKeys keys = masterHandler.exportKeys(); bpMgr.setKeys(bpid, keys); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); - + // Test key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); @@ -351,11 +355,12 @@ public class TestBlockToken { tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid)); } } - + /** - * This test writes a file and gets the block locations without closing - * the file, and tests the block token in the last block. Block token is - * verified by ensuring it is of correct kind. + * This test writes a file and gets the block locations without closing the + * file, and tests the block token in the last block. Block token is verified + * by ensuring it is of correct kind. + * * @throws IOException * @throws InterruptedException */ @@ -389,5 +394,5 @@ public class TestBlockToken { } finally { cluster.shutdown(); } - } + } }
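
[Editor's note] Taken together, the DataNode and NameNode changes above follow one pattern: the RPC server is now registered under the R23 wire protocol class, and a server-side translator wraps the existing implementation. Below is a condensed sketch of that pattern under stated assumptions; 'datanode', 'ipcAddr', 'handlerCount' and 'secretManager' are placeholders, not names from this commit:

    // Server-side wiring: expose the R23 wire protocol and translate calls onto the real implementation.
    ClientDatanodeProtocolServerSideTranslatorR23 translator =
        new ClientDatanodeProtocolServerSideTranslatorR23(datanode);
    org.apache.hadoop.ipc.Server ipcServer = RPC.getServer(
        org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class,
        translator, ipcAddr.getHostName(), ipcAddr.getPort(),
        handlerCount, false, conf, secretManager);
    ipcServer.start();
    // Clients that previously asked for ClientDatanodeProtocol directly should switch to the
    // DFSUtil.createClientDatanodeProtocolProxy helpers exercised in the updated TestBlockToken.
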