Subject: svn commit: r384665 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop: conf/Configuration.java dfs/DFSClient.java dfs/DataNode.java dfs/FSNamesystem.java dfs/NameNode.java mapred/JobConf.java mapred/MapOutputFile.java
Date: Fri, 10 Mar 2006 01:19:01 -0000
To: hadoop-commits@lucene.apache.org
From: cutting@apache.org

Author: cutting
Date: Thu Mar  9 17:18:58 2006
New Revision: 384665

URL: http://svn.apache.org/viewcvs?rev=384665&view=rev
Log:
Reverted changes from 384385, which removed the local backup copy of blocks and most timeouts.  That approach worked well when all hosts were healthy, but when a few hosts were very slow it caused too many tasks to time out and loads to balloon on the slow hosts.  So the local backup is back, although it now lives under dfs.data.dir rather than /tmp, and the timeouts are back.  I also added connect timeouts, so that dfs connections likewise don't get hung up by slow hosts.
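The timeout side of this change swaps direct socket construction for an explicit connect carrying a timeout, so that connecting to a dead or very slow host fails fast instead of blocking indefinitely. Below is a minimal sketch of the pattern as it is applied at each socket-creation site in the diff; the java.net calls are standard, but the READ_TIMEOUT value shown here is an assumed stand-in for the DFS constant:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    class ConnectTimeoutSketch {
        private static final int READ_TIMEOUT = 60 * 1000;  // assumed value, in ms

        static Socket openWithTimeouts(InetSocketAddress target) throws IOException {
            Socket s = new Socket();            // unconnected socket
            s.connect(target, READ_TIMEOUT);    // bound the connect itself
            s.setSoTimeout(READ_TIMEOUT);       // bound every subsequent read as well
            return s;
        }
    }

The two-step construction matters: the one-argument Socket constructor connects with no bound at all, while connect(SocketAddress, int) caps how long a slow host can stall the caller.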
Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?rev=384665&r1=384664&r2=384665&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Thu Mar  9 17:18:58 2006
@@ -263,6 +263,26 @@
     set(propertyName, theClass.getName());
   }

+  /** Returns a file name under a directory named in dirsProp with the
+   * given path.  If dirsProp contains multiple directories, then one is
+   * chosen based on path's hash code.  If the selected directory does not
+   * exist, an attempt is made to create it.
+   */
+  public File getFile(String dirsProp, String path) throws IOException {
+    String[] dirs = getStrings(dirsProp);
+    int hashCode = path.hashCode();
+    for (int i = 0; i < dirs.length; i++) {  // try each local dir
+      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
+      File file = new File(dirs[index], path);
+      File dir = file.getParentFile();
+      if (dir.exists() || dir.mkdirs()) {
+        return file;
+      }
+    }
+    throw new IOException("No valid local directories in property: "+dirsProp);
+  }
+
   /** Returns the URL for the named resource. */
   public URL getResource(String name) {
     return classLoader.getResource(name);
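The new getFile() centralizes the hash-based spreading of files across a comma-separated list of local directories. Masking with Integer.MAX_VALUE clears the sign bit so the index stays non-negative even when path.hashCode() is negative, and the +i offset probes the remaining directories in order when the preferred one cannot be created. A hedged usage sketch; the property value and paths are invented for illustration:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;

    class GetFileSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // Hypothetical setting: two local data directories.
            conf.set("dfs.data.dir", "/disk1/dfs/data,/disk2/dfs/data");

            // The same relative path always hashes to the same starting
            // directory, so independent callers agree on the location.
            File f = conf.getFile("dfs.data.dir", "tmp" + File.separator + "client-42");
            System.out.println(f);  // e.g. /disk2/dfs/data/tmp/client-42
        }
    }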
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=384665&r1=384664&r2=384665&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Mar  9 17:18:58 2006
@@ -47,12 +47,13 @@
     Random r = new Random();
     String clientName;
     Daemon leaseChecker;
-
+    private Configuration conf;

     /**
      * Create a new DFSClient connected to the given namenode server.
      */
     public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) {
+        this.conf = conf;
         this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, nameNodeAddr, conf);
         try {
             this.localName = InetAddress.getLocalHost().getHostName();
@@ -255,6 +256,7 @@
      * negotiation of the namenode and various datanodes as necessary.
      ****************************************************************/
     class DFSInputStream extends FSInputStream {
+        private Socket s = null;
         boolean closed = false;

         private String src;
@@ -316,9 +318,9 @@
                 throw new IOException("Attempted to read past end of file");
             }

-            if (blockStream != null) {
-                blockStream.close();
-                partnerStream.close();
+            if (s != null) {
+                s.close();
+                s = null;
             }

             //
@@ -348,7 +350,6 @@
             //
             int failures = 0;
             InetSocketAddress targetAddr = null;
-            Socket s = null;
             TreeSet deadNodes = new TreeSet();
             while (s == null) {
                 DatanodeInfo chosenNode;
@@ -376,8 +377,9 @@
                     continue;
                 }
                 try {
-                    s = new Socket(targetAddr.getAddress(), targetAddr.getPort());
-                    //s.setSoTimeout(READ_TIMEOUT);
+                    s = new Socket();
+                    s.connect(targetAddr, READ_TIMEOUT);
+                    s.setSoTimeout(READ_TIMEOUT);

                     //
                     // Xmit header info to datanode
@@ -428,10 +430,10 @@
                 throw new IOException("Stream closed");
             }

-            if (blockStream != null) {
+            if (s != null) {
                 blockStream.close();
-                blockStream = null;
-                partnerStream.close();
+                s.close();
+                s = null;
             }
             super.close();
             closed = true;
@@ -520,6 +522,7 @@
      * DFSOutputStream creates files from a stream of bytes.
      ****************************************************************/
     class DFSOutputStream extends FSOutputStream {
+        private Socket s;
         boolean closed = false;

         private byte outBuf[] = new byte[BUFFER_SIZE];
@@ -528,8 +531,11 @@
         private UTF8 src;
         boolean closingDown = false;
         private boolean overwrite;
+        private boolean blockStreamWorking;
         private DataOutputStream blockStream;
         private DataInputStream blockReplyStream;
+        private File backupFile;
+        private OutputStream backupStream;
         private Block block;
         private DatanodeInfo targets[];
         private long filePos = 0;
@@ -543,16 +549,31 @@
             this.overwrite = overwrite;
             this.blockStream = null;
             this.blockReplyStream = null;
+            this.blockStreamWorking = false;
+            this.backupFile = newBackupFile();
+            this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
             nextBlockOutputStream(true);
         }

+        private File newBackupFile() throws IOException {
+            return conf.getFile("dfs.data.dir",
+                                "tmp"+File.separator+
+                                "client-"+Math.abs(r.nextLong()));
+        }
+
         /**
          * Open a DataOutputStream to a DataNode so that it can be written to.
          * This happens when a file is created and each time a new block is allocated.
          * Must get block ID and the IDs of the destinations from the namenode.
          */
         private synchronized void nextBlockOutputStream(boolean firstTime) throws IOException {
+            if (! firstTime && blockStreamWorking) {
+                blockStream.flush();
+                s.close();
+                blockStreamWorking = false;
+            }
+
             boolean retry = false;
             long start = System.currentTimeMillis();
             do {
@@ -588,10 +609,10 @@
                 //
                 // Connect to first DataNode in the list.  Abort if this fails.
                 //
                 InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());
-                Socket s = null;
                 try {
-                    s = new Socket(target.getAddress(), target.getPort());
-                    //s.setSoTimeout(READ_TIMEOUT);
+                    s = new Socket();
+                    s.connect(target, READ_TIMEOUT);
+                    s.setSoTimeout(READ_TIMEOUT);
                 } catch (IOException ie) {
                    // Connection failed.  Let's wait a little bit and retry
                    try {
@@ -625,6 +646,7 @@
                 bytesWrittenToBlock = 0;
                 blockStream = out;
                 blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));
+                blockStreamWorking = true;
             } while (retry);
         }
@@ -705,21 +727,18 @@
             //
             // To the blockStream, write length, then bytes
             //
-            try {
-                blockStream.writeLong(workingPos);
-                blockStream.write(outBuf, 0, workingPos);
-            } catch (IOException ie) {
-                try {
-                    blockStream.close();
-                } catch (IOException ie2) {
-                }
-                try {
-                    blockReplyStream.close();
-                } catch (IOException ie2) {
-                }
-                namenode.abandonBlock(block, src.toString());
-                throw ie;
+            if (blockStreamWorking) {
+                try {
+                    blockStream.writeLong(workingPos);
+                    blockStream.write(outBuf, 0, workingPos);
+                } catch (IOException ie) {
+                    handleSocketException(ie);
+                }
             }

+            //
+            // To the local block backup, write just the bytes
+            //
+            backupStream.write(outBuf, 0, workingPos);

             //
             // Track position
@@ -734,20 +753,64 @@
          * We're done writing to the current block.
          */
         private synchronized void endBlock() throws IOException {
-            try {
-                internalClose();
-            } catch (IOException ie) {
-                namenode.abandonBlock(block, src.toString());
-                throw ie;
+            boolean mustRecover = ! blockStreamWorking;
+
+            //
+            // A zero-length set of data indicates the end of the block
+            //
+            if (blockStreamWorking) {
+                try {
+                    internalClose();
+                } catch (IOException ie) {
+                    handleSocketException(ie);
+                    mustRecover = true;
+                } finally {
+                    blockStreamWorking = false;
+                }
             }
+
+            //
+            // Done with local copy
+            //
+            backupStream.close();
+
+            //
+            // If necessary, recover from a failed datanode connection.
+            //
+            while (mustRecover) {
+                nextBlockOutputStream(false);
+                InputStream in = new FileInputStream(backupFile);
+                try {
+                    byte buf[] = new byte[BUFFER_SIZE];
+                    int bytesRead = in.read(buf);
+                    while (bytesRead >= 0) {
+                        blockStream.writeLong((long) bytesRead);
+                        blockStream.write(buf, 0, bytesRead);
+                        bytesRead = in.read(buf);
+                    }
+                    internalClose();
+                    LOG.info("Recovered from failed datanode connection");
+                    mustRecover = false;
+                } catch (IOException ie) {
+                    handleSocketException(ie);
+                } finally {
+                    in.close();
+                }
+            }
+
+            //
+            // Delete local backup, start new one
+            //
+            backupFile.delete();
+            backupFile = newBackupFile();
+            backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
         }

         /**
-         * Close down stream to remote datanode.
+         * Close down stream to remote datanode.  Called from two places
+         * in endBlock();
         */
        private synchronized void internalClose() throws IOException {
-            try {
-
            // A zero-length set of data indicates the end of the block
            blockStream.writeLong(0);
            blockStream.flush();
@@ -761,16 +824,18 @@
            lb.readFields(blockReplyStream);
            namenode.reportWrittenBlock(lb);

-            } finally {
-                try {
-                    blockStream.close();
-                } catch (IOException ie2) {
-                }
-                try {
-                    blockReplyStream.close();
-                } catch (IOException ie2) {
-                }
+            s.close();
+        }
+
+        private void handleSocketException(IOException ie) throws IOException {
+            LOG.log(Level.WARNING, "Error while writing.", ie);
+            try {
+                s.close();
+            } catch (IOException ie2) {
+                LOG.log(Level.WARNING, "Error closing socket.", ie2);
             }
+            blockStreamWorking = false;
+            namenode.abandonBlock(block, src.toString());
        }

        /**
@@ -786,9 +851,13 @@
            flush();
            endBlock();

-            blockStream.close();
-            blockReplyStream.close();
+            backupStream.close();
+            backupFile.delete();

+            if (blockStreamWorking) {
+                s.close();
+                blockStreamWorking = false;
+            }
            super.close();

            long localstart = System.currentTimeMillis();
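The restored write path above is the heart of this revert: every chunk is framed to the datanode as a length-prefixed record and also appended to a local backup file under dfs.data.dir, and if the datanode connection dies mid-block the client reconnects and replays the whole block from the backup, terminating it with a zero-length record. A simplified, self-contained sketch of that scheme; the class and member names are illustrative, not the actual DFSClient internals:

    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    abstract class BackupWriteSketch {
        private DataOutputStream blockStream;   // to the datanode; may fail mid-block
        private OutputStream backupStream;      // local copy under dfs.data.dir
        private File backupFile;
        private boolean blockStreamWorking = true;

        BackupWriteSketch(DataOutputStream blockStream, OutputStream backupStream,
                          File backupFile) {
            this.blockStream = blockStream;
            this.backupStream = backupStream;
            this.backupFile = backupFile;
        }

        /** Stand-in for nextBlockOutputStream(false): a fresh datanode connection. */
        protected abstract DataOutputStream reconnect() throws IOException;

        void writeChunk(byte[] buf, int len) throws IOException {
            if (blockStreamWorking) {
                try {
                    blockStream.writeLong(len);    // frame: length, then bytes
                    blockStream.write(buf, 0, len);
                } catch (IOException e) {
                    blockStreamWorking = false;    // fall back to the local copy alone
                }
            }
            backupStream.write(buf, 0, len);       // the backup always gets the bytes
        }

        void endBlock() throws IOException {
            backupStream.close();
            if (!blockStreamWorking) {             // recover: replay the whole block
                blockStream = reconnect();
                InputStream in = new FileInputStream(backupFile);
                try {
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = in.read(buf)) >= 0) {
                        blockStream.writeLong(n);
                        blockStream.write(buf, 0, n);
                    }
                } finally {
                    in.close();
                }
            }
            blockStream.writeLong(0);              // zero length marks end of block
            blockStream.flush();
            backupFile.delete();
        }
    }

The design trades local disk traffic for robustness: a slow or failed datanode no longer forces the whole write to be abandoned, since the backup holds everything needed to retry the block elsewhere.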
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=384665&r1=384664&r2=384665&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Mar  9 17:18:58 2006
@@ -368,8 +368,9 @@
                     // Connect to backup machine
                     mirrorTarget = createSocketAddr(targets[1].getName().toString());
                     try {
-                        Socket s2 = new Socket(mirrorTarget.getAddress(), mirrorTarget.getPort());
-                        //s2.setSoTimeout(READ_TIMEOUT);
+                        Socket s2 = new Socket();
+                        s2.connect(mirrorTarget, READ_TIMEOUT);
+                        s2.setSoTimeout(READ_TIMEOUT);
                         out2 = new DataOutputStream(new BufferedOutputStream(s2.getOutputStream()));
                         in2 = new DataInputStream(new BufferedInputStream(s2.getInputStream()));
@@ -507,7 +508,6 @@
                     mirrors.add(curTarget);
                     LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()]));
                     newLB.write(reply);
-                    reply.flush();
                 } finally {
                     reply.close();
                 }
@@ -638,7 +638,9 @@
         public void run() {
             xmitsInProgress++;
             try {
-                Socket s = new Socket(curTarget.getAddress(), curTarget.getPort());
+                Socket s = new Socket();
+                s.connect(curTarget, READ_TIMEOUT);
+                s.setSoTimeout(READ_TIMEOUT);
                 DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
                 try {
                     long filelen = data.getLength(b);
@@ -673,6 +675,7 @@
                 }
                 LOG.info("Transmitted block " + b + " to " + curTarget);
             } catch (IOException ie) {
+                LOG.log(Level.WARNING, "Failed to transfer "+b+" to "+curTarget, ie);
             } finally {
                 xmitsInProgress--;
             }
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=384665&r1=384664&r2=384665&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Mar  9 17:18:58 2006
@@ -783,7 +783,7 @@
      * 1) Record the heartbeat, so the datanode isn't timed out
      * 2) Adjust usage stats for future block allocation
      */
-    public void gotHeartbeat(UTF8 name, long capacity, long remaining) {
+    public synchronized void gotHeartbeat(UTF8 name, long capacity, long remaining) {
        synchronized (heartbeats) {
            synchronized (datanodeMap) {
                long capacityDiff = 0;
@@ -1285,7 +1285,7 @@
            }
        }
        Collections.shuffle(targetList);
-
+
        //
        // Now pick one
        //

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=384665&r1=384664&r2=384665&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Mar  9 17:18:58 2006
@@ -184,7 +184,9 @@
      * The client needs to give up on the block.
      */
     public void abandonBlock(Block b, String src) throws IOException {
-        namesystem.abandonBlock(b, new UTF8(src));
+        if (! namesystem.abandonBlock(b, new UTF8(src))) {
+            throw new IOException("Cannot abandon block during write to " + src);
+        }
     }

     /** */
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=384665&r1=384664&r2=384665&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Mar  9 17:18:58 2006
@@ -124,35 +124,7 @@
   /** Constructs a local file name.  Files are distributed among configured
    * local directories.*/
   public File getLocalFile(String subdir, String name) throws IOException {
-    String param[] = new String[1];
-    param[0] = name;
-    return getLocalFile(subdir, param, "", false);
-  }
-
-  // REMIND - mjc - rename this!  getLocalFile() is not quite the same.
-  public File getLocalFile(String subdir, String names[], String ending) throws IOException {
-    return getLocalFile(subdir, names, ending, true);
-  }
-
-  File getLocalFile(String subdir, String names[], String ending, boolean existingFileTest) throws IOException {
-    String[] localDirs = getLocalDirs();
-    for (int k = 0; k < names.length; k++) {
-      String path = subdir + File.separator + names[k] + ending;
-      int hashCode = path.hashCode();
-      for (int i = 0; i < localDirs.length; i++) {  // try each local dir
-        int index = (hashCode+i & Integer.MAX_VALUE) % localDirs.length;
-        File file = new File(localDirs[index], path);
-        File dir = file.getParentFile();
-        if (existingFileTest) {
-          if (file.exists()) {
-            return file;
-          }
-        } else {
-          if (dir.exists() || dir.mkdirs()) {
-            return file;
-          }
-        }
-      }
-    }
-    throw new IOException("No valid local directories.");
+    return getFile("mapred.local.dir", name + File.separator + subdir);
   }

   public void setInputDir(File dir) { set("mapred.input.dir", dir); }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=384665&r1=384664&r2=384665&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Thu Mar  9 17:18:58 2006
@@ -67,9 +67,14 @@
     throws IOException {
     return this.jobConf.getLocalFile(reduceTaskId, mapTaskId+".out");
   }
-  public File getInputFile(String mapTaskId[], String reduceTaskId)
+  public File getInputFile(String mapTaskIds[], String reduceTaskId)
     throws IOException {
-    return this.jobConf.getLocalFile(reduceTaskId, mapTaskId, ".out");
+    for (int i = 0; i < mapTaskIds.length; i++) {
+      File file = jobConf.getLocalFile(reduceTaskId, mapTaskIds[i]+".out");
+      if (file.exists())
+        return file;
+    }
+    throw new IOException("Input file not found!");
   }

   /** Removes all of the files related to a task. */
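Because getFile() derives the directory deterministically from the relative path's hash, the task that wrote a map output and the reduce task that later looks for it resolve the same local directory without sharing any state. A hedged sketch of the reduce-side lookup, mirroring getInputFile() above; the task-id strings a caller would pass are invented:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;

    class InputFileLookupSketch {
        /** Probe each candidate map output and return the first that exists. */
        static File findInput(JobConf job, String[] mapTaskIds, String reduceTaskId)
                throws IOException {
            for (int i = 0; i < mapTaskIds.length; i++) {
                // getLocalFile() hashes the relative path, so this resolves
                // the same local directory the map-side writer chose.
                File file = job.getLocalFile(reduceTaskId, mapTaskIds[i] + ".out");
                if (file.exists()) {
                    return file;
                }
            }
            throw new IOException("Input file not found!");
        }
    }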