Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 51433 invoked from network); 14 Aug 2008 17:59:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 14 Aug 2008 17:59:00 -0000 Received: (qmail 40532 invoked by uid 500); 14 Aug 2008 17:58:59 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 40383 invoked by uid 500); 14 Aug 2008 17:58:58 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 40374 invoked by uid 99); 14 Aug 2008 17:58:58 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Aug 2008 10:58:58 -0700 X-ASF-Spam-Status: No, hits=-1999.3 required=10.0 tests=ALL_TRUSTED,FRT_LEVITRA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Aug 2008 17:58:03 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BBC8F2388A05; Thu, 14 Aug 2008 10:58:31 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r685979 [3/3] - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/datanode/ Date: Thu, 14 Aug 2008 17:58:31 -0000 To: core-commits@hadoop.apache.org From: johan@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080814175831.BBC8F2388A05@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=685979&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Thu Aug 14 10:58:30 2008 @@ -0,0 +1,571 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.StringUtils; + +/** + * Thread for processing incoming/outgoing data stream. + */ +class DataXceiver implements Runnable, FSConstants { + public static final Log LOG = DataNode.LOG; + + Socket s; + String remoteAddress; // address of remote side + String localAddress; // local address of this daemon + DataNode datanode; + DataXceiverServer dataXceiverServer; + + public DataXceiver(Socket s, DataNode datanode, + DataXceiverServer dataXceiverServer) { + + this.s = s; + this.datanode = datanode; + this.dataXceiverServer = dataXceiverServer; + InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress(); + remoteAddress = isock.toString(); + localAddress = s.getInetAddress() + ":" + s.getLocalPort(); + LOG.debug("Number of active connections is: " + datanode.getXceiverCount()); + } + + /** + * Read/write data from/to the DataXceiveServer. 
+ */ + public void run() { + DataInputStream in=null; + try { + in = new DataInputStream( + new BufferedInputStream(NetUtils.getInputStream(s), + SMALL_BUFFER_SIZE)); + short version = in.readShort(); + if ( version != DATA_TRANSFER_VERSION ) { + throw new IOException( "Version Mismatch" ); + } + boolean local = s.getInetAddress().equals(s.getLocalAddress()); + byte op = in.readByte(); + // Make sure the xciver count is not exceeded + int curXceiverCount = datanode.getXceiverCount(); + if (curXceiverCount > dataXceiverServer.maxXceiverCount) { + throw new IOException("xceiverCount " + curXceiverCount + + " exceeds the limit of concurrent xcievers " + + dataXceiverServer.maxXceiverCount); + } + long startTime = DataNode.now(); + switch ( op ) { + case OP_READ_BLOCK: + readBlock( in ); + datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime); + if (local) + datanode.myMetrics.readsFromLocalClient.inc(); + else + datanode.myMetrics.readsFromRemoteClient.inc(); + break; + case OP_WRITE_BLOCK: + writeBlock( in ); + datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime); + if (local) + datanode.myMetrics.writesFromLocalClient.inc(); + else + datanode.myMetrics.writesFromRemoteClient.inc(); + break; + case OP_READ_METADATA: + readMetadata( in ); + datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime); + break; + case OP_REPLACE_BLOCK: // for balancing purpose; send to a destination + replaceBlock(in); + datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime); + break; + case OP_COPY_BLOCK: // for balancing purpose; send to a proxy source + copyBlock(in); + datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime); + break; + default: + throw new IOException("Unknown opcode " + op + " in data stream"); + } + } catch (Throwable t) { + LOG.error(datanode.dnRegistration + ":DataXceiver",t); + } finally { + LOG.debug(datanode.dnRegistration + ":Number of active connections is: " + + datanode.getXceiverCount()); + 
IOUtils.closeStream(in); + IOUtils.closeSocket(s); + dataXceiverServer.childSockets.remove(s); + } + } + + /** + * Read a block from the disk. + * @param in The stream to read from + * @throws IOException + */ + private void readBlock(DataInputStream in) throws IOException { + // + // Read in the header + // + long blockId = in.readLong(); + Block block = new Block( blockId, 0 , in.readLong()); + + long startOffset = in.readLong(); + long length = in.readLong(); + + // send the block + OutputStream baseStream = NetUtils.getOutputStream(s, + datanode.socketWriteTimeout); + DataOutputStream out = new DataOutputStream( + new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE)); + + BlockSender blockSender = null; + try { + try { + blockSender = new BlockSender(block, startOffset, length, + true, true, false, datanode); + } catch(IOException e) { + out.writeShort(OP_STATUS_ERROR); + throw e; + } + + out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status + long read = blockSender.sendBlock(out, baseStream, null); // send data + + if (blockSender.isBlockReadFully()) { + // See if client verification succeeded. + // This is an optional response from client. + try { + if (in.readShort() == OP_STATUS_CHECKSUM_OK && + datanode.blockScanner != null) { + datanode.blockScanner.verifiedByClient(block); + } + } catch (IOException ignored) {} + } + + datanode.myMetrics.bytesRead.inc((int) read); + datanode.myMetrics.blocksRead.inc(); + LOG.info(datanode.dnRegistration + " Served block " + block + " to " + + s.getInetAddress()); + } catch ( SocketException ignored ) { + // Its ok for remote side to close the connection anytime. + datanode.myMetrics.blocksRead.inc(); + } catch ( IOException ioe ) { + /* What exactly should we do here? + * Earlier version shutdown() datanode if there is disk error. 
+ */ + LOG.warn(datanode.dnRegistration + ":Got exception while serving " + + block + " to " + + s.getInetAddress() + ":\n" + + StringUtils.stringifyException(ioe) ); + throw ioe; + } finally { + IOUtils.closeStream(out); + IOUtils.closeStream(blockSender); + } + } + + /** + * Write a block to disk. + * + * @param in The stream to read from + * @throws IOException + */ + private void writeBlock(DataInputStream in) throws IOException { + DatanodeInfo srcDataNode = null; + LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() + + " tcp no delay " + s.getTcpNoDelay()); + // + // Read in the header + // + Block block = new Block(in.readLong(), + dataXceiverServer.estimateBlockSize, in.readLong()); + LOG.info("Receiving block " + block + + " src: " + remoteAddress + + " dest: " + localAddress); + int pipelineSize = in.readInt(); // num of datanodes in entire pipeline + boolean isRecovery = in.readBoolean(); // is this part of recovery? + String client = Text.readString(in); // working on behalf of this client + boolean hasSrcDataNode = in.readBoolean(); // is src node info present + if (hasSrcDataNode) { + srcDataNode = new DatanodeInfo(); + srcDataNode.readFields(in); + } + int numTargets = in.readInt(); + if (numTargets < 0) { + throw new IOException("Mislabelled incoming datastream."); + } + DatanodeInfo targets[] = new DatanodeInfo[numTargets]; + for (int i = 0; i < targets.length; i++) { + DatanodeInfo tmp = new DatanodeInfo(); + tmp.readFields(in); + targets[i] = tmp; + } + + DataOutputStream mirrorOut = null; // stream to next target + DataInputStream mirrorIn = null; // reply from next target + DataOutputStream replyOut = null; // stream to prev target + Socket mirrorSock = null; // socket to next target + BlockReceiver blockReceiver = null; // responsible for data handling + String mirrorNode = null; // the name:port of next target + String firstBadLink = ""; // first datanode that failed in connection setup + try { + // open a block receiver and 
check if the block does not exist + blockReceiver = new BlockReceiver(block, in, + s.getInetAddress().toString(), isRecovery, client, srcDataNode, + datanode); + + // get a connection back to the previous target + replyOut = new DataOutputStream( + NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + + // + // Open network conn to backup machine, if + // appropriate + // + if (targets.length > 0) { + InetSocketAddress mirrorTarget = null; + // Connect to backup machine + mirrorNode = targets[0].getName(); + mirrorTarget = NetUtils.createSocketAddr(mirrorNode); + mirrorSock = datanode.newSocket(); + try { + int timeoutValue = numTargets * datanode.socketTimeout; + int writeTimeout = datanode.socketWriteTimeout + + (WRITE_TIMEOUT_EXTENSION * numTargets); + mirrorSock.connect(mirrorTarget, timeoutValue); + mirrorSock.setSoTimeout(timeoutValue); + mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE); + mirrorOut = new DataOutputStream( + new BufferedOutputStream( + NetUtils.getOutputStream(mirrorSock, writeTimeout), + SMALL_BUFFER_SIZE)); + mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock)); + + // Write header: Copied from DFSClient.java! 
+ mirrorOut.writeShort( DATA_TRANSFER_VERSION ); + mirrorOut.write( OP_WRITE_BLOCK ); + mirrorOut.writeLong( block.getBlockId() ); + mirrorOut.writeLong( block.getGenerationStamp() ); + mirrorOut.writeInt( pipelineSize ); + mirrorOut.writeBoolean( isRecovery ); + Text.writeString( mirrorOut, client ); + mirrorOut.writeBoolean(hasSrcDataNode); + if (hasSrcDataNode) { // pass src node information + srcDataNode.write(mirrorOut); + } + mirrorOut.writeInt( targets.length - 1 ); + for ( int i = 1; i < targets.length; i++ ) { + targets[i].write( mirrorOut ); + } + + blockReceiver.writeChecksumHeader(mirrorOut); + mirrorOut.flush(); + + // read connect ack (only for clients, not for replication req) + if (client.length() != 0) { + firstBadLink = Text.readString(mirrorIn); + if (LOG.isDebugEnabled() || firstBadLink.length() > 0) { + LOG.info("Datanode " + targets.length + + " got response for connect ack " + + " from downstream datanode with firstbadlink as " + + firstBadLink); + } + } + + } catch (IOException e) { + if (client.length() != 0) { + Text.writeString(replyOut, mirrorNode); + replyOut.flush(); + } + IOUtils.closeStream(mirrorOut); + mirrorOut = null; + IOUtils.closeStream(mirrorIn); + mirrorIn = null; + IOUtils.closeSocket(mirrorSock); + mirrorSock = null; + if (client.length() > 0) { + throw e; + } else { + LOG.info(datanode.dnRegistration + ":Exception transfering block " + + block + " to mirror " + mirrorNode + + ". continuing without the mirror.\n" + + StringUtils.stringifyException(e)); + } + } + } + + // send connect ack back to source (only for clients) + if (client.length() != 0) { + if (LOG.isDebugEnabled() || firstBadLink.length() > 0) { + LOG.info("Datanode " + targets.length + + " forwarding connect ack to upstream firstbadlink is " + + firstBadLink); + } + Text.writeString(replyOut, firstBadLink); + replyOut.flush(); + } + + // receive the block and mirror to the next target + String mirrorAddr = (mirrorSock == null) ? 
null : mirrorNode; + blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, + mirrorAddr, null, targets.length); + + // if this write is for a replication request (and not + // from a client), then confirm block. For client-writes, + // the block is finalized in the PacketResponder. + if (client.length() == 0) { + datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT); + LOG.info("Received block " + block + + " src: " + remoteAddress + + " dest: " + localAddress + + " of size " + block.getNumBytes()); + } + + if (datanode.blockScanner != null) { + datanode.blockScanner.addBlock(block); + } + + } catch (IOException ioe) { + LOG.info("writeBlock " + block + " received exception " + ioe); + throw ioe; + } finally { + // close all opened streams + IOUtils.closeStream(mirrorOut); + IOUtils.closeStream(mirrorIn); + IOUtils.closeStream(replyOut); + IOUtils.closeSocket(mirrorSock); + IOUtils.closeStream(blockReceiver); + } + } + + /** + * Reads the metadata and sends the data in one 'DATA_CHUNK'. + * @param in + */ + void readMetadata(DataInputStream in) throws IOException { + Block block = new Block( in.readLong(), 0 , in.readLong()); + MetaDataInputStream checksumIn = null; + DataOutputStream out = null; + + try { + + checksumIn = datanode.data.getMetaDataInputStream(block); + + long fileSize = checksumIn.getLength(); + + if (fileSize >= 1L<<31 || fileSize <= 0) { + throw new IOException("Unexpected size for checksumFile of block" + + block); + } + + byte [] buf = new byte[(int)fileSize]; + IOUtils.readFully(checksumIn, buf, 0, buf.length); + + out = new DataOutputStream( + NetUtils.getOutputStream(s, datanode.socketWriteTimeout)); + + out.writeByte(OP_STATUS_SUCCESS); + out.writeInt(buf.length); + out.write(buf); + + //last DATA_CHUNK + out.writeInt(0); + } finally { + IOUtils.closeStream(out); + IOUtils.closeStream(checksumIn); + } + } + + /** + * Read a block from the disk and then sends it to a destination. 
+ * + * @param in The stream to read from + * @throws IOException + */ + private void copyBlock(DataInputStream in) throws IOException { + // Read in the header + long blockId = in.readLong(); // read block id + Block block = new Block(blockId, 0, in.readLong()); + String source = Text.readString(in); // read del hint + DatanodeInfo target = new DatanodeInfo(); // read target + target.readFields(in); + + Socket targetSock = null; + short opStatus = OP_STATUS_SUCCESS; + BlockSender blockSender = null; + DataOutputStream targetOut = null; + try { + datanode.balancingSem.acquireUninterruptibly(); + + // check if the block exists or not + blockSender = new BlockSender(block, 0, -1, false, false, false, + datanode); + + // get the output stream to the target + InetSocketAddress targetAddr = NetUtils.createSocketAddr( + target.getName()); + targetSock = datanode.newSocket(); + targetSock.connect(targetAddr, datanode.socketTimeout); + targetSock.setSoTimeout(datanode.socketTimeout); + + OutputStream baseStream = NetUtils.getOutputStream(targetSock, + datanode.socketWriteTimeout); + targetOut = new DataOutputStream( + new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE)); + + /* send request to the target */ + // fist write header info + targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version + targetOut.writeByte(OP_REPLACE_BLOCK); // op code + targetOut.writeLong(block.getBlockId()); // block id + targetOut.writeLong(block.getGenerationStamp()); // block id + Text.writeString( targetOut, source); // del hint + + // then send data + long read = blockSender.sendBlock(targetOut, baseStream, + datanode.balancingThrottler); + + datanode.myMetrics.bytesRead.inc((int) read); + datanode.myMetrics.blocksRead.inc(); + + // check the response from target + receiveResponse(targetSock, 1); + + LOG.info("Copied block " + block + " to " + targetAddr); + } catch (IOException ioe) { + opStatus = OP_STATUS_ERROR; + LOG.warn("Got exception while serving " + block + " to " + + 
target.getName() + ": " + StringUtils.stringifyException(ioe)); + throw ioe; + } finally { + /* send response to the requester */ + try { + sendResponse(s, opStatus, datanode.socketWriteTimeout); + } catch (IOException replyE) { + LOG.warn("Error writing the response back to "+ + s.getRemoteSocketAddress() + "\n" + + StringUtils.stringifyException(replyE) ); + } + IOUtils.closeStream(targetOut); + IOUtils.closeStream(blockSender); + datanode.balancingSem.release(); + } + } + + /** + * Receive a block and write it to disk, it then notifies the namenode to + * remove the copy from the source. + * + * @param in The stream to read from + * @throws IOException + */ + private void replaceBlock(DataInputStream in) throws IOException { + datanode.balancingSem.acquireUninterruptibly(); + + /* read header */ + Block block = new Block(in.readLong(), dataXceiverServer.estimateBlockSize, + in.readLong()); // block id & len + String sourceID = Text.readString(in); + + short opStatus = OP_STATUS_SUCCESS; + BlockReceiver blockReceiver = null; + try { + // open a block receiver and check if the block does not exist + blockReceiver = new BlockReceiver( + block, in, s.getRemoteSocketAddress().toString(), false, "", null, + datanode); + + // receive a block + blockReceiver.receiveBlock(null, null, null, null, + datanode.balancingThrottler, -1); + + // notify name node + datanode.notifyNamenodeReceivedBlock(block, sourceID); + + LOG.info("Moved block " + block + + " from " + s.getRemoteSocketAddress()); + } catch (IOException ioe) { + opStatus = OP_STATUS_ERROR; + throw ioe; + } finally { + // send response back + try { + sendResponse(s, opStatus, datanode.socketWriteTimeout); + } catch (IOException ioe) { + LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); + } + IOUtils.closeStream(blockReceiver); + datanode.balancingSem.release(); + } + } + + /** + * Utility function for receiving a response. 
+ * @param s socket to read from + * @param numTargets number of responses to read + **/ + private void receiveResponse(Socket s, int numTargets) throws IOException { + // check the response + DataInputStream reply = new DataInputStream(new BufferedInputStream( + NetUtils.getInputStream(s), BUFFER_SIZE)); + try { + for (int i = 0; i < numTargets; i++) { + short opStatus = reply.readShort(); + if(opStatus != OP_STATUS_SUCCESS) { + throw new IOException("operation failed at "+ + s.getInetAddress()); + } + } + } finally { + IOUtils.closeStream(reply); + } + } + + /** + * Utility function for sending a response. + * @param s socket to write to + * @param opStatus status message to write + * @param timeout send timeout + **/ + private void sendResponse(Socket s, short opStatus, long timeout) + throws IOException { + DataOutputStream reply = + new DataOutputStream(NetUtils.getOutputStream(s, timeout)); + try { + reply.writeShort(opStatus); + reply.flush(); + } finally { + IOUtils.closeStream(reply); + } + } +} Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=685979&view=auto ============================================================================== --- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (added) +++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Thu Aug 14 10:58:30 2008 @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; + +/** + * Server used for receiving/sending a block of data. + * This is created to listen for requests from clients or + * other DataNodes. This small server does not use the + * Hadoop IPC mechanism. + */ +class DataXceiverServer implements Runnable, FSConstants { + public static final Log LOG = DataNode.LOG; + + ServerSocket ss; + DataNode datanode; + // Record all sockets opend for data transfer + Map<Socket, Socket> childSockets = Collections.synchronizedMap( + new HashMap<Socket, Socket>()); + + /** + * Maximal number of concurrent xceivers per node. + * Enforcing the limit is required in order to avoid data-node + * running out of memory. + */ + static final int MAX_XCEIVER_COUNT = 256; + int maxXceiverCount = MAX_XCEIVER_COUNT; + + /** + * We need an estimate for block size to check if the disk partition has + * enough space. For now we set it to be the default block size set + * in the server side configuration, which is not ideal because the + * default block size should be a client-size configuration. + * A better solution is to include in the header the estimated block size, + * i.e. 
either the actual block size or the default block size. + */ + long estimateBlockSize; + + + DataXceiverServer(ServerSocket ss, Configuration conf, + DataNode datanode) { + + this.ss = ss; + this.datanode = datanode; + + this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers", + MAX_XCEIVER_COUNT); + + this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + } + + /** + */ + public void run() { + while (datanode.shouldRun) { + try { + Socket s = ss.accept(); + s.setTcpNoDelay(true); + new Daemon(datanode.threadGroup, + new DataXceiver(s, datanode, this)).start(); + } catch (IOException ie) { + LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " + + StringUtils.stringifyException(ie)); + } catch (Throwable te) { + LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:" + + StringUtils.stringifyException(te)); + datanode.shouldRun = false; + } + } + try { + ss.close(); + } catch (IOException ie) { + LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " + + StringUtils.stringifyException(ie)); + } + } + + void kill() { + assert datanode.shouldRun == false : + "shoudRun should be set to false before killing"; + try { + this.ss.close(); + } catch (IOException ie) { + LOG.warn(datanode.dnRegistration + ":DataXceiveServer.kill(): " + + StringUtils.stringifyException(ie)); + } + + // close all the sockets that were accepted earlier + synchronized (childSockets) { + for (Iterator<Socket> it = childSockets.values().iterator(); + it.hasNext();) { + Socket thissock = it.next(); + try { + thissock.close(); + } catch (IOException e) { + } + } + } + } +} Copied: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (from r685529, hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java) URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?p2=hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java&p1=hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java&r1=685529&r2=685979&rev=685979&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestBlockReplacement.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Thu Aug 14 10:58:30 2008 @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdfs; +package org.apache.hadoop.hdfs.server.datanode; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -27,24 +27,25 @@ import java.util.List; import java.util.Random; +import junit.framework.TestCase; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.FSConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.common.Util; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler; import org.apache.hadoop.io.Text; import 
org.apache.hadoop.net.NetUtils; - - -import junit.framework.TestCase; /** * This class tests if block replacement request to data nodes work correctly. */ @@ -60,7 +61,7 @@ final long TOTAL_BYTES =6*bandwidthPerSec; long bytesToSend = TOTAL_BYTES; long start = Util.now(); - DataNode.Throttler throttler = new DataNode.Throttler(bandwidthPerSec); + BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec); long totalBytes = 0L; long bytesSent = 1024*512L; // 0.5MB throttler.throttle(bytesSent); Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java ------------------------------------------------------------------------------ svn:keywords = Id Revision HeadURL Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java ------------------------------------------------------------------------------ svn:mergeinfo =