From: cutting@apache.org
To: hadoop-commits@lucene.apache.org
Subject: svn commit: r543225 - in /lucene/hadoop/branches/branch-0.13: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/dfs/
Date: Thu, 31 May 2007 19:31:58 -0000
Message-Id: <20070531193159.0C4101A981A@eris.apache.org>

Author: cutting
Date: Thu May 31 12:31:57 2007
New Revision: 543225

URL: http://svn.apache.org/viewvc?view=rev&rev=543225
Log:
Merge -r 543206:543222 from trunk to 0.13 branch.  Fixes: HADOOP-1242 and HADOOP-1332.

Modified:
    lucene/hadoop/branches/branch-0.13/CHANGES.txt
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
    lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java

Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/CHANGES.txt?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.13/CHANGES.txt Thu May 31 12:31:57 2007
@@ -430,6 +430,12 @@
     AlreadyBeingCreatedException when wrapped as a RemoteException.
     (Hairong Kuang via tomwhite)
 
+129. HADOOP-1242.  Improve handling of DFS upgrades.
+     (Konstantin Shvachko via cutting)
+
+130. HADOOP-1332.  Fix so that TaskTracker exits reliably during unit
+     tests on Windows.  (omalley via cutting)
+
 Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java Thu May 31 12:31:57 2007
@@ -11,7 +11,6 @@
 
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.dfs.FSConstants.NodeType;
-import org.apache.hadoop.dfs.FSImage.NameNodeFile;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.fs.FileUtil.HardLink;
 
@@ -164,6 +163,21 @@
     File oldF = new File(sd.root, "storage");
     if (!oldF.exists())
       return false;
+    // check the layout version inside the storage file
+    // Lock and Read old storage file
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldF);
+    FileLock oldLock = oldFile.getChannel().tryLock();
+    try {
+      oldFile.seek(0);
+      int odlVersion = oldFile.readInt();
+      if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+        return false;
+    } finally {
+      oldLock.release();
+      oldFile.close();
+    }
     // check consistency of the old storage
     File oldDataDir = new File(sd.root, "data");
     if (!oldDataDir.exists()) 
@@ -206,13 +220,14 @@
     FileLock oldLock = oldFile.getChannel().tryLock();
     if (oldLock == null)
       throw new IOException("Cannot lock file: " + oldF);
+    String odlStorageID = "";
     try {
       oldFile.seek(0);
       int odlVersion = oldFile.readInt();
       if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
         throw new IncorrectVersionException(odlVersion, "file " + oldF,
                                             LAST_PRE_UPGRADE_LAYOUT_VERSION);
-      String odlStorageID = org.apache.hadoop.io.UTF8.readString(oldFile);
+      odlStorageID = org.apache.hadoop.io.UTF8.readString(oldFile);
 
       // check new storage
       File newDataDir = sd.getCurrentDir();
@@ -221,14 +236,8 @@
         throw new IOException("Version file already exists: " + versionF);
       if (newDataDir.exists()) // somebody created current dir manually
         deleteDir(newDataDir);
-      // Write new layout
+      // move "data" to "current"
       rename(oldDataDir, newDataDir);
-
-      this.layoutVersion = FSConstants.LAYOUT_VERSION;
-      this.namespaceID = nsInfo.getNamespaceID();
-      this.cTime = 0;
-      this.storageID = odlStorageID;
-      sd.write();
       // close and unlock old file
     } finally {
       oldLock.release();
@@ -236,6 +245,13 @@
     }
     // move old storage file into current dir
     rename(oldF, new File(sd.getCurrentDir(), "storage"));
+
+    // Write new version file
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.namespaceID = nsInfo.getNamespaceID();
+    this.cTime = 0;
+    this.storageID = odlStorageID;
+    sd.write();
 
     LOG.info("Conversion of " + oldF + " is complete.");
   }
@@ -408,5 +424,23 @@
     for(int i = 0; i < blockNames.length; i++)
       linkBlocks(new File(from, blockNames[i]),
                  new File(to, blockNames[i]));
+  }
+
+  protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
+    File oldF = new File(rootDir, "storage");
+    if (oldF.exists())
+      return;
+    // recreate old storage file to let pre-upgrade versions fail
+    if (!oldF.createNewFile())
+      throw new IOException("Cannot create file " + oldF);
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldF);
+    // write new version into old storage file
+    try {
+      writeCorruptedData(oldFile);
+    } finally {
+      oldFile.close();
+    }
   }
 }
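For context on the isConversionNeeded() change above: it probes the first int of the old "storage" file under a file lock before deciding whether a pre-0.13 layout conversion applies. The following self-contained sketch (not part of this commit; the class name is hypothetical, and the version constant stands in for the real one defined in the dfs package) illustrates the same probe pattern:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileLock;

    public class LayoutVersionProbe {
      // Hypothetical stand-in for LAST_PRE_UPGRADE_LAYOUT_VERSION;
      // Hadoop layout versions are negative and decrease as formats evolve.
      static final int LAST_PRE_UPGRADE_LAYOUT_VERSION = -3;

      /**
       * Returns true if the storage file still carries a genuinely old
       * pre-upgrade layout version (conversion needed); false if it already
       * holds a post-upgrade version, e.g. the intentionally written marker.
       */
      static boolean isPreUpgradeLayout(File storageFile) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(storageFile, "rws");
        FileLock lock = raf.getChannel().tryLock(); // null if another process holds it
        if (lock == null) {
          raf.close();
          throw new IOException("Cannot lock file: " + storageFile);
        }
        try {
          raf.seek(0);                 // the layout version is the first int
          int version = raf.readInt();
          return version >= LAST_PRE_UPGRADE_LAYOUT_VERSION;
        } finally {
          lock.release();
          raf.close();
        }
      }
    }

Locking before reading mirrors the patch's intent: the probe must not race with a daemon that is concurrently converting or rewriting the same storage file.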
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java Thu May 31 12:31:57 2007
@@ -27,6 +27,7 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -435,7 +436,21 @@
   boolean isConversionNeeded(StorageDirectory sd) throws IOException {
     File oldImageDir = new File(sd.root, "image");
     if (!oldImageDir.exists())
-      return false;
+      throw new InconsistentFSStateException(sd.root,
+                                             oldImageDir + " does not exist.");
+    // check the layout version inside the image file
+    File oldF = new File(oldImageDir, "fsimage");
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldF);
+    try {
+      oldFile.seek(0);
+      int odlVersion = oldFile.readInt();
+      if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+        return false;
+    } finally {
+      oldFile.close();
+    }
     // check consistency of the old storage
     if (!oldImageDir.isDirectory())
       throw new InconsistentFSStateException(sd.root,
@@ -492,8 +507,8 @@
       needReformat = true;
     } else {
       sd.write();
-      LOG.info("Conversion of " + oldImage + " is complete.");
     }
+    LOG.info("Conversion of " + oldImage + " is complete.");
     return needReformat;
   }
 
@@ -958,6 +973,27 @@
     node.setRemaining(remaining);
     node.setLastUpdate(lastUpdate);
     node.setXceiverCount(xceiverCount);
+  }
+
+  protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
+    File oldImageDir = new File(rootDir, "image");
+    if (!oldImageDir.exists())
+      if (!oldImageDir.mkdir())
+        throw new IOException("Cannot create directory " + oldImageDir);
+    File oldImage = new File(oldImageDir, "fsimage");
+    if (!oldImage.exists())
+      // recreate old image file to let pre-upgrade versions fail
+      if (!oldImage.createNewFile())
+        throw new IOException("Cannot create file " + oldImage);
+    RandomAccessFile oldFile = new RandomAccessFile(oldImage, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldImage);
+    // write new version into old image file
+    try {
+      writeCorruptedData(oldFile);
+    } finally {
+      oldFile.close();
+    }
   }
 }

Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java Thu May 31 12:31:57 2007
@@ -157,6 +157,7 @@
      * @throws IOException
      */
     void write() throws IOException {
+      corruptPreUpgradeStorage(root);
       write(getVersionFile());
     }
 
@@ -520,5 +521,21 @@
     return "NS-" + Integer.toString(storage.getNamespaceID())
       + "-" + Integer.toString(storage.getLayoutVersion())
       + "-" + Long.toString(storage.getCTime());
+  }
+
+  // Pre-upgrade version compatibility
+  protected abstract void corruptPreUpgradeStorage(File rootDir) throws IOException;
+
+  protected void writeCorruptedData(RandomAccessFile file) throws IOException {
+    final String messageForPreUpgradeVersion =
+      "\nThis file is INTENTIONALLY CORRUPTED so that versions\n"
+      + "of Hadoop prior to 0.13 (which are incompatible\n"
+      + "with this directory layout) will fail to start.\n";
+
+    file.seek(0);
+    file.writeInt(FSConstants.LAYOUT_VERSION);
+    org.apache.hadoop.io.UTF8.writeString(file, "");
+    file.writeBytes(messageForPreUpgradeVersion);
+    file.getFD().sync();
+  }
 }
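The writeCorruptedData() helper above is the heart of the HADOOP-1242 compatibility guard: every time the new version file is written, the obsolete pre-upgrade files are overwritten with a marker whose layout version old binaries reject, so a pre-0.13 daemon fails fast instead of silently misreading the new directory layout. A self-contained sketch of the same idea (not part of this commit; hypothetical class name, plain JDK writeUTF standing in for Hadoop's org.apache.hadoop.io.UTF8.writeString, and a stand-in version constant):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class PreUpgradeGuard {
      // Hypothetical stand-in for FSConstants.LAYOUT_VERSION in 0.13.
      static final int LAYOUT_VERSION = -4;

      /**
       * Overwrite the start of an obsolete storage/fsimage file so that a
       * pre-0.13 daemon reading it sees a layout version it cannot handle
       * and refuses to start.
       */
      static void writeGuard(RandomAccessFile file) throws IOException {
        final String notice =
          "\nThis file is INTENTIONALLY CORRUPTED so that versions\n"
          + "of Hadoop prior to 0.13 (which are incompatible\n"
          + "with this directory layout) will fail to start.\n";
        file.seek(0);
        file.writeInt(LAYOUT_VERSION); // first int: the version an old reader checks
        file.writeUTF("");             // empty storage-ID placeholder
        file.writeBytes(notice);       // human-readable explanation for operators
        file.getFD().sync();           // flush to disk before proceeding
      }
    }

The sync() call matters: the marker must be durable before the upgrade continues, otherwise a crash could leave a file that an old daemon would still accept.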
Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu May 31 12:31:57 2007
@@ -388,16 +388,19 @@
    */
   private void runChild(String[] args, File dir) throws IOException {
     this.process = Runtime.getRuntime().exec(args, null, dir);
+
+    Thread logStdErrThread = null;
+    Thread logStdOutThread = null;
     try {
-      new Thread() {
-        public void run() {
-          // Copy stderr of the process
-          logStream(process.getErrorStream(), taskStdErrLogWriter);
-        }
-      }.start();
-
-      // Copy stderr of the process; normally empty
-      logStream(process.getInputStream(), taskStdOutLogWriter);
+      // Copy stderr of the child-process via a thread
+      logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"),
+                                  process.getErrorStream(),
+                                  taskStdErrLogWriter);
+
+      // Copy stdout of the child-process via a thread
+      logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"),
+                                  process.getInputStream(),
+                                  taskStdOutLogWriter);
 
       int exit_code = process.waitFor();
 
@@ -410,8 +413,21 @@
       throw new IOException(e.toString());
     } finally {
       kill();
-      taskStdOutLogWriter.close();
-      taskStdErrLogWriter.close();
+
+      // Kill both stdout/stderr copying threads
+      if (logStdErrThread != null) {
+        logStdErrThread.interrupt();
+        try {
+          logStdErrThread.join();
+        } catch (InterruptedException ie) {}
+      }
+
+      if (logStdOutThread != null) {
+        logStdOutThread.interrupt();
+        try {
+          logStdOutThread.join();
+        } catch (InterruptedException ie) {}
+      }
     }
   }
 
@@ -426,24 +442,47 @@
   }
 
   /**
+   * Spawn a new thread to copy the child-jvm's stdout/stderr streams
+   * via a {@link TaskLog.Writer}
+   *
+   * @param threadName thread name
+   * @param stream child-jvm's stdout/stderr stream
+   * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data
+   * @return Return the newly created thread
    */
-  private void logStream(InputStream output, TaskLog.Writer taskLog) {
-    try {
-      byte[] buf = new byte[512];
-      int n = 0;
-      while ((n = output.read(buf, 0, buf.length)) != -1) {
-        // Write out to the task's log
-        taskLog.write(buf, 0, n);
-      }
-    } catch (IOException e) {
-      LOG.warn(t.getTaskId()+" Error reading child output", e);
-    } finally {
-      try {
-        output.close();
-      } catch (IOException e) {
-        LOG.warn(t.getTaskId()+" Error closing child output", e);
-      }
-    }
-  }
+  private Thread logStream(String threadName,
+                           final InputStream stream,
+                           final TaskLog.Writer taskLog) {
+    Thread loggerThread = new Thread() {
+      public void run() {
+        try {
+          byte[] buf = new byte[512];
+          while (!Thread.interrupted()) {
+            while (stream.available() > 0) {
+              int n = stream.read(buf, 0, buf.length);
+              taskLog.write(buf, 0, n);
+            }
+            Thread.sleep(1000);
+          }
+        } catch (IOException e) {
+          LOG.warn(t.getTaskId()+" Error reading child output", e);
+        } catch (InterruptedException e) {
+          // expected
+        } finally {
+          try {
+            stream.close();
+            taskLog.close();
+          } catch (IOException e) {
+            LOG.warn(t.getTaskId()+" Error closing child output", e);
+          }
+        }
+      }
+    };
+    loggerThread.setName(threadName);
+    loggerThread.setDaemon(true);
+    loggerThread.start();
+
+    return loggerThread;
+  }
 }
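This TaskRunner change is the HADOOP-1332 fix: the old code blocked in read() on the child's output pipes, and a blocked pipe read cannot be interrupted (notably on Windows), which kept the TaskTracker from exiting during unit tests. The new code instead polls available() in daemon threads that the parent can interrupt() and join() once the child exits. A self-contained sketch of that pattern (not part of this commit; a plain OutputStream stands in for TaskLog.Writer):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    /** Interruptible, polling stream copier, per the pattern above. */
    public class StreamCopier extends Thread {
      private final InputStream in;
      private final OutputStream out;

      public StreamCopier(String name, InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
        setName(name);
        setDaemon(true); // never block JVM shutdown on this thread
      }

      public void run() {
        byte[] buf = new byte[512];
        try {
          // Poll instead of blocking in read(): a read blocked on a child
          // process pipe cannot be interrupted, especially on Windows.
          while (!Thread.interrupted()) {
            while (in.available() > 0) {
              int n = in.read(buf, 0, buf.length);
              if (n > 0) out.write(buf, 0, n);
            }
            Thread.sleep(1000);
          }
        } catch (IOException e) {
          // log and fall through to cleanup
        } catch (InterruptedException e) {
          // expected: the parent interrupts us after the child exits
        } finally {
          try { in.close(); out.close(); } catch (IOException ignored) {}
        }
      }
    }

The parent would then call interrupt() followed by join() on the copier after the child process exits, which is exactly what the new runChild() finally-block does for both threads.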
Modified: lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java Thu May 31 12:31:57 2007
@@ -23,11 +23,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.FSConstants.NodeType;
 import static org.apache.hadoop.dfs.FSConstants.NodeType.NAME_NODE;
 import static org.apache.hadoop.dfs.FSConstants.NodeType.DATA_NODE;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
-import org.apache.hadoop.fs.Path;
 
 /**
  * This test ensures the appropriate response from the system when
@@ -82,7 +80,6 @@
    * This test attempts to finalize the NameNode and DataNode.
    */
   public void testFinalize() throws Exception {
-    File[] baseDirs;
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {

Modified: lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java Thu May 31 12:31:57 2007
@@ -250,16 +250,26 @@
       LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
       switch (nodeType) {
       case NAME_NODE:
-        localFS.copyToLocalFile(
-                                new Path(namenodeStorage.toString(), "current"),
+        localFS.copyToLocalFile(new Path(namenodeStorage.toString(), "current"),
                                 new Path(newDir.toString()),
                                 false);
+        Path newImgDir = new Path(newDir.getParent(), "image");
+        if (!localFS.exists(newImgDir))
+          localFS.copyToLocalFile(
+              new Path(namenodeStorage.toString(), "image"),
+              newImgDir,
+              false);
         break;
       case DATA_NODE:
-        localFS.copyToLocalFile(
-                                new Path(datanodeStorage.toString(), "current"),
+        localFS.copyToLocalFile(new Path(datanodeStorage.toString(), "current"),
                                 new Path(newDir.toString()),
                                 false);
+        Path newStorageFile = new Path(newDir.getParent(), "storage");
+        if (!localFS.exists(newStorageFile))
+          localFS.copyToLocalFile(
+              new Path(datanodeStorage.toString(), "storage"),
+              newStorageFile,
+              false);
         break;
       }
       retVal[i] = newDir;