Subject: svn commit: r1134023 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs...
Date: Thu, 09 Jun 2011 18:16:31 -0000
To: hdfs-commits@hadoop.apache.org
From: todd@apache.org

Author: todd
Date: Thu Jun 9 18:16:30 2011
New Revision: 1134023

URL: http://svn.apache.org/viewvc?rev=1134023&view=rev
Log:
HDFS-941. The DFS client should cache and reuse open sockets to datanodes
while performing reads. Contributed by bc Wong and Todd Lipcon.
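The core idea of the patch is a small, capacity-bounded pool of idle sockets keyed by datanode address: when a read finishes cleanly, DFSInputStream hands its socket to the cache, and the next read to the same datanode tries a cached socket (retrying if it turns out to be stale) before dialing a new connection. The following is a minimal, self-contained sketch of that pattern; the class name SimpleSocketCache and its simplified drop-when-full policy are illustrative only and are not part of this commit (the real implementation is the SocketCache class added below, which evicts the oldest entry instead).

import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Illustrative, capacity-bounded cache of idle sockets keyed by remote address. */
class SimpleSocketCache {
  private final Map<SocketAddress, Deque<Socket>> cache =
      new HashMap<SocketAddress, Deque<Socket>>();
  private final int capacity;
  private int size = 0;

  SimpleSocketCache(int capacity) {
    this.capacity = capacity;
  }

  /** Return a cached, still-open socket to the given address, or null if none. */
  synchronized Socket get(SocketAddress remote) {
    Deque<Socket> socks = cache.get(remote);
    while (socks != null && !socks.isEmpty()) {
      Socket sock = socks.poll();
      size--;
      if (!sock.isClosed()) {
        // The socket may still be stale (e.g. closed by the datanode after its
        // keepalive timeout expired); the caller must be prepared to retry.
        return sock;
      }
    }
    return null;
  }

  /** Hand back an idle, connected socket once a read has fully completed. */
  synchronized void put(Socket sock) throws IOException {
    SocketAddress remote = sock.getRemoteSocketAddress();
    if (remote == null || size >= capacity) {
      sock.close(); // unconnected, or cache full: simply drop it
      return;
    }
    Deque<Socket> socks = cache.get(remote);
    if (socks == null) {
      socks = new ArrayDeque<Socket>();
      cache.put(remote, socks);
    }
    socks.add(sock);
    size++;
  }
}

In the patch itself, DFSInputStream.getBlockReader() plays the caller's role: it retries over cached sockets a configurable number of times, falls back to a fresh connection with TCP_NODELAY set, and only returns a socket to the cache (via closeBlockReader()) after the BlockReader has read to the end of its range and sent a status code back to the datanode.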
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/SocketCache.java hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java Modified: hadoop/hdfs/trunk/CHANGES.txt hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java Modified: hadoop/hdfs/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/CHANGES.txt (original) +++ hadoop/hdfs/trunk/CHANGES.txt Thu Jun 9 18:16:30 2011 @@ -503,6 +503,9 @@ Trunk (unreleased changes) HDFS-1826. NameNode should save image to name directories in parallel during upgrade. (Matt Foley via hairong) + HDFS-941. The DFS client should cache and reuse open sockets to datanodes + while performing reads. (bc Wong and Todd Lipcon via todd) + BUG FIXES HDFS-1449. Fix test failures - ExtendedBlock must return Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Thu Jun 9 18:16:30 2011 @@ -46,12 +46,29 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.util.DataChecksum; /** This is a wrapper around connection to datanode - * and understands checksum, offset etc + * and understands checksum, offset etc. + * + * Terminology: + *
+ *
+ * <dl>
+ *   <dt>block</dt>
+ *   <dd>The hdfs block, typically large (~64MB).</dd>
+ *   <dt>chunk</dt>
+ *   <dd>A block is divided into chunks, each comes with a checksum.
+ *       We want transfers to be chunk-aligned, to be able to
+ *       verify checksums.</dd>
+ *   <dt>packet</dt>
+ *   <dd>A grouping of chunks used for transport. It contains a
+ *       header, followed by checksum data, followed by real data.</dd>
+ * </dl>
+ * Please see DataNode for the RPC specification. */ @InterfaceAudience.Private public class BlockReader extends FSInputChecker { - Socket dnSock; //for now just sending checksumOk. + Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. private DataInputStream in; private DataChecksum checksum; @@ -77,10 +94,12 @@ public class BlockReader extends FSInput */ private final long bytesNeededToFinish; - private boolean gotEOS = false; + private boolean eos = false; + private boolean sentStatusCode = false; byte[] skipBuf = null; ByteBuffer checksumBytes = null; + /** Amount of unread data in the current received packet */ int dataLeft = 0; /* FSInputChecker interface */ @@ -99,7 +118,7 @@ public class BlockReader extends FSInput // This has to be set here, *before* the skip, since we can // hit EOS during the skip, in the case that our entire read // is smaller than the checksum chunk. - boolean eosBefore = gotEOS; + boolean eosBefore = eos; //for the first read, skip the extra bytes at the front. if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) { @@ -115,11 +134,14 @@ public class BlockReader extends FSInput } int nRead = super.read(buf, off, len); - - // if gotEOS was set in the previous read and checksum is enabled : - if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) { - //checksum is verified and there are no errors. - checksumOk(dnSock); + + // if eos was set in the previous read, send a status code to the DN + if (eos && !eosBefore && nRead >= 0) { + if (needChecksum()) { + sendReadResult(dnSock, CHECKSUM_OK); + } else { + sendReadResult(dnSock, SUCCESS); + } } return nRead; } @@ -191,7 +213,7 @@ public class BlockReader extends FSInput int len, byte[] checksumBuf) throws IOException { // Read one chunk. - if ( gotEOS ) { + if (eos) { // Already hit EOF return -1; } @@ -246,7 +268,7 @@ public class BlockReader extends FSInput if (checksumSize > 0) { - // How many chunks left in our stream - this is a ceiling + // How many chunks left in our packet - this is a ceiling // since we may have a partial chunk at the end of the file int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1; @@ -307,7 +329,7 @@ public class BlockReader extends FSInput ", dataLen : " + dataLen); } - gotEOS = true; + eos = true; } if ( bytesToRead == 0 ) { @@ -335,7 +357,7 @@ public class BlockReader extends FSInput // The total number of bytes that we need to transfer from the DN is // the amount that the user wants (bytesToRead), plus the padding at // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read ends mid-chunk. + // to send more than this amount if the read starts/ends mid-chunk. this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); this.firstChunkOffset = firstChunkOffset; @@ -364,6 +386,21 @@ public class BlockReader extends FSInput len, bufferSize, verifyChecksum, ""); } + /** + * Create a new BlockReader specifically to satisfy a read. + * This method also sends the OP_READ_BLOCK request. + * + * @param sock An established Socket to the DN. 
The BlockReader will not close it normally + * @param file File location + * @param block The block object + * @param blockToken The block token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance, or null on error. + */ public static BlockReader newBlockReader( Socket sock, String file, ExtendedBlock block, Token blockToken, @@ -423,6 +460,10 @@ public class BlockReader extends FSInput public synchronized void close() throws IOException { startOffset = -1; checksum = null; + if (dnSock != null) { + dnSock.close(); + } + // in will be closed when its Socket is closed. } @@ -432,22 +473,43 @@ public class BlockReader extends FSInput public int readAll(byte[] buf, int offset, int len) throws IOException { return readFully(this, buf, offset, len); } - - /* When the reader reaches end of the read and there are no checksum - * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that - * checksum was verified and there was no error. - */ - void checksumOk(Socket sock) { + + /** + * Take the socket used to talk to the DN. + */ + public Socket takeSocket() { + assert hasSentStatusCode() : + "BlockReader shouldn't give back sockets mid-read"; + Socket res = dnSock; + dnSock = null; + return res; + } + + /** + * Whether the BlockReader has reached the end of its input stream + * and successfully sent a status code back to the datanode. + */ + public boolean hasSentStatusCode() { + return sentStatusCode; + } + + /** + * When the reader reaches end of the read, it sends a status response + * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN + * closing our connection (which we will re-open), but won't affect + * data correctness. + */ + void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) { + assert !sentStatusCode : "already sent status code to " + sock; try { OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT); - CHECKSUM_OK.writeOutputStream(out); + statusCode.writeOutputStream(out); out.flush(); + sentStatusCode = true; } catch (IOException e) { - // its ok not to be able to send this. - if(LOG.isDebugEnabled()) { - LOG.debug("Could not write to datanode " + sock.getInetAddress() + - ": " + e.getMessage()); - } + // It's ok not to be able to send this. But something is probably wrong. + LOG.info("Could not send read status (" + statusCode + ") to datanode " + + sock.getInetAddress() + ": " + e.getMessage()); } } Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 9 18:16:30 2011 @@ -136,6 +136,8 @@ public class DFSClient implements FSCons final int hdfsTimeout; // timeout value for a DFS operation. final LeaseRenewer leaserenewer; + final SocketCache socketCache; + /** * A map from file names to {@link DFSOutputStream} objects * that are currently being written by this client. 
@@ -279,6 +281,10 @@ public class DFSClient implements FSCons defaultReplication = (short) conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT); + + this.socketCache = new SocketCache( + conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, + DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT)); if (nameNodeAddr != null && rpcNamenode == null) { this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi); Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Jun 9 18:16:30 2011 @@ -44,6 +44,8 @@ public class DFSConfigKeys extends Commo public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true; public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy"; public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT"; + public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity"; + public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100"; @@ -83,6 +85,8 @@ public class DFSConfigKeys extends Commo public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml"; public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth"; public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false; + public static final String DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry"; + public static final int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3; public static final String DFS_NAMENODE_ACCESSTIME_PRECISION_KEY = "dfs.namenode.accesstime.precision"; public static final long DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT = 3600000; public static final String DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY = "dfs.namenode.replication.considerLoad"; @@ -112,6 +116,8 @@ public class DFSConfigKeys extends Commo public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0; public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose"; public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false; + public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive"; + public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000; //Delegation token related keys public static final String DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval"; Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original) +++ 
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Jun 9 18:16:30 2011 @@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.L import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; @@ -56,8 +55,9 @@ import org.apache.hadoop.util.StringUtil ****************************************************************/ @InterfaceAudience.Private public class DFSInputStream extends FSInputStream { + private final SocketCache socketCache; + private final DFSClient dfsClient; - private Socket s = null; private boolean closed = false; private final String src; @@ -92,7 +92,9 @@ public class DFSInputStream extends FSIn private int buffersize = 1; private byte[] oneByteBuf = new byte[1]; // used for 'int read()' - + + private int nCachedConnRetry; + void addToDeadNodes(DatanodeInfo dnInfo) { deadNodes.put(dnInfo, dnInfo); } @@ -103,9 +105,14 @@ public class DFSInputStream extends FSIn this.verifyChecksum = verifyChecksum; this.buffersize = buffersize; this.src = src; + this.socketCache = dfsClient.socketCache; prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 10 * dfsClient.defaultBlockSize); - timeWindow = this.dfsClient.conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow); + timeWindow = this.dfsClient.conf.getInt( + DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow); + nCachedConnRetry = this.dfsClient.conf.getInt( + DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY, + DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); openInfo(); } @@ -371,15 +378,11 @@ public class DFSInputStream extends FSIn throw new IOException("Attempted to read past end of file"); } - if ( blockReader != null ) { - blockReader.close(); + // Will be getting a new BlockReader. 
+ if (blockReader != null) { + closeBlockReader(blockReader); blockReader = null; } - - if (s != null) { - s.close(); - s = null; - } // // Connect to best DataNode for desired Block, with potential offset @@ -400,14 +403,12 @@ public class DFSInputStream extends FSIn InetSocketAddress targetAddr = retval.addr; try { - s = dfsClient.socketFactory.createSocket(); - NetUtils.connect(s, targetAddr, dfsClient.socketTimeout); - s.setSoTimeout(dfsClient.socketTimeout); ExtendedBlock blk = targetBlock.getBlock(); Token accessToken = targetBlock.getBlockToken(); - blockReader = BlockReader.newBlockReader(s, src, blk, - accessToken, + blockReader = getBlockReader( + targetAddr, src, blk, + accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, buffersize, verifyChecksum, dfsClient.clientName); return chosenNode; @@ -437,13 +438,6 @@ public class DFSInputStream extends FSIn // Put chosen node into dead list, continue addToDeadNodes(chosenNode); } - if (s != null) { - try { - s.close(); - } catch (IOException iex) { - } - } - s = null; } } } @@ -457,16 +451,11 @@ public class DFSInputStream extends FSIn return; } dfsClient.checkOpen(); - - if ( blockReader != null ) { - blockReader.close(); + + if (blockReader != null) { + closeBlockReader(blockReader); blockReader = null; } - - if (s != null) { - s.close(); - s = null; - } super.close(); closed = true; } @@ -479,7 +468,7 @@ public class DFSInputStream extends FSIn /* This is a used by regular read() and handles ChecksumExceptions. * name readBuffer() is chosen to imply similarity to readBuffer() in - * ChecksuFileSystem + * ChecksumFileSystem */ private synchronized int readBuffer(byte buf[], int off, int len, Map> corruptedBlockMap) @@ -659,7 +648,6 @@ public class DFSInputStream extends FSIn // // Connect to best DataNode for desired Block, with potential offset // - Socket dn = null; int refetchToken = 1; // only need to get a new access token once while (true) { @@ -673,18 +661,15 @@ public class DFSInputStream extends FSIn BlockReader reader = null; try { - dn = dfsClient.socketFactory.createSocket(); - NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout); - dn.setSoTimeout(dfsClient.socketTimeout); Token blockToken = block.getBlockToken(); int len = (int) (end - start + 1); - - reader = BlockReader.newBlockReader(dn, src, - block.getBlock(), - blockToken, - start, len, buffersize, - verifyChecksum, dfsClient.clientName); + + reader = getBlockReader(targetAddr, src, + block.getBlock(), + blockToken, + start, len, buffersize, + verifyChecksum, dfsClient.clientName); int nread = reader.readAll(buf, offset, len); if (nread != len) { throw new IOException("truncated return from reader.read(): " + @@ -713,8 +698,9 @@ public class DFSInputStream extends FSIn } } } finally { - IOUtils.closeStream(reader); - IOUtils.closeSocket(dn); + if (reader != null) { + closeBlockReader(reader); + } } // Put chosen node into dead list, continue addToDeadNodes(chosenNode); @@ -722,6 +708,95 @@ public class DFSInputStream extends FSIn } /** + * Close the given BlockReader and cache its socket. + */ + private void closeBlockReader(BlockReader reader) throws IOException { + if (reader.hasSentStatusCode()) { + Socket oldSock = reader.takeSocket(); + socketCache.put(oldSock); + } + reader.close(); + } + + /** + * Retrieve a BlockReader suitable for reading. + * This method will reuse the cached connection to the DN if appropriate. + * Otherwise, it will create a new connection. 
+ * + * @param dnAddr Address of the datanode + * @param file File location + * @param block The Block object + * @param blockToken The access token for security + * @param startOffset The read offset, relative to block head + * @param len The number of bytes to read + * @param bufferSize The IO buffer size (not the client buffer size) + * @param verifyChecksum Whether to verify checksum + * @param clientName Client name + * @return New BlockReader instance + */ + protected BlockReader getBlockReader(InetSocketAddress dnAddr, + String file, + ExtendedBlock block, + Token blockToken, + long startOffset, + long len, + int bufferSize, + boolean verifyChecksum, + String clientName) + throws IOException { + IOException err = null; + boolean fromCache = true; + + // Allow retry since there is no way of knowing whether the cached socket + // is good until we actually use it. + for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) { + Socket sock = socketCache.get(dnAddr); + if (sock == null) { + fromCache = false; + + sock = dfsClient.socketFactory.createSocket(); + + // TCP_NODELAY is crucial here because of bad interactions between + // Nagle's Algorithm and Delayed ACKs. With connection keepalive + // between the client and DN, the conversation looks like: + // 1. Client -> DN: Read block X + // 2. DN -> Client: data for block X + // 3. Client -> DN: Status OK (successful read) + // 4. Client -> DN: Read block Y + // The fact that step #3 and #4 are both in the client->DN direction + // triggers Nagling. If the DN is using delayed ACKs, this results + // in a delay of 40ms or more. + // + // TCP_NODELAY disables nagling and thus avoids this performance + // disaster. + sock.setTcpNoDelay(true); + + NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout); + sock.setSoTimeout(dfsClient.socketTimeout); + } + + try { + // The OP_READ_BLOCK request is sent as we make the BlockReader + BlockReader reader = + BlockReader.newBlockReader(sock, file, block, + blockToken, + startOffset, len, + bufferSize, verifyChecksum, + clientName); + return reader; + } catch (IOException ex) { + // Our socket is no good. + DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex); + sock.close(); + err = ex; + } + } + + throw err; + } + + + /** * Read bytes starting from the specified position. * * @param position start read from this position Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/SocketCache.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1134023&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/SocketCache.java (added) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/SocketCache.java Thu Jun 9 18:16:30 2011 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import java.net.Socket; +import java.net.SocketAddress; + +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import com.google.common.base.Preconditions; +import com.google.common.collect.LinkedListMultimap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; + +/** + * A cache of sockets. + */ +class SocketCache { + static final Log LOG = LogFactory.getLog(SocketCache.class); + + private final LinkedListMultimap multimap; + private final int capacity; + + /** + * Create a SocketCache with the given capacity. + * @param capacity Max cache size. + */ + public SocketCache(int capacity) { + multimap = LinkedListMultimap.create(); + this.capacity = capacity; + } + + /** + * Get a cached socket to the given address. + * @param remote Remote address the socket is connected to. + * @return A socket with unknown state, possibly closed underneath. Or null. + */ + public synchronized Socket get(SocketAddress remote) { + List socklist = multimap.get(remote); + if (socklist == null) { + return null; + } + + Iterator iter = socklist.iterator(); + while (iter.hasNext()) { + Socket candidate = iter.next(); + iter.remove(); + if (!candidate.isClosed()) { + return candidate; + } + } + return null; + } + + /** + * Give an unused socket to the cache. + * @param sock socket not used by anyone. + */ + public synchronized void put(Socket sock) { + Preconditions.checkNotNull(sock); + + SocketAddress remoteAddr = sock.getRemoteSocketAddress(); + if (remoteAddr == null) { + LOG.warn("Cannot cache (unconnected) socket with no remote address: " + + sock); + IOUtils.closeSocket(sock); + return; + } + + if (capacity == multimap.size()) { + evictOldest(); + } + multimap.put(remoteAddr, sock); + } + + public synchronized int size() { + return multimap.size(); + } + + /** + * Evict the oldest entry in the cache. + */ + private synchronized void evictOldest() { + Iterator> iter = + multimap.entries().iterator(); + if (!iter.hasNext()) { + throw new IllegalStateException("Cannot evict from empty cache!"); + } + Entry entry = iter.next(); + iter.remove(); + Socket sock = entry.getValue(); + IOUtils.closeSocket(sock); + } + + /** + * Empty the cache, and close all sockets. 
+ */ + public synchronized void clear() { + for (Socket sock : multimap.values()) { + IOUtils.closeSocket(sock); + } + multimap.clear(); + } + + protected void finalize() { + clear(); + } + +} Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu Jun 9 18:16:30 2011 @@ -83,6 +83,7 @@ public interface HdfsConstants { public static int READ_TIMEOUT_EXTENSION = 5 * 1000; public static int WRITE_TIMEOUT = 8 * 60 * 1000; public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline + public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000; /** * Defines the NameNode role. Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Jun 9 18:16:30 2011 @@ -69,7 +69,8 @@ class BlockSender implements java.io.Clo private long seqno; // sequence number of packet private boolean transferToAllowed = true; - private boolean blockReadFully; //set when the whole block is read + // set once entire requested byte range has been sent to the client + private boolean sentEntireByteRange; private boolean verifyChecksum; //if true, check is verified while reading private DataTransferThrottler throttler; private final String clientTraceFmt; // format of client trace log message @@ -493,6 +494,8 @@ class BlockSender implements java.io.Clo } catch (IOException e) { //socket error throw ioeToSocketException(e); } + + sentEntireByteRange = true; } finally { if (clientTraceFmt != null) { final long endTime = System.nanoTime(); @@ -501,12 +504,10 @@ class BlockSender implements java.io.Clo close(); } - blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength; - return totalRead; } - boolean isBlockReadFully() { - return blockReadFully; + boolean didSendEntireByteRange() { + return sentEntireByteRange; } } Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Jun 9 18:16:30 2011 @@ -1831,15 +1831,21 @@ public class DataNode extends Configured A "PACKET" is defined further below. The client reads data until it receives a packet with - "LastPacketInBlock" set to true or with a zero length. If there is - no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK: + "LastPacketInBlock" set to true or with a zero length. 
It then replies + to DataNode with one of the status codes: + - CHECKSUM_OK: All the chunk checksums have been verified + - SUCCESS: Data received; checksums not verified + - ERROR_CHECKSUM: (Currently not used) Detected invalid checksums + + +---------------+ + | 2 byte Status | + +---------------+ - Client optional response at the end of data transmission of any length: - +------------------------------+ - | 2 byte OP_STATUS_CHECKSUM_OK | - +------------------------------+ - The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the - client connection if it is absent. + The DataNode expects all well behaved clients to send the 2 byte + status code. And if the the client doesn't, the DN will close the + connection. So the status code is optional in the sense that it + does not affect the correctness of the data. (And the client can + always reconnect.) PACKET : Contains a packet header, checksum and data. Amount of data ======== carried is set by BUFFER_SIZE. Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Jun 9 18:16:30 2011 @@ -27,14 +27,18 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.nio.channels.ClosedChannelException; import java.util.Arrays; import org.apache.commons.logging.Log; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -71,6 +75,7 @@ class DataXceiver extends DataTransferPr private final DataNode datanode; private final DataXceiverServer dataXceiverServer; + private int socketKeepaliveTimeout; private long opStartTime; //the start time of receiving an Op public DataXceiver(Socket s, DataNode datanode, @@ -83,6 +88,10 @@ class DataXceiver extends DataTransferPr remoteAddress = s.getRemoteSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString(); + socketKeepaliveTimeout = datanode.getConf().getInt( + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, + DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); + if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " + datanode.getXceiverCount()); @@ -113,24 +122,60 @@ class DataXceiver extends DataTransferPr updateCurrentThreadName("Waiting for operation"); DataInputStream in=null; + int opsProcessed = 0; try { in = new DataInputStream( new BufferedInputStream(NetUtils.getInputStream(s), SMALL_BUFFER_SIZE)); - final DataTransferProtocol.Op op = readOp(in); + int stdTimeout = s.getSoTimeout(); - // Make sure the xciver count is not exceeded - int curXceiverCount = datanode.getXceiverCount(); - if (curXceiverCount > dataXceiverServer.maxXceiverCount) { - throw new IOException("xceiverCount " 
+ curXceiverCount - + " exceeds the limit of concurrent xcievers " - + dataXceiverServer.maxXceiverCount); - } + // We process requests in a loop, and stay around for a short timeout. + // This optimistic behaviour allows the other end to reuse connections. + // Setting keepalive timeout to 0 disable this behavior. + do { + DataTransferProtocol.Op op; + try { + if (opsProcessed != 0) { + assert socketKeepaliveTimeout > 0; + s.setSoTimeout(socketKeepaliveTimeout); + } + op = readOp(in); + } catch (InterruptedIOException ignored) { + // Time out while we wait for client rpc + break; + } catch (IOException err) { + // Since we optimistically expect the next op, it's quite normal to get EOF here. + if (opsProcessed > 0 && + (err instanceof EOFException || err instanceof ClosedChannelException)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops"); + } + } else { + throw err; + } + break; + } + + // restore normal timeout + if (opsProcessed != 0) { + s.setSoTimeout(stdTimeout); + } - opStartTime = now(); - processOp(op, in); + // Make sure the xceiver count is not exceeded + int curXceiverCount = datanode.getXceiverCount(); + if (curXceiverCount > dataXceiverServer.maxXceiverCount) { + throw new IOException("xceiverCount " + curXceiverCount + + " exceeds the limit of concurrent xcievers " + + dataXceiverServer.maxXceiverCount); + } + + opStartTime = now(); + processOp(op, in); + ++opsProcessed; + } while (s.isConnected() && socketKeepaliveTimeout > 0); } catch (Throwable t) { - LOG.error(datanode.getMachineName() + ":DataXceiver",t); + LOG.error(datanode.getMachineName() + ":DataXceiver, at " + + s.toString(), t); } finally { if (LOG.isDebugEnabled()) { LOG.debug(datanode.getMachineName() + ":Number of active connections is: " @@ -176,18 +221,36 @@ class DataXceiver extends DataTransferPr blockSender = new BlockSender(block, startOffset, length, true, true, false, datanode, clientTraceFmt); } catch(IOException e) { - ERROR.write(out); + sendResponse(s, ERROR, datanode.socketWriteTimeout); throw e; } SUCCESS.write(out); // send op status long read = blockSender.sendBlock(out, baseStream, null); // send data - + + if (blockSender.didSendEntireByteRange()) { + // If we sent the entire range, then we should expect the client + // to respond with a Status enum. + try { + DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in); + if (stat == null) { + LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " + + "code after reading. Will close connection."); + IOUtils.closeStream(out); + } + } catch (IOException ioe) { + LOG.debug("Error reading client status response. Will close connection.", ioe); + IOUtils.closeStream(out); + } + } else { + IOUtils.closeStream(out); + } datanode.metrics.incrBytesRead((int) read); datanode.metrics.incrBlocksRead(); } catch ( SocketException ignored ) { // Its ok for remote side to close the connection anytime. datanode.metrics.incrBlocksRead(); + IOUtils.closeStream(out); } catch ( IOException ioe ) { /* What exactly should we do here? * Earlier version shutdown() datanode if there is disk error. 
@@ -198,7 +261,6 @@ class DataXceiver extends DataTransferPr StringUtils.stringifyException(ioe) ); throw ioe; } finally { - IOUtils.closeStream(out); IOUtils.closeStream(blockSender); } @@ -690,12 +752,8 @@ class DataXceiver extends DataTransferPr long timeout) throws IOException { DataOutputStream reply = new DataOutputStream(NetUtils.getOutputStream(s, timeout)); - try { - opStatus.write(reply); - reply.flush(); - } finally { - IOUtils.closeStream(reply); - } + opStatus.write(reply); + reply.flush(); } private void checkAccess(DataOutputStream out, final boolean reply, Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Thu Jun 9 18:16:30 2011 @@ -73,28 +73,39 @@ public class BlockReaderTestUtil { /** * Create a file of the given size filled with random data. - * @return List of Blocks of the new file. + * @return File data. */ - public List writeFile(Path filepath, int sizeKB) + public byte[] writeFile(Path filepath, int sizeKB) throws IOException { FileSystem fs = cluster.getFileSystem(); // Write a file with the specified amount of data DataOutputStream os = fs.create(filepath); - byte data[] = new byte[1024]; + byte data[] = new byte[1024 * sizeKB]; new Random().nextBytes(data); - for (int i = 0; i < sizeKB; i++) { - os.write(data); - } + os.write(data); os.close(); + return data; + } + /** + * Get the list of Blocks for a file. + */ + public List getFileBlocks(Path filepath, int sizeKB) + throws IOException { // Return the blocks we just wrote - DFSClient dfsclient = new DFSClient( - new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); + DFSClient dfsclient = getDFSClient(); return dfsclient.getNamenode().getBlockLocations( filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks(); } + /** + * Get the DFSClient. + */ + public DFSClient getDFSClient() throws IOException { + InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort()); + return new DFSClient(nnAddr, conf); + } /** * Exercise the BlockReader and read length bytes. 
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Thu Jun 9 18:16:30 2011 @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import java.util.List; +import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.fs.Path; @@ -41,23 +42,24 @@ public class TestClientBlockVerification public static void setupCluster() throws Exception { final int REPLICATION_FACTOR = 1; util = new BlockReaderTestUtil(REPLICATION_FACTOR); - List blkList = util.writeFile(TEST_FILE, FILE_SIZE_K); + util.writeFile(TEST_FILE, FILE_SIZE_K); + List blkList = util.getFileBlocks(TEST_FILE, FILE_SIZE_K); testBlock = blkList.get(0); // Use the first block to test } /** - * Verify that if we read an entire block, we send checksumOk + * Verify that if we read an entire block, we send CHECKSUM_OK */ @Test public void testBlockVerification() throws Exception { BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); - verify(reader).checksumOk(reader.dnSock); + verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); } /** - * Test that if we do an incomplete read, we don't call checksumOk + * Test that if we do an incomplete read, we don't call CHECKSUM_OK */ @Test public void testIncompleteRead() throws Exception { @@ -65,14 +67,14 @@ public class TestClientBlockVerification util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false); // We asked the blockreader for the whole file, and only read - // half of it, so no checksumOk - verify(reader, never()).checksumOk(reader.dnSock); + // half of it, so no CHECKSUM_OK + verify(reader, never()).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); } /** * Test that if we ask for a half block, and read it all, we *do* - * call checksumOk. The DN takes care of knowing whether it was + * send CHECKSUM_OK. The DN takes care of knowing whether it was * the whole block or not. 
*/ @Test @@ -81,7 +83,7 @@ public class TestClientBlockVerification BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2)); // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); - verify(reader).checksumOk(reader.dnSock); + verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); } @@ -99,7 +101,7 @@ public class TestClientBlockVerification " len=" + length); BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); - verify(reader).checksumOk(reader.dnSock); + verify(reader).sendReadResult(reader.dnSock, Status.CHECKSUM_OK); reader.close(); } } Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java?rev=1134023&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java (added) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java Thu Jun 9 18:16:30 2011 @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.net.InetSocketAddress; +import java.net.Socket; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.datanode.DataNode; + +import org.apache.hadoop.security.token.Token; +import org.junit.Test; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import static org.junit.Assert.*; + +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.mockito.invocation.InvocationOnMock; +import static org.mockito.Mockito.spy; + +/** + * This class tests the client connection caching in a single node + * mini-cluster. 
+ */ +public class TestConnCache { + static final Log LOG = LogFactory.getLog(TestConnCache.class); + + static final int BLOCK_SIZE = 4096; + static final int FILE_SIZE = 3 * BLOCK_SIZE; + + static Configuration conf = null; + static MiniDFSCluster cluster = null; + static FileSystem fs = null; + + static final Path testFile = new Path("/testConnCache.dat"); + static byte authenticData[] = null; + + static BlockReaderTestUtil util = null; + + + /** + * A mock Answer to remember the BlockReader used. + * + * It verifies that all invocation to DFSInputStream.getBlockReader() + * use the same socket. + */ + private class MockGetBlockReader implements Answer { + public BlockReader reader = null; + private Socket sock = null; + + public BlockReader answer(InvocationOnMock invocation) throws Throwable { + BlockReader prevReader = reader; + reader = (BlockReader) invocation.callRealMethod(); + if (sock == null) { + sock = reader.dnSock; + } else if (prevReader != null && prevReader.hasSentStatusCode()) { + // Can't reuse socket if the previous BlockReader didn't read till EOS. + assertSame("DFSInputStream should use the same socket", + sock, reader.dnSock); + } return reader; + } + } + + @BeforeClass + public static void setupCluster() throws Exception { + final int REPLICATION_FACTOR = 1; + + util = new BlockReaderTestUtil(REPLICATION_FACTOR); + cluster = util.getCluster(); + conf = util.getConf(); + fs = cluster.getFileSystem(); + + authenticData = util.writeFile(testFile, FILE_SIZE / 1024); + } + + + /** + * (Optionally) seek to position, read and verify data. + * + * Seek to specified position if pos is non-negative. + */ + private void pread(DFSInputStream in, + long pos, + byte[] buffer, + int offset, + int length) + throws IOException { + assertTrue("Test buffer too small", buffer.length >= offset + length); + + if (pos >= 0) + in.seek(pos); + + LOG.info("Reading from file of size " + in.getFileLength() + + " at offset " + in.getPos()); + + while (length > 0) { + int cnt = in.read(buffer, offset, length); + assertTrue("Error in read", cnt > 0); + offset += cnt; + length -= cnt; + } + + // Verify + for (int i = 0; i < length; ++i) { + byte actual = buffer[i]; + byte expect = authenticData[(int)pos + i]; + assertEquals("Read data mismatch at file offset " + (pos + i) + + ". Expects " + expect + "; got " + actual, + actual, expect); + } + } + + /** + * Test the SocketCache itself. 
+ */ + @Test + public void testSocketCache() throws IOException { + final int CACHE_SIZE = 4; + SocketCache cache = new SocketCache(CACHE_SIZE); + + // Make a client + InetSocketAddress nnAddr = + new InetSocketAddress("localhost", cluster.getNameNodePort()); + DFSClient client = new DFSClient(nnAddr, conf); + + // Find out the DN addr + LocatedBlock block = + client.getNamenode().getBlockLocations( + testFile.toString(), 0, FILE_SIZE) + .getLocatedBlocks().get(0); + DataNode dn = util.getDataNode(block); + InetSocketAddress dnAddr = dn.getSelfAddr(); + + // Make some sockets to the DN + Socket[] dnSockets = new Socket[CACHE_SIZE]; + for (int i = 0; i < dnSockets.length; ++i) { + dnSockets[i] = client.socketFactory.createSocket( + dnAddr.getAddress(), dnAddr.getPort()); + } + + // Insert a socket to the NN + Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort()); + cache.put(nnSock); + assertSame("Read the write", nnSock, cache.get(nnAddr)); + cache.put(nnSock); + + // Insert DN socks + for (Socket dnSock : dnSockets) { + cache.put(dnSock); + } + + assertEquals("NN socket evicted", null, cache.get(nnAddr)); + assertTrue("Evicted socket closed", nnSock.isClosed()); + + // Lookup the DN socks + for (Socket dnSock : dnSockets) { + assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr)); + dnSock.close(); + } + + assertEquals("Cache is empty", 0, cache.size()); + } + + /** + * Read a file served entirely from one DN. Seek around and read from + * different offsets. And verify that they all use the same socket. + * + * @throws java.io.IOException + */ + @Test + @SuppressWarnings("unchecked") + public void testReadFromOneDN() throws IOException { + LOG.info("Starting testReadFromOneDN()"); + DFSClient client = new DFSClient( + new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); + DFSInputStream in = spy(client.open(testFile.toString())); + LOG.info("opened " + testFile.toString()); + + byte[] dataBuf = new byte[BLOCK_SIZE]; + + MockGetBlockReader answer = new MockGetBlockReader(); + Mockito.doAnswer(answer).when(in).getBlockReader( + (InetSocketAddress) Matchers.anyObject(), + Matchers.anyString(), + (ExtendedBlock) Matchers.anyObject(), + (Token) Matchers.anyObject(), + Matchers.anyLong(), + Matchers.anyLong(), + Matchers.anyInt(), + Matchers.anyBoolean(), + Matchers.anyString()); + + // Initial read + pread(in, 0, dataBuf, 0, dataBuf.length); + // Read again and verify that the socket is the same + pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length); + pread(in, 1024, dataBuf, 0, dataBuf.length); + pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read + pread(in, 64, dataBuf, 0, dataBuf.length / 2); + + in.close(); + } + + @AfterClass + public static void teardownCluster() throws Exception { + util.shutdown(); + } +} Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Thu Jun 9 18:16:30 2011 @@ -384,6 +384,8 @@ public class TestDFSClientRetries extend conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, retries); 
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWin); + // Disable keepalive + conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 0); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(replicationFactor).build(); cluster.waitActive(); Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java?rev=1134023&view=auto ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java (added) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java Thu Jun 9 18:16:30 2011 @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; + +import org.junit.Test; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import static org.junit.Assert.*; + +/** + * Test the use of DFSInputStream by multiple concurrent readers. + */ +public class TestParallelRead { + + static final Log LOG = LogFactory.getLog(TestParallelRead.class); + static BlockReaderTestUtil util = null; + static DFSClient dfsClient = null; + static final int FILE_SIZE_K = 256; + static Random rand = null; + + static { + // The client-trace log ends up causing a lot of blocking threads + // in this when it's being used as a performance benchmark. + LogManager.getLogger(DataNode.class.getName() + ".clienttrace") + .setLevel(Level.WARN); + } + + private class TestFileInfo { + public DFSInputStream dis; + public Path filepath; + public byte[] authenticData; + } + + @BeforeClass + public static void setupCluster() throws Exception { + final int REPLICATION_FACTOR = 2; + util = new BlockReaderTestUtil(REPLICATION_FACTOR); + dfsClient = util.getDFSClient(); + rand = new Random(System.currentTimeMillis()); + } + + /** + * A worker to do one "unit" of read. 
+ */ + static class ReadWorker extends Thread { + static public final int N_ITERATIONS = 1024; + + private static final double PROPORTION_NON_POSITIONAL_READ = 0.10; + + private TestFileInfo testInfo; + private long fileSize; + private long bytesRead; + private boolean error; + + ReadWorker(TestFileInfo testInfo, int id) { + super("ReadWorker-" + id + "-" + testInfo.filepath.toString()); + this.testInfo = testInfo; + fileSize = testInfo.dis.getFileLength(); + assertEquals(fileSize, testInfo.authenticData.length); + bytesRead = 0; + error = false; + } + + /** + * Randomly do one of (1) Small read; and (2) Large Pread. + */ + @Override + public void run() { + for (int i = 0; i < N_ITERATIONS; ++i) { + int startOff = rand.nextInt((int) fileSize); + int len = 0; + try { + double p = rand.nextDouble(); + if (p < PROPORTION_NON_POSITIONAL_READ) { + // Do a small regular read. Very likely this will leave unread + // data on the socket and make the socket uncacheable. + len = Math.min(rand.nextInt(64), (int) fileSize - startOff); + read(startOff, len); + bytesRead += len; + } else { + // Do a positional read most of the time. + len = rand.nextInt((int) (fileSize - startOff)); + pRead(startOff, len); + bytesRead += len; + } + } catch (Exception ex) { + LOG.error(getName() + ": Error while testing read at " + startOff + + " length " + len); + error = true; + fail(ex.getMessage()); + } + } + } + + public long getBytesRead() { + return bytesRead; + } + + /** + * Raising error in a thread doesn't seem to fail the test. + * So check afterwards. + */ + public boolean hasError() { + return error; + } + + /** + * Seek to somewhere random and read. + */ + private void read(int start, int len) throws Exception { + assertTrue( + "Bad args: " + start + " + " + len + " should be < " + fileSize, + start + len < fileSize); + DFSInputStream dis = testInfo.dis; + + synchronized (dis) { + dis.seek(start); + + byte buf[] = new byte[len]; + int cnt = 0; + while (cnt < len) { + cnt += dis.read(buf, cnt, buf.length - cnt); + } + verifyData("Read data corrupted", buf, start, start + len); + } + } + + /** + * Positional read. + */ + private void pRead(int start, int len) throws Exception { + assertTrue( + "Bad args: " + start + " + " + len + " should be < " + fileSize, + start + len < fileSize); + DFSInputStream dis = testInfo.dis; + + byte buf[] = new byte[len]; + int cnt = 0; + while (cnt < len) { + cnt += dis.read(start, buf, cnt, buf.length - cnt); + } + verifyData("Pread data corrupted", buf, start, start + len); + } + + /** + * Verify read data vs authentic data + */ + private void verifyData(String msg, byte actual[], int start, int end) + throws Exception { + byte auth[] = testInfo.authenticData; + if (end > auth.length) { + throw new Exception(msg + ": Actual array (" + end + + ") is past the end of authentic data (" + + auth.length + ")"); + } + + int j = start; + for (int i = 0; i < actual.length; ++i, ++j) { + if (auth[j] != actual[i]) { + throw new Exception(msg + ": Arrays byte " + i + " (at offset " + + j + ") differs: expect " + + auth[j] + " got " + actual[i]); + } + } + } + } + + /** + * Do parallel read several times with different number of files and threads. + * + * Note that while this is the only "test" in a junit sense, we're actually + * dispatching a lot more. Failures in the other methods (and other threads) + * need to be manually collected, which is inconvenient. 
+ */ + @Test + public void testParallelRead() throws IOException { + if (!runParallelRead(1, 4)) { + fail("Check log for errors"); + } + if (!runParallelRead(1, 16)) { + fail("Check log for errors"); + } + if (!runParallelRead(2, 4)) { + fail("Check log for errors"); + } + } + + /** + * Start the parallel read with the given parameters. + */ + boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException { + ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach]; + TestFileInfo testInfoArr[] = new TestFileInfo[nFiles]; + + // Prepare the files and workers + int nWorkers = 0; + for (int i = 0; i < nFiles; ++i) { + TestFileInfo testInfo = new TestFileInfo(); + testInfoArr[i] = testInfo; + + testInfo.filepath = new Path("/TestParallelRead.dat." + i); + testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K); + testInfo.dis = dfsClient.open(testInfo.filepath.toString()); + + for (int j = 0; j < nWorkerEach; ++j) { + workers[nWorkers++] = new ReadWorker(testInfo, nWorkers); + } + } + + // Start the workers and wait + long starttime = System.currentTimeMillis(); + for (ReadWorker worker : workers) { + worker.start(); + } + + for (ReadWorker worker : workers) { + try { + worker.join(); + } catch (InterruptedException ignored) { } + } + long endtime = System.currentTimeMillis(); + + // Cleanup + for (TestFileInfo testInfo : testInfoArr) { + testInfo.dis.close(); + } + + // Report + boolean res = true; + long totalRead = 0; + for (ReadWorker worker : workers) { + long nread = worker.getBytesRead(); + LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " + + "average " + nread / ReadWorker.N_ITERATIONS + " B per read"); + totalRead += nread; + if (worker.hasError()) { + res = false; + } + } + + double timeTakenSec = (endtime - starttime) / 1000.0; + long totalReadKB = totalRead / 1024; + LOG.info("=== Report: " + nWorkers + " threads read " + + totalReadKB + " KB (across " + + nFiles + " file(s)) in " + + timeTakenSec + "s; average " + + totalReadKB / timeTakenSec + " KB/s"); + + return res; + } + + @AfterClass + public static void teardownCluster() throws Exception { + util.shutdown(); + } + +} Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java?rev=1134023&r1=1134022&r2=1134023&view=diff ============================================================================== --- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java (original) +++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java Thu Jun 9 18:16:30 2011 @@ -23,6 +23,7 @@ import java.util.Random; import junit.framework.TestCase; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,6 +38,11 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.test.MetricsAsserts; +import org.apache.log4j.Level; + +import org.apache.commons.logging.LogFactory; + import static org.apache.hadoop.test.MetricsAsserts.*; /** @@ -59,6 +65,9 @@ public class TestNameNodeMetrics extends DFS_REPLICATION_INTERVAL); 
CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, DFS_REPLICATION_INTERVAL); + + ((Log4JLogger)LogFactory.getLog(MetricsAsserts.class)) + .getLogger().setLevel(Level.DEBUG); } private MiniDFSCluster cluster; @@ -255,9 +264,5 @@ public class TestNameNodeMetrics extends readFile(fs, file1_Path); updateMetrics(); assertCounter("GetBlockLocations", 3L, getMetrics(NN_METRICS)); - - // Verify total load metrics, total load = Data Node started. - updateMetrics(); - assertGauge("TotalLoad" ,DATANODE_COUNT, getMetrics(NS_METRICS)); } }
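For reference, the sketch below gathers the three tunables this commit introduces into one place. It is an illustrative snippet, not part of the patch: the class name SocketReuseTuningExample and the capacity value 32 are made up for this example, while the DFSConfigKeys constants and their defaults (16 cached sockets, 3 retries over cached connections, 1000 ms of datanode keepalive) come from the DFSConfigKeys changes above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SocketReuseTuningExample {
  /** Build a Configuration that adjusts the socket-reuse knobs added by HDFS-941. */
  public static Configuration tunedConf() {
    Configuration conf = new Configuration();

    // Client side: how many idle datanode sockets DFSClient keeps
    // (dfs.client.socketcache.capacity, default 16), and how many times a read
    // retries over a possibly stale cached connection before failing
    // (dfs.client.cached.conn.retry, default 3).
    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 32);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY, 3);

    // Datanode side: how long a DataXceiver waits for the next request on a
    // reused connection (dfs.datanode.socket.reuse.keepalive, in ms, default
    // 1000). Setting it to 0 disables connection reuse, as the
    // TestDFSClientRetries change above does.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, 1000);

    return conf;
  }
}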