Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F166FD165 for ; Fri, 19 Oct 2012 02:29:49 +0000 (UTC) Received: (qmail 45444 invoked by uid 500); 19 Oct 2012 02:29:49 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 45313 invoked by uid 500); 19 Oct 2012 02:29:49 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 45238 invoked by uid 99); 19 Oct 2012 02:29:49 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Oct 2012 02:29:49 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Oct 2012 02:29:39 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 29F562388C7B; Fri, 19 Oct 2012 02:28:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1399950 [8/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apach... Date: Fri, 19 Oct 2012 02:28:07 -0000 To: hdfs-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121019022842.29F562388C7B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java Fri Oct 19 02:25:55 2012 @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.H import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; /** *

@@ -156,9 +157,6 @@ class LeaseRenewer { } } - private final String clienNamePostfix = DFSUtil.getRandom().nextInt() - + "_" + Thread.currentThread().getId(); - /** The time in milliseconds that the map became empty. */ private long emptyTime = Long.MAX_VALUE; /** A fixed lease renewal time period in milliseconds */ @@ -212,11 +210,6 @@ class LeaseRenewer { return renewal; } - /** @return the client name for the given id. */ - String getClientName(final String id) { - return "DFSClient_" + id + "_" + clienNamePostfix; - } - /** Add a client. */ private synchronized void addClient(final DFSClient dfsc) { for(DFSClient c : dfsclients) { @@ -270,6 +263,11 @@ class LeaseRenewer { synchronized boolean isRunning() { return daemon != null && daemon.isAlive(); } + + /** Does this renewer have nothing to renew? */ + public boolean isEmpty() { + return dfsclients.isEmpty(); + } /** Used only by tests */ synchronized String getDaemonName() { @@ -279,7 +277,7 @@ class LeaseRenewer { /** Is the empty period longer than the grace period? */ private synchronized boolean isRenewerExpired() { return emptyTime != Long.MAX_VALUE - && System.currentTimeMillis() - emptyTime > gracePeriod; + && Time.now() - emptyTime > gracePeriod; } synchronized void put(final String src, final DFSOutputStream out, @@ -330,6 +328,9 @@ class LeaseRenewer { dfsc.removeFileBeingWritten(src); synchronized(this) { + if (dfsc.isFilesBeingWrittenEmpty()) { + dfsclients.remove(dfsc); + } //update emptyTime if necessary if (emptyTime == Long.MAX_VALUE) { for(DFSClient c : dfsclients) { @@ -339,7 +340,7 @@ class LeaseRenewer { } } //discover the first time that all file-being-written maps are empty. - emptyTime = System.currentTimeMillis(); + emptyTime = Time.now(); } } } @@ -354,7 +355,7 @@ class LeaseRenewer { } if (emptyTime == Long.MAX_VALUE) { //discover the first time that the client list is empty. - emptyTime = System.currentTimeMillis(); + emptyTime = Time.now(); } } @@ -427,10 +428,9 @@ class LeaseRenewer { * when the lease period is half over. */ private void run(final int id) throws InterruptedException { - for(long lastRenewed = System.currentTimeMillis(); - clientsRunning() && !Thread.interrupted(); + for(long lastRenewed = Time.now(); !Thread.interrupted(); Thread.sleep(getSleepPeriod())) { - final long elapsed = System.currentTimeMillis() - lastRenewed; + final long elapsed = Time.now() - lastRenewed; if (elapsed >= getRenewalTime()) { try { renew(); @@ -438,7 +438,7 @@ class LeaseRenewer { LOG.debug("Lease renewer daemon for " + clientsString() + " with renew id " + id + " executed"); } - lastRenewed = System.currentTimeMillis(); + lastRenewed = Time.now(); } catch (SocketTimeoutException ie) { LOG.warn("Failed to renew lease for " + clientsString() + " for " + (elapsed/1000) + " seconds. Aborting ...", ie); @@ -468,6 +468,13 @@ class LeaseRenewer { //no longer the current daemon or expired return; } + + // if no clients are in running state or there is no more clients + // registered with this renewer, stop the daemon after the grace + // period. + if (!clientsRunning() && emptyTime == Long.MAX_VALUE) { + emptyTime = Time.now(); + } } } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Fri Oct 19 02:25:55 2012 @@ -57,6 +57,7 @@ import org.apache.hadoop.io.retry.Failov import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -68,7 +69,6 @@ import org.apache.hadoop.security.author import org.apache.hadoop.tools.GetUserMappingsProtocol; import com.google.common.base.Preconditions; -import com.google.protobuf.ServiceException; /** * Create proxy objects to communicate with a remote NN. All remote access to an @@ -243,99 +243,20 @@ public class NameNodeProxies { return new NamenodeProtocolTranslatorPB(proxy); } - /** - * Return the default retry policy used in RPC. - * - * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL. - * - * Otherwise, first unwrap ServiceException if possible, and then - * (1) use multipleLinearRandomRetry for - * - SafeModeException, or - * - IOException other than RemoteException, or - * - ServiceException; and - * (2) use TRY_ONCE_THEN_FAIL for - * - non-SafeMode RemoteException, or - * - non-IOException. - * - * Note that dfs.client.retry.max < 0 is not allowed. - */ - private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) { - final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf); - if (LOG.isDebugEnabled()) { - LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry); - } - if (multipleLinearRandomRetry == null) { - //no retry - return RetryPolicies.TRY_ONCE_THEN_FAIL; - } else { - return new RetryPolicy() { - @Override - public RetryAction shouldRetry(Exception e, int retries, int failovers, - boolean isMethodIdempotent) throws Exception { - if (e instanceof ServiceException) { - //unwrap ServiceException - final Throwable cause = e.getCause(); - if (cause != null && cause instanceof Exception) { - e = (Exception)cause; - } - } - - //see (1) and (2) in the javadoc of this method. - final RetryPolicy p; - if (e instanceof RemoteException) { - final RemoteException re = (RemoteException)e; - p = SafeModeException.class.getName().equals(re.getClassName())? - multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL; - } else if (e instanceof IOException || e instanceof ServiceException) { - p = multipleLinearRandomRetry; - } else { //non-IOException - p = RetryPolicies.TRY_ONCE_THEN_FAIL; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("RETRY " + retries + ") policy=" - + p.getClass().getSimpleName() + ", exception=" + e); - } - LOG.info("RETRY " + retries + ") policy=" - + p.getClass().getSimpleName() + ", exception=" + e); - return p.shouldRetry(e, retries, failovers, isMethodIdempotent); - } - }; - } - } - - /** - * Return the MultipleLinearRandomRetry policy specified in the conf, - * or null if the feature is disabled. - * If the policy is specified in the conf but the policy cannot be parsed, - * the default policy is returned. - * - * Conf property: N pairs of sleep-time and number-of-retries - * dfs.client.retry.policy = "s1,n1,s2,n2,..." - */ - private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) { - final boolean enabled = conf.getBoolean( - DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, - DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT); - if (!enabled) { - return null; - } - - final String policy = conf.get( - DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, - DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT); - - final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy); - return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString( - DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT); - } - private static ClientProtocol createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries) throws IOException { RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); - final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf); + final RetryPolicy defaultPolicy = + RetryUtils.getDefaultRetryPolicy( + conf, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, + DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT, + SafeModeException.class); + final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Oct 19 02:25:55 2012 @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; @@ -458,7 +459,9 @@ public class RemoteBlockReader extends F void sendReadResult(Socket sock, Status statusCode) { assert !sentStatusCode : "already sent status code to " + sock; try { - RemoteBlockReader2.writeReadResult(sock, statusCode); + RemoteBlockReader2.writeReadResult( + NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT), + statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. @@ -484,4 +487,11 @@ public class RemoteBlockReader extends F throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); } + @Override + public IOStreamPair getStreams() { + // This class doesn't support encryption, which is the only thing this + // method is used for. See HDFS-3637. + return null; + } + } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Oct 19 02:25:55 2012 @@ -32,26 +32,23 @@ import java.nio.channels.ReadableByteCha import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.util.DirectBufferPool; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.SocketInputWrapper; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import com.google.common.base.Preconditions; - /** * This is a wrapper around connection to datanode * and understands checksum, offset etc. @@ -83,15 +80,15 @@ public class RemoteBlockReader2 impleme static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class); - Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. + Socket dnSock; + // for now just sending the status code (e.g. checksumOk) after the read. + private IOStreamPair ioStreams; private final ReadableByteChannel in; private DataChecksum checksum; - private PacketHeader curHeader; - private ByteBuffer curPacketBuf = null; + private PacketReceiver packetReceiver = new PacketReceiver(true); private ByteBuffer curDataSlice = null; - /** offset in block of the last chunk received */ private long lastSeqNo = -1; @@ -99,10 +96,6 @@ public class RemoteBlockReader2 impleme private long startOffset; private final String filename; - private static DirectBufferPool bufferPool = new DirectBufferPool(); - private final ByteBuffer headerBuf = ByteBuffer.allocate( - PacketHeader.PKT_HEADER_LEN); - private final int bytesPerChecksum; private final int checksumSize; @@ -126,7 +119,7 @@ public class RemoteBlockReader2 impleme public synchronized int read(byte[] buf, int off, int len) throws IOException { - if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { readNextPacket(); } if (curDataSlice.remaining() == 0) { @@ -143,7 +136,7 @@ public class RemoteBlockReader2 impleme @Override public int read(ByteBuffer buf) throws IOException { - if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { readNextPacket(); } if (curDataSlice.remaining() == 0) { @@ -161,11 +154,13 @@ public class RemoteBlockReader2 impleme } private void readNextPacket() throws IOException { - Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); - //Read packet headers. - readPacketHeader(); + packetReceiver.receiveNextPacket(in); + PacketHeader curHeader = packetReceiver.getHeader(); + curDataSlice = packetReceiver.getDataSlice(); + assert curDataSlice.capacity() == curHeader.getDataLen(); + if (LOG.isTraceEnabled()) { LOG.trace("DFSClient readNextPacket got header " + curHeader); } @@ -179,17 +174,20 @@ public class RemoteBlockReader2 impleme if (curHeader.getDataLen() > 0) { int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum; int checksumsLen = chunks * checksumSize; - int bufsize = checksumsLen + curHeader.getDataLen(); + + assert packetReceiver.getChecksumSlice().capacity() == checksumsLen : + "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + + " checksumsLen=" + checksumsLen; - resetPacketBuffer(checksumsLen, curHeader.getDataLen()); - lastSeqNo = curHeader.getSeqno(); - if (bufsize > 0) { - readChannelFully(in, curPacketBuf); - curPacketBuf.flip(); - if (verifyChecksum) { - verifyPacketChecksums(); - } + if (verifyChecksum && curDataSlice.remaining() > 0) { + // N.B.: the checksum error offset reported here is actually + // relative to the start of the block, not the start of the file. + // This is slightly misleading, but preserves the behavior from + // the older BlockReader. + checksum.verifyChunkedSums(curDataSlice, + packetReceiver.getChecksumSlice(), + filename, curHeader.getOffsetInBlock()); } bytesNeededToFinish -= curHeader.getDataLen(); } @@ -206,46 +204,13 @@ public class RemoteBlockReader2 impleme if (bytesNeededToFinish <= 0) { readTrailingEmptyPacket(); if (verifyChecksum) { - sendReadResult(dnSock, Status.CHECKSUM_OK); + sendReadResult(Status.CHECKSUM_OK); } else { - sendReadResult(dnSock, Status.SUCCESS); - } - } - } - - private void verifyPacketChecksums() throws ChecksumException { - // N.B.: the checksum error offset reported here is actually - // relative to the start of the block, not the start of the file. - // This is slightly misleading, but preserves the behavior from - // the older BlockReader. - checksum.verifyChunkedSums(curDataSlice, curPacketBuf, - filename, curHeader.getOffsetInBlock()); - } - - private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) - throws IOException { - while (buf.remaining() > 0) { - int n = ch.read(buf); - if (n < 0) { - throw new IOException("Premature EOF reading from " + ch); + sendReadResult(Status.SUCCESS); } } } - - private void resetPacketBuffer(int checksumsLen, int dataLen) { - int packetLen = checksumsLen + dataLen; - if (curPacketBuf == null || - curPacketBuf.capacity() < packetLen) { - returnPacketBufToPool(); - curPacketBuf = bufferPool.getBuffer(packetLen); - } - curPacketBuf.position(checksumsLen); - curDataSlice = curPacketBuf.slice(); - curDataSlice.limit(dataLen); - curPacketBuf.clear(); - curPacketBuf.limit(checksumsLen + dataLen); - } - + @Override public synchronized long skip(long n) throws IOException { /* How can we make sure we don't throw a ChecksumException, at least @@ -266,23 +231,14 @@ public class RemoteBlockReader2 impleme return nSkipped; } - private void readPacketHeader() throws IOException { - headerBuf.clear(); - readChannelFully(in, headerBuf); - headerBuf.flip(); - if (curHeader == null) curHeader = new PacketHeader(); - curHeader.readFields(headerBuf); - } - private void readTrailingEmptyPacket() throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Reading empty packet at end of read"); } - headerBuf.clear(); - readChannelFully(in, headerBuf); - headerBuf.flip(); - PacketHeader trailer = new PacketHeader(); - trailer.readFields(headerBuf); + + packetReceiver.receiveNextPacket(in); + + PacketHeader trailer = packetReceiver.getHeader(); if (!trailer.isLastPacketInBlock() || trailer.getDataLen() != 0) { throw new IOException("Expected empty end-of-read packet! Header: " + @@ -292,9 +248,11 @@ public class RemoteBlockReader2 impleme protected RemoteBlockReader2(String file, String bpid, long blockId, ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum, - long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) { + long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock, + IOStreamPair ioStreams) { // Path is used only for printing block and file information in debug this.dnSock = dnSock; + this.ioStreams = ioStreams; this.in = in; this.checksum = checksum; this.verifyChecksum = verifyChecksum; @@ -313,7 +271,7 @@ public class RemoteBlockReader2 impleme @Override public synchronized void close() throws IOException { - returnPacketBufToPool(); + packetReceiver.close(); startOffset = -1; checksum = null; @@ -324,24 +282,6 @@ public class RemoteBlockReader2 impleme // in will be closed when its Socket is closed. } - @Override - protected void finalize() throws Throwable { - try { - // just in case it didn't get closed, we - // may as well still try to return the buffer - returnPacketBufToPool(); - } finally { - super.finalize(); - } - } - - private void returnPacketBufToPool() { - if (curPacketBuf != null) { - bufferPool.returnBuffer(curPacketBuf); - curPacketBuf = null; - } - } - /** * Take the socket used to talk to the DN. */ @@ -369,24 +309,23 @@ public class RemoteBlockReader2 impleme * closing our connection (which we will re-open), but won't affect * data correctness. */ - void sendReadResult(Socket sock, Status statusCode) { - assert !sentStatusCode : "already sent status code to " + sock; + void sendReadResult(Status statusCode) { + assert !sentStatusCode : "already sent status code to " + dnSock; try { - writeReadResult(sock, statusCode); + writeReadResult(ioStreams.out, statusCode); sentStatusCode = true; } catch (IOException e) { // It's ok not to be able to send this. But something is probably wrong. LOG.info("Could not send read status (" + statusCode + ") to datanode " + - sock.getInetAddress() + ": " + e.getMessage()); + dnSock.getInetAddress() + ": " + e.getMessage()); } } /** * Serialize the actual read result on the wire. */ - static void writeReadResult(Socket sock, Status statusCode) + static void writeReadResult(OutputStream out, Status statusCode) throws IOException { - OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT); ClientReadStatusProto.newBuilder() .setStatus(statusCode) @@ -434,25 +373,32 @@ public class RemoteBlockReader2 impleme * @param clientName Client name * @return New BlockReader instance, or null on error. */ - public static BlockReader newBlockReader( Socket sock, String file, + public static BlockReader newBlockReader(Socket sock, String file, ExtendedBlock block, Token blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, - String clientName) + String clientName, + DataEncryptionKey encryptionKey, + IOStreamPair ioStreams) throws IOException { + + ReadableByteChannel ch; + if (ioStreams.in instanceof SocketInputWrapper) { + ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel(); + } else { + ch = (ReadableByteChannel) ioStreams.in; + } + // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( - NetUtils.getOutputStream(sock, - HdfsServerConstants.WRITE_TIMEOUT))); + ioStreams.out)); new Sender(out).readBlock(block, blockToken, clientName, startOffset, len); // - // Get bytes in block, set streams + // Get bytes in block // - SocketInputWrapper sin = NetUtils.getInputStream(sock); - ReadableByteChannel ch = sin.getReadableByteChannel(); - DataInputStream in = new DataInputStream(sin); + DataInputStream in = new DataInputStream(ioStreams.in); BlockOpResponseProto status = BlockOpResponseProto.parseFrom( vintPrefixed(in)); @@ -474,7 +420,8 @@ public class RemoteBlockReader2 impleme } return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), - ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock); + ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock, + ioStreams); } static void checkSuccess( @@ -498,4 +445,9 @@ public class RemoteBlockReader2 impleme } } } + + @Override + public IOStreamPair getStreams() { + return ioStreams; + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Fri Oct 19 02:25:55 2012 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; +import java.io.Closeable; import java.net.Socket; import java.net.SocketAddress; @@ -25,53 +26,135 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.io.IOException; import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; /** - * A cache of sockets. + * A cache of input stream sockets to Data Node. */ class SocketCache { - static final Log LOG = LogFactory.getLog(SocketCache.class); + private static final Log LOG = LogFactory.getLog(SocketCache.class); - private final LinkedListMultimap multimap; - private final int capacity; + @InterfaceAudience.Private + static class SocketAndStreams implements Closeable { + public final Socket sock; + public final IOStreamPair ioStreams; + long createTime; + + public SocketAndStreams(Socket s, IOStreamPair ioStreams) { + this.sock = s; + this.ioStreams = ioStreams; + this.createTime = System.currentTimeMillis(); + } + + @Override + public void close() { + if (ioStreams != null) { + IOUtils.closeStream(ioStreams.in); + IOUtils.closeStream(ioStreams.out); + } + IOUtils.closeSocket(sock); + } - /** - * Create a SocketCache with the given capacity. - * @param capacity Max cache size. - */ - public SocketCache(int capacity) { - multimap = LinkedListMultimap.create(); - this.capacity = capacity; - if (capacity <= 0) { - LOG.debug("SocketCache disabled in configuration."); + public long getCreateTime() { + return this.createTime; } } + private Daemon daemon; + /** A map for per user per datanode. */ + private static LinkedListMultimap multimap = + LinkedListMultimap.create(); + private static int capacity; + private static long expiryPeriod; + private static SocketCache scInstance = new SocketCache(); + private static boolean isInitedOnce = false; + + public static synchronized SocketCache getInstance(int c, long e) { + // capacity is only initialized once + if (isInitedOnce == false) { + capacity = c; + expiryPeriod = e; + + if (capacity == 0 ) { + LOG.info("SocketCache disabled."); + } + else if (expiryPeriod == 0) { + throw new IllegalStateException("Cannot initialize expiryPeriod to " + + expiryPeriod + "when cache is enabled."); + } + isInitedOnce = true; + } else { //already initialized once + if (capacity != c || expiryPeriod != e) { + LOG.info("capacity and expiry periods already set to " + capacity + + " and " + expiryPeriod + " respectively. Cannot set it to " + c + + " and " + e); + } + } + + return scInstance; + } + + private boolean isDaemonStarted() { + return (daemon == null)? false: true; + } + + private synchronized void startExpiryDaemon() { + // start daemon only if not already started + if (isDaemonStarted() == true) { + return; + } + + daemon = new Daemon(new Runnable() { + @Override + public void run() { + try { + SocketCache.this.run(); + } catch(InterruptedException e) { + //noop + } finally { + SocketCache.this.clear(); + } + } + + @Override + public String toString() { + return String.valueOf(SocketCache.this); + } + }); + daemon.start(); + } + /** * Get a cached socket to the given address. * @param remote Remote address the socket is connected to. * @return A socket with unknown state, possibly closed underneath. Or null. */ - public synchronized Socket get(SocketAddress remote) { + public synchronized SocketAndStreams get(SocketAddress remote) { + if (capacity <= 0) { // disabled return null; } - - List socklist = multimap.get(remote); - if (socklist == null) { + + List sockStreamList = multimap.get(remote); + if (sockStreamList == null) { return null; } - Iterator iter = socklist.iterator(); + Iterator iter = sockStreamList.iterator(); while (iter.hasNext()) { - Socket candidate = iter.next(); + SocketAndStreams candidate = iter.next(); iter.remove(); - if (!candidate.isClosed()) { + if (!candidate.sock.isClosed()) { return candidate; } } @@ -82,14 +165,17 @@ class SocketCache { * Give an unused socket to the cache. * @param sock socket not used by anyone. */ - public synchronized void put(Socket sock) { + public synchronized void put(Socket sock, IOStreamPair ioStreams) { + + Preconditions.checkNotNull(sock); + SocketAndStreams s = new SocketAndStreams(sock, ioStreams); if (capacity <= 0) { // Cache disabled. - IOUtils.closeSocket(sock); + s.close(); return; } - - Preconditions.checkNotNull(sock); + + startExpiryDaemon(); SocketAddress remoteAddr = sock.getRemoteSocketAddress(); if (remoteAddr == null) { @@ -102,7 +188,7 @@ class SocketCache { if (capacity == multimap.size()) { evictOldest(); } - multimap.put(remoteAddr, sock); + multimap.put(remoteAddr, s); } public synchronized int size() { @@ -110,32 +196,67 @@ class SocketCache { } /** + * Evict and close sockets older than expiry period from the cache. + */ + private synchronized void evictExpired(long expiryPeriod) { + while (multimap.size() != 0) { + Iterator> iter = + multimap.entries().iterator(); + Entry entry = iter.next(); + // if oldest socket expired, remove it + if (entry == null || + System.currentTimeMillis() - entry.getValue().getCreateTime() < + expiryPeriod) { + break; + } + iter.remove(); + SocketAndStreams s = entry.getValue(); + s.close(); + } + } + + /** * Evict the oldest entry in the cache. */ private synchronized void evictOldest() { - Iterator> iter = + Iterator> iter = multimap.entries().iterator(); if (!iter.hasNext()) { - throw new IllegalStateException("Cannot evict from empty cache!"); + throw new IllegalStateException("Cannot evict from empty cache! " + + "capacity: " + capacity); } - Entry entry = iter.next(); + Entry entry = iter.next(); iter.remove(); - Socket sock = entry.getValue(); - IOUtils.closeSocket(sock); + SocketAndStreams s = entry.getValue(); + s.close(); } /** - * Empty the cache, and close all sockets. + * Periodically check in the cache and expire the entries + * older than expiryPeriod minutes */ - public synchronized void clear() { - for (Socket sock : multimap.values()) { - IOUtils.closeSocket(sock); + private void run() throws InterruptedException { + for(long lastExpiryTime = System.currentTimeMillis(); + !Thread.interrupted(); + Thread.sleep(expiryPeriod)) { + final long elapsed = System.currentTimeMillis() - lastExpiryTime; + if (elapsed >= expiryPeriod) { + evictExpired(expiryPeriod); + lastExpiryTime = System.currentTimeMillis(); + } } - multimap.clear(); + clear(); + throw new InterruptedException("Daemon Interrupted"); } - protected void finalize() { - clear(); + /** + * Empty the cache, and close all sockets. + */ + private synchronized void clear() { + for (SocketAndStreams sockAndStream : multimap.values()) { + sockAndStream.close(); + } + multimap.clear(); } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Fri Oct 19 02:25:55 2012 @@ -40,6 +40,7 @@ public class Block implements Writable, WritableFactories.setFactory (Block.class, new WritableFactory() { + @Override public Writable newInstance() { return new Block(); } }); } @@ -146,6 +147,7 @@ public class Block implements Writable, /** */ + @Override public String toString() { return getBlockName() + "_" + getGenerationStamp(); } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Fri Oct 19 02:25:55 2012 @@ -148,10 +148,12 @@ public class BlockListAsLongs implements this.currentReplicaState = null; } + @Override public boolean hasNext() { return currentBlockIndex < getNumberOfBlocks(); } + @Override public Block next() { block.set(blockId(currentBlockIndex), blockLength(currentBlockIndex), @@ -161,6 +163,7 @@ public class BlockListAsLongs implements return block; } + @Override public void remove() { throw new UnsupportedOperationException("Sorry. can't remove."); } @@ -178,6 +181,7 @@ public class BlockListAsLongs implements /** * Returns an iterator over blocks in the block report. */ + @Override public Iterator iterator() { return getBlockReportIterator(); } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Oct 19 02:25:55 2012 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.protocol; import java.io.IOException; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -106,4 +107,21 @@ public interface ClientDatanodeProtocol */ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block, Token token) throws IOException; + + /** + * Retrieves volume location information about a list of blocks on a datanode. + * This is in the form of an opaque {@link VolumeId} for each configured + * data directory, which is not guaranteed to be the same across DN restarts. + * + * @param blocks + * list of blocks on the local datanode + * @param tokens + * block access tokens corresponding to the requested blocks + * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with + * data directories + * @throws IOException + * if datanode is unreachable, or replica is not found on datanode + */ + HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + List> tokens) throws IOException; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Oct 19 02:25:55 2012 @@ -33,8 +33,6 @@ import org.apache.hadoop.fs.UnresolvedLi import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction; -import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.io.EnumSetWritable; @@ -44,6 +42,7 @@ import org.apache.hadoop.security.Access import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; @@ -668,6 +667,18 @@ public interface ClientProtocol { */ public void saveNamespace() throws AccessControlException, IOException; + + /** + * Roll the edit log. + * Requires superuser privileges. + * + * @throws AccessControlException if the superuser privilege is violated + * @throws IOException if log roll fails + * @return the txid of the new segment + */ + @Idempotent + public long rollEdits() throws AccessControlException, IOException; + /** * Enable/Disable restore failed storage. *

@@ -694,16 +705,6 @@ public interface ClientProtocol { public void finalizeUpgrade() throws IOException; /** - * Report distributed upgrade progress or force current upgrade to proceed. - * - * @param action {@link HdfsConstants.UpgradeAction} to perform - * @return upgrade status information or null if no upgrades are in progress - * @throws IOException - */ - public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) - throws IOException; - - /** * @return CorruptFileBlocks, containing a list of corrupt files (with * duplicates if there is more than one corrupt block in a file) * and a cookie @@ -941,4 +942,11 @@ public interface ClientProtocol { */ public void cancelDelegationToken(Token token) throws IOException; + + /** + * @return encryption key so a client can encrypt data sent via the + * DataTransferProtocol to/from DataNodes. + * @throws IOException + */ + public DataEncryptionKey getDataEncryptionKey() throws IOException; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java Fri Oct 19 02:25:55 2012 @@ -37,6 +37,7 @@ public class DSQuotaExceededException ex super(quota, count); } + @Override public String getMessage() { String msg = super.getMessage(); if (msg == null) { Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Oct 19 02:25:55 2012 @@ -37,12 +37,12 @@ import org.apache.hadoop.classification. public class DatanodeID implements Comparable { public static final DatanodeID[] EMPTY_ARRAY = {}; - protected String ipAddr; // IP address - protected String hostName; // hostname - protected String storageID; // unique per cluster storageID - protected int xferPort; // data streaming port - protected int infoPort; // info server port - protected int ipcPort; // IPC server port + private String ipAddr; // IP address + private String hostName; // hostname + private String storageID; // unique per cluster storageID + private int xferPort; // data streaming port + private int infoPort; // info server port + private int ipcPort; // IPC server port public DatanodeID(DatanodeID from) { this(from.getIpAddr(), @@ -104,7 +104,7 @@ public class DatanodeID implements Compa /** * @return IP:ipcPort string */ - public String getIpcAddr() { + private String getIpcAddr() { return ipAddr + ":" + ipcPort; } @@ -123,6 +123,29 @@ public class DatanodeID implements Compa } /** + * @return hostname:ipcPort + */ + private String getIpcAddrWithHostname() { + return hostName + ":" + ipcPort; + } + + /** + * @param useHostname true to use the DN hostname, use the IP otherwise + * @return name:xferPort + */ + public String getXferAddr(boolean useHostname) { + return useHostname ? getXferAddrWithHostname() : getXferAddr(); + } + + /** + * @param useHostname true to use the DN hostname, use the IP otherwise + * @return name:ipcPort + */ + public String getIpcAddr(boolean useHostname) { + return useHostname ? getIpcAddrWithHostname() : getIpcAddr(); + } + + /** * @return data storage ID. */ public String getStorageID() { @@ -150,6 +173,7 @@ public class DatanodeID implements Compa return ipcPort; } + @Override public boolean equals(Object to) { if (this == to) { return true; @@ -161,10 +185,12 @@ public class DatanodeID implements Compa storageID.equals(((DatanodeID)to).getStorageID())); } + @Override public int hashCode() { return getXferAddr().hashCode()^ storageID.hashCode(); } + @Override public String toString() { return getXferAddr(); } @@ -187,6 +213,7 @@ public class DatanodeID implements Compa * @param that * @return as specified by Comparable */ + @Override public int compareTo(DatanodeID that) { return getXferAddr().compareTo(that.getXferAddr()); } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Oct 19 02:25:55 2012 @@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetworkTopo import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; /** * This class extends the primary identifier of a Datanode with ephemeral @@ -36,13 +37,13 @@ import org.apache.hadoop.util.StringUtil @InterfaceAudience.Private @InterfaceStability.Evolving public class DatanodeInfo extends DatanodeID implements Node { - protected long capacity; - protected long dfsUsed; - protected long remaining; - protected long blockPoolUsed; - protected long lastUpdate; - protected int xceiverCount; - protected String location = NetworkTopology.DEFAULT_RACK; + private long capacity; + private long dfsUsed; + private long remaining; + private long blockPoolUsed; + private long lastUpdate; + private int xceiverCount; + private String location = NetworkTopology.DEFAULT_RACK; // Datanode administrative states public enum AdminStates { @@ -56,6 +57,7 @@ public class DatanodeInfo extends Datano this.value = v; } + @Override public String toString() { return value; } @@ -79,8 +81,7 @@ public class DatanodeInfo extends Datano this.lastUpdate = from.getLastUpdate(); this.xceiverCount = from.getXceiverCount(); this.location = from.getNetworkLocation(); - this.adminState = from.adminState; - this.hostName = from.hostName; + this.adminState = from.getAdminState(); } public DatanodeInfo(DatanodeID nodeID) { @@ -126,6 +127,7 @@ public class DatanodeInfo extends Datano } /** Network location name */ + @Override public String getName() { return getXferAddr(); } @@ -200,9 +202,11 @@ public class DatanodeInfo extends Datano } /** network location */ + @Override public synchronized String getNetworkLocation() {return location;} /** Sets the network location */ + @Override public synchronized void setNetworkLocation(String location) { this.location = NodeBase.normalize(location); } @@ -317,7 +321,24 @@ public class DatanodeInfo extends Datano } return adminState; } - + + /** + * Check if the datanode is in stale state. Here if + * the namenode has not received heartbeat msg from a + * datanode for more than staleInterval (default value is + * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}), + * the datanode will be treated as stale node. + * + * @param staleInterval + * the time interval for marking the node as stale. If the last + * update time is beyond the given time interval, the node will be + * marked as stale. + * @return true if the node is stale + */ + public boolean isStale(long staleInterval) { + return (Time.now() - lastUpdate) >= staleInterval; + } + /** * Sets the admin state of this node. */ @@ -334,13 +355,17 @@ public class DatanodeInfo extends Datano private transient Node parent; //its parent /** Return this node's parent */ + @Override public Node getParent() { return parent; } + @Override public void setParent(Node parent) {this.parent = parent;} /** Return this node's level in the tree. * E.g. the root of a tree returns 0 and its children return 1 */ + @Override public int getLevel() { return level; } + @Override public void setLevel(int level) {this.level = level;} @Override Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Fri Oct 19 02:25:55 2012 @@ -60,7 +60,7 @@ public class HdfsConstants { public static int MAX_PATH_LENGTH = 8000; public static int MAX_PATH_DEPTH = 1000; - // TODO mb@media-style.com: should be conf injected? + // TODO should be conf injected? public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024; public static final int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt( DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY, @@ -85,16 +85,6 @@ public class HdfsConstants { public static final long INVALID_TXID = -12345; /** - * Distributed upgrade actions: - * - * 1. Get upgrade status. 2. Get detailed upgrade status. 3. Proceed with the - * upgrade if it is stuck, no matter what the status is. - */ - public static enum UpgradeAction { - GET_STATUS, DETAILED_STATUS, FORCE_PROCEED; - } - - /** * URI Scheme for hdfs://namenode/ URIs. */ public static final String HDFS_URI_SCHEME = "hdfs"; Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Fri Oct 19 02:25:55 2012 @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.util.ExactSizeInputStream; import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.security.token.Token; import com.google.common.collect.Lists; @@ -155,6 +156,14 @@ public abstract class HdfsProtoUtil { return ret; } + public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) { + return DataChecksum.Type.valueOf(type.name()); + } + + public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) { + return HdfsProtos.ChecksumTypeProto.valueOf(type.name()); + } + public static InputStream vintPrefixed(final InputStream input) throws IOException { final int firstByte = input.read(); @@ -167,4 +176,4 @@ public abstract class HdfsProtoUtil { return new ExactSizeInputStream(input, size); } -} \ No newline at end of file +} Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Fri Oct 19 02:25:55 2012 @@ -113,6 +113,7 @@ public class LocatedBlocks { Comparator comp = new Comparator() { // Returns 0 iff a is inside b or b is inside a + @Override public int compare(LocatedBlock a, LocatedBlock b) { long aBeg = a.getStartOffset(); long bBeg = b.getStartOffset(); Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java Fri Oct 19 02:25:55 2012 @@ -36,6 +36,7 @@ public final class NSQuotaExceededExcept super(quota, count); } + @Override public String getMessage() { String msg = super.getMessage(); if (msg == null) { Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java Fri Oct 19 02:25:55 2012 @@ -58,6 +58,7 @@ public class QuotaExceededException exte this.pathName = path; } + @Override public String getMessage() { return super.getMessage(); } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Fri Oct 19 02:25:55 2012 @@ -24,16 +24,13 @@ import org.apache.hadoop.hdfs.protocol.E import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import com.google.common.collect.BiMap; -import com.google.common.collect.ImmutableBiMap; - /** * Static utilities for dealing with the protocol buffers used by the @@ -42,19 +39,6 @@ import com.google.common.collect.Immutab @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class DataTransferProtoUtil { - - /** - * Map between the internal DataChecksum identifiers and the protobuf- - * generated identifiers on the wire. - */ - static BiMap checksumTypeMap = - ImmutableBiMap.builder() - .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32) - .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C) - .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL) - .build(); - - static BlockConstructionStage fromProto( OpWriteBlockProto.BlockConstructionStage stage) { return BlockConstructionStage.valueOf(BlockConstructionStage.class, @@ -68,7 +52,7 @@ public abstract class DataTransferProtoU } public static ChecksumProto toProto(DataChecksum checksum) { - ChecksumType type = checksumTypeMap.get(checksum.getChecksumType()); + ChecksumTypeProto type = ChecksumTypeProto.valueOf(checksum.getChecksumType().name()); if (type == null) { throw new IllegalArgumentException( "Can't convert checksum to protobuf: " + checksum); @@ -84,7 +68,7 @@ public abstract class DataTransferProtoU if (proto == null) return null; int bytesPerChecksum = proto.getBytesPerChecksum(); - int type = checksumTypeMap.inverse().get(proto.getType()); + DataChecksum.Type type = DataChecksum.Type.valueOf(proto.getType().name()); return DataChecksum.newDataChecksum(type, bytesPerChecksum); } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Fri Oct 19 02:25:55 2012 @@ -27,14 +27,31 @@ import org.apache.hadoop.classification. import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto; import org.apache.hadoop.hdfs.util.ByteBufferOutputStream; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Shorts; +import com.google.common.primitives.Ints; +import com.google.protobuf.InvalidProtocolBufferException; + /** * Header data for each packet that goes through the read/write pipelines. + * Includes all of the information about the packet, excluding checksums and + * actual data. + * + * This data includes: + * - the offset in bytes into the HDFS block of the data in this packet + * - the sequence number of this packet in the pipeline + * - whether or not this is the last packet in the pipeline + * - the length of the data in this packet + * - whether or not this packet should be synced by the DNs. + * + * When serialized, this header is written out as a protocol buffer, preceded + * by a 4-byte integer representing the full packet length, and a 2-byte short + * representing the header length. */ @InterfaceAudience.Private @InterfaceStability.Evolving public class PacketHeader { - /** Header size for a packet */ - private static final int PROTO_SIZE = + private static final int MAX_PROTO_SIZE = PacketHeaderProto.newBuilder() .setOffsetInBlock(0) .setSeqno(0) @@ -42,8 +59,10 @@ public class PacketHeader { .setDataLen(0) .setSyncBlock(false) .build().getSerializedSize(); - public static final int PKT_HEADER_LEN = - 6 + PROTO_SIZE; + public static final int PKT_LENGTHS_LEN = + Ints.BYTES + Shorts.BYTES; + public static final int PKT_MAX_HEADER_LEN = + PKT_LENGTHS_LEN + MAX_PROTO_SIZE; private int packetLen; private PacketHeaderProto proto; @@ -54,13 +73,25 @@ public class PacketHeader { public PacketHeader(int packetLen, long offsetInBlock, long seqno, boolean lastPacketInBlock, int dataLen, boolean syncBlock) { this.packetLen = packetLen; - proto = PacketHeaderProto.newBuilder() + Preconditions.checkArgument(packetLen >= Ints.BYTES, + "packet len %s should always be at least 4 bytes", + packetLen); + + PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder() .setOffsetInBlock(offsetInBlock) .setSeqno(seqno) .setLastPacketInBlock(lastPacketInBlock) - .setDataLen(dataLen) - .setSyncBlock(syncBlock) - .build(); + .setDataLen(dataLen); + + if (syncBlock) { + // Only set syncBlock if it is specified. + // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721 + // because it changes the length of the packet header, and BlockReceiver + // in that version did not support variable-length headers. + builder.setSyncBlock(syncBlock); + } + + proto = builder.build(); } public int getDataLen() { @@ -90,10 +121,16 @@ public class PacketHeader { @Override public String toString() { return "PacketHeader with packetLen=" + packetLen + - "Header data: " + + " header data: " + proto.toString(); } + public void setFieldsFromData( + int packetLen, byte[] headerData) throws InvalidProtocolBufferException { + this.packetLen = packetLen; + proto = PacketHeaderProto.parseFrom(headerData); + } + public void readFields(ByteBuffer buf) throws IOException { packetLen = buf.getInt(); short protoLen = buf.getShort(); @@ -110,14 +147,21 @@ public class PacketHeader { proto = PacketHeaderProto.parseFrom(data); } + /** + * @return the number of bytes necessary to write out this header, + * including the length-prefixing of the payload and header + */ + public int getSerializedSize() { + return PKT_LENGTHS_LEN + proto.getSerializedSize(); + } /** * Write the header into the buffer. * This requires that PKT_HEADER_LEN bytes are available. */ public void putInBuffer(final ByteBuffer buf) { - assert proto.getSerializedSize() == PROTO_SIZE - : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize(); + assert proto.getSerializedSize() <= MAX_PROTO_SIZE + : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); try { buf.putInt(packetLen); buf.putShort((short) proto.getSerializedSize()); @@ -128,12 +172,18 @@ public class PacketHeader { } public void write(DataOutputStream out) throws IOException { - assert proto.getSerializedSize() == PROTO_SIZE - : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize(); + assert proto.getSerializedSize() <= MAX_PROTO_SIZE + : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); out.writeInt(packetLen); out.writeShort(proto.getSerializedSize()); proto.writeTo(out); } + + public byte[] getBytes() { + ByteBuffer buf = ByteBuffer.allocate(getSerializedSize()); + putInBuffer(buf); + return buf.array(); + } /** * Perform a sanity check on the packet, returning true if it is sane. Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Fri Oct 19 02:25:55 2012 @@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import com.google.protobuf.TextFormat; + /** Pipeline Acknowledgment **/ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -120,6 +122,6 @@ public class PipelineAck { @Override //Object public String toString() { - return proto.toString(); + return TextFormat.shortDebugString(proto); } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Oct 19 02:25:55 2012 @@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class Receiver implements DataTransferProtocol { - protected final DataInputStream in; - - /** Create a receiver for DataTransferProtocol with a socket. */ - protected Receiver(final DataInputStream in) { + protected DataInputStream in; + + /** Initialize a receiver for DataTransferProtocol with a socket. */ + protected void initialize(final DataInputStream in) { this.in = in; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java Fri Oct 19 02:25:55 2012 @@ -18,19 +18,31 @@ package org.apache.hadoop.hdfs.protocolPB; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -106,4 +118,38 @@ public class ClientDatanodeProtocolServe .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath()) .build(); } + + @Override + public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations( + RpcController controller, GetHdfsBlockLocationsRequestProto request) + throws ServiceException { + HdfsBlocksMetadata resp; + try { + // Construct the Lists to make the actual call + List blocks = + new ArrayList(request.getBlocksCount()); + for (ExtendedBlockProto b : request.getBlocksList()) { + blocks.add(PBHelper.convert(b)); + } + List> tokens = + new ArrayList>(request.getTokensCount()); + for (BlockTokenIdentifierProto b : request.getTokensList()) { + tokens.add(PBHelper.convert(b)); + } + // Call the real implementation + resp = impl.getHdfsBlocksMetadata(blocks, tokens); + } catch (IOException e) { + throw new ServiceException(e); + } + List volumeIdsByteStrings = + new ArrayList(resp.getVolumeIds().size()); + for (byte[] b : resp.getVolumeIds()) { + volumeIdsByteStrings.add(ByteString.copyFrom(b)); + } + // Build and return the response + Builder builder = GetHdfsBlockLocationsResponseProto.newBuilder(); + builder.addAllVolumeIds(volumeIdsByteStrings); + builder.addAllVolumeIndexes(resp.getVolumeIndexes()); + return builder.build(); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java Fri Oct 19 02:25:55 2012 @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolP import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; import javax.net.SocketFactory; @@ -33,12 +35,17 @@ import org.apache.hadoop.hdfs.protocol.B import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -50,6 +57,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -73,10 +81,10 @@ public class ClientDatanodeProtocolTrans RefreshNamenodesRequestProto.newBuilder().build(); public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, - Configuration conf, int socketTimeout, LocatedBlock locatedBlock) - throws IOException { + Configuration conf, int socketTimeout, boolean connectToDnViaHostname, + LocatedBlock locatedBlock) throws IOException { rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf, - socketTimeout, locatedBlock); + socketTimeout, connectToDnViaHostname, locatedBlock); } public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr, @@ -90,11 +98,17 @@ public class ClientDatanodeProtocolTrans * @param datanodeid Datanode to connect to. * @param conf Configuration. * @param socketTimeout Socket timeout to use. + * @param connectToDnViaHostname connect to the Datanode using its hostname * @throws IOException */ public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid, - Configuration conf, int socketTimeout) throws IOException { - InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr()); + Configuration conf, int socketTimeout, boolean connectToDnViaHostname) + throws IOException { + final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); + InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); + if (LOG.isDebugEnabled()) { + LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr); + } rpcProxy = createClientDatanodeProtocolProxy(addr, UserGroupInformation.getCurrentUser(), conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout); @@ -102,10 +116,11 @@ public class ClientDatanodeProtocolTrans static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy( DatanodeID datanodeid, Configuration conf, int socketTimeout, - LocatedBlock locatedBlock) throws IOException { - InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr()); + boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException { + final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname); + InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr); if (LOG.isDebugEnabled()) { - LOG.debug("ClientDatanodeProtocol addr=" + addr); + LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr); } // Since we're creating a new UserGroupInformation here, we know that no @@ -200,4 +215,44 @@ public class ClientDatanodeProtocolTrans public Object getUnderlyingProxyObject() { return rpcProxy; } -} \ No newline at end of file + + @Override + public HdfsBlocksMetadata getHdfsBlocksMetadata(List blocks, + List> tokens) throws IOException { + // Convert to proto objects + List blocksProtos = + new ArrayList(blocks.size()); + List tokensProtos = + new ArrayList(tokens.size()); + for (ExtendedBlock b : blocks) { + blocksProtos.add(PBHelper.convert(b)); + } + for (Token t : tokens) { + tokensProtos.add(PBHelper.convert(t)); + } + // Build the request + GetHdfsBlockLocationsRequestProto request = + GetHdfsBlockLocationsRequestProto.newBuilder() + .addAllBlocks(blocksProtos) + .addAllTokens(tokensProtos) + .build(); + // Send the RPC + GetHdfsBlockLocationsResponseProto response; + try { + response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + // List of volumes in the response + List volumeIdsByteStrings = response.getVolumeIdsList(); + List volumeIds = new ArrayList(volumeIdsByteStrings.size()); + for (ByteString bs : volumeIdsByteStrings) { + volumeIds.add(bs.toByteArray()); + } + // Array of indexes into the list of volumes, one per block + List volumeIndexes = response.getVolumeIndexesList(); + // Parsed HdfsVolumeId values, one per block + return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), + volumeIds, volumeIndexes); + } +} Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Oct 19 02:25:55 2012 @@ -50,14 +50,14 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder; @@ -103,6 +103,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto; @@ -127,7 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; -import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.io.Text; import com.google.protobuf.RpcController; @@ -537,6 +539,20 @@ public class ClientNamenodeProtocolServe } } + + @Override + public RollEditsResponseProto rollEdits(RpcController controller, + RollEditsRequestProto request) throws ServiceException { + try { + long txid = server.rollEdits(); + return RollEditsResponseProto.newBuilder() + .setNewSegmentTxId(txid) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = RefreshNodesResponseProto.newBuilder().build(); @@ -568,24 +584,6 @@ public class ClientNamenodeProtocolServe } @Override - public DistributedUpgradeProgressResponseProto distributedUpgradeProgress( - RpcController controller, DistributedUpgradeProgressRequestProto req) - throws ServiceException { - try { - UpgradeStatusReport result = server.distributedUpgradeProgress(PBHelper - .convert(req.getAction())); - DistributedUpgradeProgressResponseProto.Builder builder = - DistributedUpgradeProgressResponseProto.newBuilder(); - if (result != null) { - builder.setReport(PBHelper.convert(result)); - } - return builder.build(); - } catch (IOException e) { - throw new ServiceException(e); - } - } - - @Override public ListCorruptFileBlocksResponseProto listCorruptFileBlocks( RpcController controller, ListCorruptFileBlocksRequestProto req) throws ServiceException { @@ -830,4 +828,18 @@ public class ClientNamenodeProtocolServe throw new ServiceException(e); } } + + @Override + public GetDataEncryptionKeyResponseProto getDataEncryptionKey( + RpcController controller, GetDataEncryptionKeyRequestProto request) + throws ServiceException { + try { + DataEncryptionKey encryptionKey = server.getDataEncryptionKey(); + return GetDataEncryptionKeyResponseProto.newBuilder() + .setDataEncryptionKey(PBHelper.convert(encryptionKey)) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } }