From hadoop-commits-return-1460-apmail-lucene-hadoop-commits-archive=lucene.apache.org@lucene.apache.org Mon Apr 16 21:45:51 2007 Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 11948 invoked from network); 16 Apr 2007 21:45:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 16 Apr 2007 21:45:48 -0000 Received: (qmail 36994 invoked by uid 500); 16 Apr 2007 21:45:54 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 36974 invoked by uid 500); 16 Apr 2007 21:45:54 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 36965 invoked by uid 99); 16 Apr 2007 21:45:54 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Apr 2007 14:45:54 -0700 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Apr 2007 14:45:44 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 6B78A1A985D; Mon, 16 Apr 2007 14:44:59 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r529410 [10/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/... Date: Mon, 16 Apr 2007 21:44:46 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070416214459.6B78A1A985D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java Mon Apr 16 14:44:35 2007 @@ -50,421 +50,421 @@ **********************************************************/ public class SecondaryNameNode implements FSConstants, Runnable { - public static final Log LOG = LogFactory.getLog( - "org.apache.hadoop.dfs.NameNode.Secondary"); - private static final String SRC_FS_IMAGE = "srcimage.tmp"; - private static final String FS_EDITS = "edits.tmp"; - private static final String DEST_FS_IMAGE = "destimage.tmp"; - - private ClientProtocol namenode; - private Configuration conf; - private InetSocketAddress nameNodeAddr; - private boolean shouldRun; - private StatusHttpServer infoServer; - private int infoPort; - private String infoBindAddress; - - private File checkpointDir; - private long checkpointPeriod; // in seconds - private long checkpointSize; // size (in MB) of current Edit Log - private File srcImage; - private File destImage; - private File editFile; - - private boolean[] simulation = null; // error simulation events - - /** - * Create a connection to the primary namenode. - */ - public SecondaryNameNode(Configuration conf) throws IOException { + public static final Log LOG = LogFactory.getLog( + "org.apache.hadoop.dfs.NameNode.Secondary"); + private static final String SRC_FS_IMAGE = "srcimage.tmp"; + private static final String FS_EDITS = "edits.tmp"; + private static final String DEST_FS_IMAGE = "destimage.tmp"; + + private ClientProtocol namenode; + private Configuration conf; + private InetSocketAddress nameNodeAddr; + private boolean shouldRun; + private StatusHttpServer infoServer; + private int infoPort; + private String infoBindAddress; + + private File checkpointDir; + private long checkpointPeriod; // in seconds + private long checkpointSize; // size (in MB) of current Edit Log + private File srcImage; + private File destImage; + private File editFile; + + private boolean[] simulation = null; // error simulation events + + /** + * Create a connection to the primary namenode. + */ + public SecondaryNameNode(Configuration conf) throws IOException { - // - // initialize error simulation code for junit test - // - initializeErrorSimulationEvent(2); - - // - // Create connection to the namenode. - // - shouldRun = true; - nameNodeAddr = DataNode.createSocketAddr( - conf.get("fs.default.name", "local")); - this.conf = conf; - this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, nameNodeAddr, conf); - - // - // initialize the webserver for uploading files. - // - infoPort = conf.getInt("dfs.secondary.info.port", 50090); - infoBindAddress = conf.get("dfs.secondary.info.bindAddress", "0.0.0.0"); - infoServer = new StatusHttpServer("dfs", infoBindAddress, infoPort, false); - infoServer.setAttribute("name.secondary", this); - infoServer.addServlet("getimage", "/getimage", GetImageServlet.class); - infoServer.start(); + // + // initialize error simulation code for junit test + // + initializeErrorSimulationEvent(2); - // - // Initialize other scheduling parameters from the configuration - // - String[] dirName = conf.getStrings("fs.checkpoint.dir"); - checkpointDir = new File(dirName[0]); - checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600); - checkpointSize = conf.getLong("fs.checkpoint.size", 4194304); - doSetup(); - - LOG.warn("Checkpoint Directory:" + checkpointDir); - LOG.warn("Checkpoint Period :" + checkpointPeriod + " secs " + - "(" + checkpointPeriod/60 + " min)"); - LOG.warn("Log Size Trigger :" + checkpointSize + " bytes " + - "(" + checkpointSize/1024 + " KB)"); - } - - /** - * Shut down this instance of the datanode. - * Returns only after shutdown is complete. - */ - public void shutdown() { - shouldRun = false; - try { - infoServer.stop(); - } catch (Exception e) { - } - } + // + // Create connection to the namenode. + // + shouldRun = true; + nameNodeAddr = DataNode.createSocketAddr( + conf.get("fs.default.name", "local")); + this.conf = conf; + this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, + ClientProtocol.versionID, nameNodeAddr, conf); - private void doSetup() throws IOException { - // - // Create the checkpoint directory if needed. - // - checkpointDir.mkdirs(); - srcImage = new File(checkpointDir, SRC_FS_IMAGE); - destImage = new File(checkpointDir, DEST_FS_IMAGE); - editFile = new File(checkpointDir, FS_EDITS); - srcImage.delete(); - destImage.delete(); - editFile.delete(); - } + // + // initialize the webserver for uploading files. + // + infoPort = conf.getInt("dfs.secondary.info.port", 50090); + infoBindAddress = conf.get("dfs.secondary.info.bindAddress", "0.0.0.0"); + infoServer = new StatusHttpServer("dfs", infoBindAddress, infoPort, false); + infoServer.setAttribute("name.secondary", this); + infoServer.addServlet("getimage", "/getimage", GetImageServlet.class); + infoServer.start(); - File getNewImage() { - return destImage; + // + // Initialize other scheduling parameters from the configuration + // + String[] dirName = conf.getStrings("fs.checkpoint.dir"); + checkpointDir = new File(dirName[0]); + checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600); + checkpointSize = conf.getLong("fs.checkpoint.size", 4194304); + doSetup(); + + LOG.warn("Checkpoint Directory:" + checkpointDir); + LOG.warn("Checkpoint Period :" + checkpointPeriod + " secs " + + "(" + checkpointPeriod/60 + " min)"); + LOG.warn("Log Size Trigger :" + checkpointSize + " bytes " + + "(" + checkpointSize/1024 + " KB)"); + } + + /** + * Shut down this instance of the datanode. + * Returns only after shutdown is complete. + */ + public void shutdown() { + shouldRun = false; + try { + infoServer.stop(); + } catch (Exception e) { } + } + private void doSetup() throws IOException { // - // The main work loop + // Create the checkpoint directory if needed. // - public void run() { + checkpointDir.mkdirs(); + srcImage = new File(checkpointDir, SRC_FS_IMAGE); + destImage = new File(checkpointDir, DEST_FS_IMAGE); + editFile = new File(checkpointDir, FS_EDITS); + srcImage.delete(); + destImage.delete(); + editFile.delete(); + } + + File getNewImage() { + return destImage; + } + + // + // The main work loop + // + public void run() { - // - // Poll the Namenode (once every 5 minutes) to find the size of the - // pending edit log. - // - long period = 5 * 60; // 5 minutes - long lastCheckpointTime = 0; - if (checkpointPeriod < period) { - period = checkpointPeriod; - } - - while (shouldRun) { - try { - Thread.sleep(1000 * period); - } catch (InterruptedException ie) { - // do nothing - } - if (!shouldRun) { - break; - } - try { - long now = System.currentTimeMillis(); - - long size = namenode.getEditLogSize(); - if (size >= checkpointSize || - now >= lastCheckpointTime + 1000 * checkpointPeriod) { - doCheckpoint(); - lastCheckpointTime = now; - } - } catch (IOException e) { - LOG.error("Exception in doCheckpoint:"); - LOG.error(StringUtils.stringifyException(e)); - e.printStackTrace(); - } - } + // + // Poll the Namenode (once every 5 minutes) to find the size of the + // pending edit log. + // + long period = 5 * 60; // 5 minutes + long lastCheckpointTime = 0; + if (checkpointPeriod < period) { + period = checkpointPeriod; } - /** - * get the current fsimage from Namenode. - */ - private void getFSImage() throws IOException { - String fsName = getInfoServer(); - String fileid = "getimage=1"; - TransferFsImage.getFileClient(fsName, fileid, srcImage); - LOG.info("Downloaded file " + srcImage + " size " + - srcImage.length() + " bytes."); - } - - /** - * get the old edits file from the NameNode - */ - private void getFSEdits() throws IOException { - String fsName = getInfoServer(); - String fileid = "getedit=1"; - TransferFsImage.getFileClient(fsName, fileid, editFile); - LOG.info("Downloaded file " + editFile + " size " + - editFile.length() + " bytes."); - } - - /** - * Copy the new fsimage into the NameNode - */ - private void putFSImage() throws IOException { - String fsName = getInfoServer(); - String fileid = "putimage=1&port=" + infoPort + - "&machine=" + - InetAddress.getLocalHost().getHostAddress(); - LOG.info("Posted URL " + fsName + fileid); - TransferFsImage.getFileClient(fsName, fileid, (File[])null); - } - - /* - * Returns the Jetty server that the Namenode is listening on. - */ - private String getInfoServer() throws IOException { - String fsName = conf.get("fs.default.name", "local"); - if (fsName.equals("local")) { - throw new IOException("This is not a DFS"); + while (shouldRun) { + try { + Thread.sleep(1000 * period); + } catch (InterruptedException ie) { + // do nothing } - String[] splits = fsName.split(":", 2); - int infoPort = conf.getInt("dfs.info.port", 50070); - return splits[0]+":"+infoPort; - } + if (!shouldRun) { + break; + } + try { + long now = System.currentTimeMillis(); - /* - * Create a new checkpoint - */ - void doCheckpoint() throws IOException { + long size = namenode.getEditLogSize(); + if (size >= checkpointSize || + now >= lastCheckpointTime + 1000 * checkpointPeriod) { + doCheckpoint(); + lastCheckpointTime = now; + } + } catch (IOException e) { + LOG.error("Exception in doCheckpoint:"); + LOG.error(StringUtils.stringifyException(e)); + e.printStackTrace(); + } + } + } + + /** + * get the current fsimage from Namenode. + */ + private void getFSImage() throws IOException { + String fsName = getInfoServer(); + String fileid = "getimage=1"; + TransferFsImage.getFileClient(fsName, fileid, srcImage); + LOG.info("Downloaded file " + srcImage + " size " + + srcImage.length() + " bytes."); + } + + /** + * get the old edits file from the NameNode + */ + private void getFSEdits() throws IOException { + String fsName = getInfoServer(); + String fileid = "getedit=1"; + TransferFsImage.getFileClient(fsName, fileid, editFile); + LOG.info("Downloaded file " + editFile + " size " + + editFile.length() + " bytes."); + } + + /** + * Copy the new fsimage into the NameNode + */ + private void putFSImage() throws IOException { + String fsName = getInfoServer(); + String fileid = "putimage=1&port=" + infoPort + + "&machine=" + + InetAddress.getLocalHost().getHostAddress(); + LOG.info("Posted URL " + fsName + fileid); + TransferFsImage.getFileClient(fsName, fileid, (File[])null); + } + + /* + * Returns the Jetty server that the Namenode is listening on. + */ + private String getInfoServer() throws IOException { + String fsName = conf.get("fs.default.name", "local"); + if (fsName.equals("local")) { + throw new IOException("This is not a DFS"); + } + String[] splits = fsName.split(":", 2); + int infoPort = conf.getInt("dfs.info.port", 50070); + return splits[0]+":"+infoPort; + } + + /* + * Create a new checkpoint + */ + void doCheckpoint() throws IOException { - // - // Do the required initialization of the merge work area. - // - doSetup(); + // + // Do the required initialization of the merge work area. + // + doSetup(); - // - // Tell the namenode to start logging transactions in a new edit file - // - namenode.rollEditLog(); + // + // Tell the namenode to start logging transactions in a new edit file + // + namenode.rollEditLog(); - // - // error simulation code for junit test - // - if (simulation != null && simulation[0]) { - throw new IOException("Simulating error0 " + - "after creating edits.new"); - } + // + // error simulation code for junit test + // + if (simulation != null && simulation[0]) { + throw new IOException("Simulating error0 " + + "after creating edits.new"); + } - getFSImage(); // Fetch fsimage - getFSEdits(); // Fetch edist - doMerge(); // Do the merge + getFSImage(); // Fetch fsimage + getFSEdits(); // Fetch edist + doMerge(); // Do the merge - // - // Upload the new image into the NameNode. Then tell the Namenode - // to make this new uploaded image as the most current image. - // - putFSImage(); - - // - // error simulation code for junit test - // - if (simulation != null && simulation[1]) { - throw new IOException("Simulating error1 " + - "after uploading new image to NameNode"); - } - - namenode.rollFsImage(); - - LOG.warn("Checkpoint done. Image Size:" + srcImage.length() + - " Edit Size:" + editFile.length() + - " New Image Size:" + destImage.length()); - } - - /** - * merges SRC_FS_IMAGE with FS_EDITS and writes the output into - * DEST_FS_IMAGE - */ - private void doMerge() throws IOException { - FSNamesystem namesystem = new FSNamesystem( - new FSImage(checkpointDir)); - FSImage fsImage = namesystem.dir.fsImage; - fsImage.loadFSImage(srcImage); - fsImage.getEditLog().loadFSEdits(editFile); - fsImage.saveFSImage(destImage); - } - - /** - * @param argv The parameters passed to this program. - * @exception Exception if the filesystem does not exist. - * @return 0 on success, non zero on error. - */ - private int processArgs(String[] argv) throws Exception { - - if (argv.length < 1) { - printUsage(""); - return -1; - } + // + // Upload the new image into the NameNode. Then tell the Namenode + // to make this new uploaded image as the most current image. + // + putFSImage(); - int exitCode = -1; - int i = 0; - String cmd = argv[i++]; + // + // error simulation code for junit test + // + if (simulation != null && simulation[1]) { + throw new IOException("Simulating error1 " + + "after uploading new image to NameNode"); + } + + namenode.rollFsImage(); + + LOG.warn("Checkpoint done. Image Size:" + srcImage.length() + + " Edit Size:" + editFile.length() + + " New Image Size:" + destImage.length()); + } + + /** + * merges SRC_FS_IMAGE with FS_EDITS and writes the output into + * DEST_FS_IMAGE + */ + private void doMerge() throws IOException { + FSNamesystem namesystem = new FSNamesystem( + new FSImage(checkpointDir)); + FSImage fsImage = namesystem.dir.fsImage; + fsImage.loadFSImage(srcImage); + fsImage.getEditLog().loadFSEdits(editFile); + fsImage.saveFSImage(destImage); + } + + /** + * @param argv The parameters passed to this program. + * @exception Exception if the filesystem does not exist. + * @return 0 on success, non zero on error. + */ + private int processArgs(String[] argv) throws Exception { + + if (argv.length < 1) { + printUsage(""); + return -1; + } + + int exitCode = -1; + int i = 0; + String cmd = argv[i++]; - // - // verify that we have enough command line parameters - // - if ("-geteditsize".equals(cmd)) { - if (argv.length != 1) { - printUsage(cmd); - return exitCode; - } - } else if ("-checkpoint".equals(cmd)) { - if (argv.length != 1 && argv.length != 2) { - printUsage(cmd); - return exitCode; - } - if (argv.length == 2 && !"force".equals(argv[i])) { - printUsage(cmd); - return exitCode; - } + // + // verify that we have enough command line parameters + // + if ("-geteditsize".equals(cmd)) { + if (argv.length != 1) { + printUsage(cmd); + return exitCode; } - - exitCode = 0; - try { - if ("-checkpoint".equals(cmd)) { - long size = namenode.getEditLogSize(); - if (size >= checkpointSize || - argv.length == 2 && "force".equals(argv[i])) { - doCheckpoint(); - } else { - System.err.println("EditLog size " + size + " bytes is " + - "smaller than configured checkpoint " + - "size " + checkpointSize + " bytes."); - System.err.println("Skipping checkpoint."); - } - } else if ("-geteditsize".equals(cmd)) { - long size = namenode.getEditLogSize(); - System.out.println("EditLog size is " + size + " bytes"); - } else { - exitCode = -1; - LOG.error(cmd.substring(1) + ": Unknown command"); - printUsage(""); - } - } catch (RemoteException e) { - // - // This is a error returned by hadoop server. Print - // out the first line of the error mesage, ignore the stack trace. - exitCode = -1; - try { - String[] content; - content = e.getLocalizedMessage().split("\n"); - LOG.error(cmd.substring(1) + ": " - + content[0]); - } catch (Exception ex) { - LOG.error(cmd.substring(1) + ": " - + ex.getLocalizedMessage()); - } - } catch (IOException e) { - // - // IO exception encountered locally. - // - exitCode = -1; - LOG.error(cmd.substring(1) + ": " - + e.getLocalizedMessage()); - } finally { - // Does the RPC connection need to be closed? - } + } else if ("-checkpoint".equals(cmd)) { + if (argv.length != 1 && argv.length != 2) { + printUsage(cmd); return exitCode; - } - - /** - * Displays format of commands. - * @param cmd The command that is being executed. - */ - private void printUsage(String cmd) { - if ("-geteditsize".equals(cmd)) { - System.err.println("Usage: java SecondaryNameNode" - + " [-geteditsize]"); - } else if ("-checkpoint".equals(cmd)) { - System.err.println("Usage: java SecondaryNameNode" - + " [-checkpoint [force]]"); - } else { - System.err.println("Usage: java SecondaryNameNode " + - "[-checkpoint [force]] " + - "[-geteditsize] "); } - } - - // - // utility method to facilitate junit test error simulation - // - void initializeErrorSimulationEvent(int numberOfEvents) { - simulation = new boolean[numberOfEvents]; - for (int i = 0; i < numberOfEvents; i++) { - simulation[i] = false; + if (argv.length == 2 && !"force".equals(argv[i])) { + printUsage(cmd); + return exitCode; } } - void setErrorSimulation(int index) { - assert(index < simulation.length); - simulation[index] = true; - } - - void clearErrorSimulation(int index) { - assert(index < simulation.length); - simulation[index] = false; - } - - /** - * This class is used in Namesystem's jetty to retrieve a file. - * Typically used by the Secondary NameNode to retrieve image and - * edit file for periodic checkpointing. - * @author Dhruba Borthakur - */ - public static class GetImageServlet extends HttpServlet { - public void doGet(HttpServletRequest request, - HttpServletResponse response - ) throws ServletException, IOException { - Map pmap = request.getParameterMap(); - try { - ServletContext context = getServletContext(); - SecondaryNameNode nn = (SecondaryNameNode) - context.getAttribute("name.secondary"); - TransferFsImage ff = new TransferFsImage(pmap, request, response); - if (ff.getImage()) { - TransferFsImage.getFileServer(response.getOutputStream(), - nn.getNewImage()); - } - LOG.info("New Image " + nn.getNewImage() + " retrieved by Namenode."); - } catch (IOException ie) { - StringUtils.stringifyException(ie); - LOG.error(ie); - String errMsg = "GetImage failed."; - response.sendError(HttpServletResponse.SC_GONE, errMsg); - throw ie; - + exitCode = 0; + try { + if ("-checkpoint".equals(cmd)) { + long size = namenode.getEditLogSize(); + if (size >= checkpointSize || + argv.length == 2 && "force".equals(argv[i])) { + doCheckpoint(); + } else { + System.err.println("EditLog size " + size + " bytes is " + + "smaller than configured checkpoint " + + "size " + checkpointSize + " bytes."); + System.err.println("Skipping checkpoint."); } - } - } - - /** - * main() has some simple utility methods. - * @param argv Command line parameters. - * @exception Exception if the filesystem does not exist. - */ - public static void main(String[] argv) throws Exception { - Configuration tconf = new Configuration(); - if (argv.length >= 1) { - SecondaryNameNode secondary = new SecondaryNameNode(tconf); - int ret = secondary.processArgs(argv); - System.exit(ret); + } else if ("-geteditsize".equals(cmd)) { + long size = namenode.getEditLogSize(); + System.out.println("EditLog size is " + size + " bytes"); + } else { + exitCode = -1; + LOG.error(cmd.substring(1) + ": Unknown command"); + printUsage(""); + } + } catch (RemoteException e) { + // + // This is a error returned by hadoop server. Print + // out the first line of the error mesage, ignore the stack trace. + exitCode = -1; + try { + String[] content; + content = e.getLocalizedMessage().split("\n"); + LOG.error(cmd.substring(1) + ": " + + content[0]); + } catch (Exception ex) { + LOG.error(cmd.substring(1) + ": " + + ex.getLocalizedMessage()); + } + } catch (IOException e) { + // + // IO exception encountered locally. + // + exitCode = -1; + LOG.error(cmd.substring(1) + ": " + + e.getLocalizedMessage()); + } finally { + // Does the RPC connection need to be closed? + } + return exitCode; + } + + /** + * Displays format of commands. + * @param cmd The command that is being executed. + */ + private void printUsage(String cmd) { + if ("-geteditsize".equals(cmd)) { + System.err.println("Usage: java SecondaryNameNode" + + " [-geteditsize]"); + } else if ("-checkpoint".equals(cmd)) { + System.err.println("Usage: java SecondaryNameNode" + + " [-checkpoint [force]]"); + } else { + System.err.println("Usage: java SecondaryNameNode " + + "[-checkpoint [force]] " + + "[-geteditsize] "); + } + } + + // + // utility method to facilitate junit test error simulation + // + void initializeErrorSimulationEvent(int numberOfEvents) { + simulation = new boolean[numberOfEvents]; + for (int i = 0; i < numberOfEvents; i++) { + simulation[i] = false; + } + } + + void setErrorSimulation(int index) { + assert(index < simulation.length); + simulation[index] = true; + } + + void clearErrorSimulation(int index) { + assert(index < simulation.length); + simulation[index] = false; + } + + /** + * This class is used in Namesystem's jetty to retrieve a file. + * Typically used by the Secondary NameNode to retrieve image and + * edit file for periodic checkpointing. + * @author Dhruba Borthakur + */ + public static class GetImageServlet extends HttpServlet { + public void doGet(HttpServletRequest request, + HttpServletResponse response + ) throws ServletException, IOException { + Map pmap = request.getParameterMap(); + try { + ServletContext context = getServletContext(); + SecondaryNameNode nn = (SecondaryNameNode) + context.getAttribute("name.secondary"); + TransferFsImage ff = new TransferFsImage(pmap, request, response); + if (ff.getImage()) { + TransferFsImage.getFileServer(response.getOutputStream(), + nn.getNewImage()); } + LOG.info("New Image " + nn.getNewImage() + " retrieved by Namenode."); + } catch (IOException ie) { + StringUtils.stringifyException(ie); + LOG.error(ie); + String errMsg = "GetImage failed."; + response.sendError(HttpServletResponse.SC_GONE, errMsg); + throw ie; - // Create a never ending deamon - Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); - checkpointThread.start(); + } } + } + + /** + * main() has some simple utility methods. + * @param argv Command line parameters. + * @exception Exception if the filesystem does not exist. + */ + public static void main(String[] argv) throws Exception { + Configuration tconf = new Configuration(); + if (argv.length >= 1) { + SecondaryNameNode secondary = new SecondaryNameNode(tconf); + int ret = secondary.processArgs(argv); + System.exit(ret); + } + + // Create a never ending deamon + Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); + checkpointThread.start(); + } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/StreamFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/StreamFile.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/StreamFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/StreamFile.java Mon Apr 16 14:44:35 2007 @@ -41,7 +41,7 @@ } } public void doGet(HttpServletRequest request, HttpServletResponse response) - throws ServletException, IOException { + throws ServletException, IOException { String filename = request.getParameter("filename"); if (filename == null || filename.length() == 0) { response.setContentType("text/plain"); @@ -53,7 +53,7 @@ FSInputStream in = dfs.open(new UTF8(filename)); OutputStream os = response.getOutputStream(); response.setHeader("Content-Disposition", "attachment; filename=\"" + - filename + "\""); + filename + "\""); response.setContentType("application/octet-stream"); byte buf[] = new byte[4096]; try { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/TransferFsImage.java Mon Apr 16 14:44:35 2007 @@ -107,7 +107,7 @@ * Copies the contents of the local file into the output stream. */ static void getFileServer(OutputStream outstream, File localfile) - throws IOException { + throws IOException { byte buf[] = new byte[BUFFER_SIZE]; FileInputStream infile = null; try { @@ -132,54 +132,54 @@ * Client-side Method to fetch file from a server * Copies the response from the URL to a list of local files. */ - static void getFileClient(String fsName, String id, File[] localPath) - throws IOException { - byte[] buf = new byte[BUFFER_SIZE]; - StringBuffer str = new StringBuffer("http://"+fsName+"/getimage?"); - str.append(id); + static void getFileClient(String fsName, String id, File[] localPath) + throws IOException { + byte[] buf = new byte[BUFFER_SIZE]; + StringBuffer str = new StringBuffer("http://"+fsName+"/getimage?"); + str.append(id); - // - // open connection to remote server - // - URL url = new URL(str.toString()); - URLConnection connection = url.openConnection(); - InputStream stream = connection.getInputStream(); - FileOutputStream[] output = null; - if (localPath != null) { - output = new FileOutputStream[localPath.length]; - for (int i = 0; i < output.length; i++) { - output[i] = new FileOutputStream(localPath[i]); - } - } + // + // open connection to remote server + // + URL url = new URL(str.toString()); + URLConnection connection = url.openConnection(); + InputStream stream = connection.getInputStream(); + FileOutputStream[] output = null; + if (localPath != null) { + output = new FileOutputStream[localPath.length]; + for (int i = 0; i < output.length; i++) { + output[i] = new FileOutputStream(localPath[i]); + } + } - try { - int num = 1; - while (num > 0) { - num = stream.read(buf); - if (num > 0 && localPath != null) { - for (int i = 0; i < output.length; i++) { - output[i].write(buf, 0, num); - } - } - } - } finally { - stream.close(); - if (localPath != null) { - for (int i = 0; i < output.length; i++) { - output[i].close(); - } - } - } - } + try { + int num = 1; + while (num > 0) { + num = stream.read(buf); + if (num > 0 && localPath != null) { + for (int i = 0; i < output.length; i++) { + output[i].write(buf, 0, num); + } + } + } + } finally { + stream.close(); + if (localPath != null) { + for (int i = 0; i < output.length; i++) { + output[i].close(); + } + } + } + } /** * Client-side Method to fetch file from a server * Copies the response from the URL to the local file. */ - static void getFileClient(String fsName, String id, File localPath) - throws IOException { - File[] filelist = new File[1]; - filelist[0] = localPath; - getFileClient(fsName, id, filelist); - } + static void getFileClient(String fsName, String id, File localPath) + throws IOException { + File[] filelist = new File[1]; + filelist[0] = localPath; + getFileClient(fsName, id, filelist); + } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Mon Apr 16 14:44:35 2007 @@ -63,7 +63,7 @@ * @throws IOException */ public static Path getLocalCache(URI cache, Configuration conf, Path baseDir, - boolean isArchive, String md5, Path currentWorkDir) throws IOException { + boolean isArchive, String md5, Path currentWorkDir) throws IOException { String cacheId = makeRelative(cache, conf); CacheStatus lcacheStatus; Path localizedPath; @@ -105,7 +105,7 @@ * @throws IOException */ public static void releaseCache(URI cache, Configuration conf) - throws IOException { + throws IOException { String cacheId = makeRelative(cache, conf); synchronized (cachedArchives) { CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId); @@ -143,7 +143,7 @@ * on/absolute_path */ private static String makeRelative(URI cache, Configuration conf) - throws IOException { + throws IOException { String fsname = cache.getScheme(); String path; FileSystem dfs = FileSystem.get(conf); @@ -162,7 +162,7 @@ // the methoed which actually copies the caches locally and unjars/unzips them private static Path localizeCache(URI cache, CacheStatus cacheStatus, - Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException { + Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException { boolean b = true; boolean doSymlink = getSymlink(conf); FileSystem dfs = getFileSystem(cache, conf); @@ -174,7 +174,7 @@ if (doSymlink){ if (!flink.exists()) FileUtil.symLink(cacheStatus.localLoadPath.toString(), - link); + link); } return cacheStatus.localLoadPath; } @@ -182,7 +182,7 @@ if (doSymlink){ if (!flink.exists()) FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), - link); + link); } return cacheFilePath(cacheStatus.localLoadPath); } @@ -193,29 +193,29 @@ // return null if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true)) throw new IOException("Cache " + cacheStatus.localLoadPath.toString() - + " is in use and cannot be refreshed"); + + " is in use and cannot be refreshed"); byte[] checkSum = createMD5(cache, conf); FileSystem localFs = FileSystem.getLocal(conf); localFs.delete(cacheStatus.localLoadPath); Path parchive = new Path(cacheStatus.localLoadPath, new Path(cacheStatus.localLoadPath.getName())); if (!localFs.mkdirs(cacheStatus.localLoadPath)) { - throw new IOException("Mkdirs failed to create directory " + - cacheStatus.localLoadPath.toString()); + throw new IOException("Mkdirs failed to create directory " + + cacheStatus.localLoadPath.toString()); } String cacheId = cache.getPath(); dfs.copyToLocalFile(new Path(cacheId), parchive); dfs.copyToLocalFile(new Path(cacheId + "_md5"), new Path(parchive - .toString() - + "_md5")); + .toString() + + "_md5")); if (isArchive) { String tmpArchive = parchive.toString().toLowerCase(); if (tmpArchive.endsWith(".jar")) { RunJar.unJar(new File(parchive.toString()), new File(parchive - .getParent().toString())); + .getParent().toString())); } else if (tmpArchive.endsWith(".zip")) { FileUtil.unZip(new File(parchive.toString()), new File(parchive - .getParent().toString())); + .getParent().toString())); } // else will not do anyhting @@ -231,7 +231,7 @@ if (doSymlink){ if (!flink.exists()) FileUtil.symLink(cacheStatus.localLoadPath.toString(), - link); + link); } return cacheStatus.localLoadPath; } @@ -239,7 +239,7 @@ if (doSymlink){ if (!flink.exists()) FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), - link); + link); } return cacheFilePath(cacheStatus.localLoadPath); } @@ -247,7 +247,7 @@ // Checks if the cache has already been localized and is fresh private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache, - FileSystem dfs, String confMD5, Configuration conf) throws IOException { + FileSystem dfs, String confMD5, Configuration conf) throws IOException { // compute the md5 of the crc byte[] digest = null; byte[] fsDigest = createMD5(cache, conf); @@ -259,7 +259,7 @@ digest = lcacheStatus.md5; if (!MessageDigest.isEqual(confDigest, fsDigest)) { throw new IOException("Inconsistencty in data caching, " - + "Cache archives have been changed"); + + "Cache archives have been changed"); } else { if (!MessageDigest.isEqual(confDigest, digest)) { // needs refreshing @@ -283,19 +283,19 @@ * @throws IOException */ public static byte[] createMD5(URI cache, Configuration conf) - throws IOException { + throws IOException { byte[] b = new byte[CRC_BUFFER_SIZE]; byte[] digest = null; FileSystem fileSystem = getFileSystem(cache, conf); if(!(fileSystem instanceof ChecksumFileSystem)) { - throw new IOException( "Not a checksummed file system: " - +fileSystem.getUri() ); + throw new IOException( "Not a checksummed file system: " + +fileSystem.getUri() ); } String filename = cache.getPath(); Path filePath = new Path(filename); Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR - + filePath.getName() + "_md5"); + + filePath.getName() + "_md5"); MessageDigest md5 = null; try { md5 = MessageDigest.getInstance("MD5"); @@ -305,13 +305,13 @@ if (!fileSystem.exists(md5File)) { ChecksumFileSystem checksumFs; if(!(fileSystem instanceof ChecksumFileSystem)) { - throw new IOException( - "Not a checksumed file system: "+fileSystem.getUri()); + throw new IOException( + "Not a checksumed file system: "+fileSystem.getUri()); } else { - checksumFs = (ChecksumFileSystem)fileSystem; + checksumFs = (ChecksumFileSystem)fileSystem; } FSDataInputStream fsStream = checksumFs.getRawFileSystem().open( - checksumFs.getChecksumFile(filePath)); + checksumFs.getChecksumFile(filePath)); int read = fsStream.read(b); while (read != -1) { md5.update(b, 0, read); @@ -343,18 +343,18 @@ * @throws IOException */ public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir) - throws IOException{ + throws IOException{ if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){ return; } boolean createSymlink = getSymlink(conf); - if (createSymlink){ - File[] list = jobCacheDir.listFiles(); - for (int i=0; i < list.length; i++){ - FileUtil.symLink(list[i].getAbsolutePath(), - new File(workDir, list[i].getName()).toString()); - } - } + if (createSymlink){ + File[] list = jobCacheDir.listFiles(); + for (int i=0; i < list.length; i++){ + FileUtil.symLink(list[i].getAbsolutePath(), + new File(workDir, list[i].getName()).toString()); + } + } } private static String getFileSysName(URI url) { @@ -369,7 +369,7 @@ } private static FileSystem getFileSystem(URI cache, Configuration conf) - throws IOException { + throws IOException { String fileSysName = getFileSysName(cache); if (fileSysName != null) return FileSystem.getNamed(fileSysName, conf); @@ -425,9 +425,9 @@ * @throws IOException */ public static Path[] getLocalCacheArchives(Configuration conf) - throws IOException { + throws IOException { return StringUtils.stringToPath(conf - .getStrings("mapred.cache.localArchives")); + .getStrings("mapred.cache.localArchives")); } /** @@ -437,7 +437,7 @@ * @throws IOException */ public static Path[] getLocalCacheFiles(Configuration conf) - throws IOException { + throws IOException { return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles")); } @@ -508,7 +508,7 @@ public static void addCacheArchive(URI uri, Configuration conf) { String archives = conf.get("mapred.cache.archives"); conf.set("mapred.cache.archives", archives == null ? uri.toString() - : archives + "," + uri.toString()); + : archives + "," + uri.toString()); } /** @@ -519,81 +519,81 @@ public static void addCacheFile(URI uri, Configuration conf) { String files = conf.get("mapred.cache.files"); conf.set("mapred.cache.files", files == null ? uri.toString() : files + "," - + uri.toString()); + + uri.toString()); + } + + /** + * Add an file path to the current set of classpath entries It adds the file + * to cache as well. + * + * @param file Path of the file to be added + * @param conf Configuration that contains the classpath setting + */ + public static void addFileToClassPath(Path file, Configuration conf) + throws IOException { + String classpath = conf.get("mapred.job.classpath.files"); + conf.set("mapred.job.classpath.files", classpath == null ? file.toString() + : classpath + System.getProperty("path.separator") + file.toString()); + FileSystem fs = FileSystem.get(conf); + URI uri = fs.makeQualified(file).toUri(); + + addCacheFile(uri, conf); } - /** - * Add an file path to the current set of classpath entries It adds the file - * to cache as well. - * - * @param file Path of the file to be added - * @param conf Configuration that contains the classpath setting - */ - public static void addFileToClassPath(Path file, Configuration conf) - throws IOException { - String classpath = conf.get("mapred.job.classpath.files"); - conf.set("mapred.job.classpath.files", classpath == null ? file.toString() - : classpath + System.getProperty("path.separator") + file.toString()); - FileSystem fs = FileSystem.get(conf); - URI uri = fs.makeQualified(file).toUri(); - - addCacheFile(uri, conf); - } - - /** - * Get the file entries in classpath as an array of Path - * - * @param conf Configuration that contains the classpath setting - */ - public static Path[] getFileClassPaths(Configuration conf) { - String classpath = conf.get("mapred.job.classpath.files"); - if (classpath == null) - return null; - ArrayList list = Collections.list(new StringTokenizer(classpath, System - .getProperty("path.separator"))); - Path[] paths = new Path[list.size()]; - for (int i = 0; i < list.size(); i++) { - paths[i] = new Path((String) list.get(i)); - } - return paths; - } - - /** - * Add an archive path to the current set of classpath entries. It adds the - * archive to cache as well. - * - * @param archive Path of the archive to be added - * @param conf Configuration that contains the classpath setting - */ - public static void addArchiveToClassPath(Path archive, Configuration conf) - throws IOException { - String classpath = conf.get("mapred.job.classpath.archives"); - conf.set("mapred.job.classpath.archives", classpath == null ? archive - .toString() : classpath + System.getProperty("path.separator") - + archive.toString()); - FileSystem fs = FileSystem.get(conf); - URI uri = fs.makeQualified(archive).toUri(); - - addCacheArchive(uri, conf); - } - - /** - * Get the archive entries in classpath as an array of Path - * - * @param conf Configuration that contains the classpath setting - */ - public static Path[] getArchiveClassPaths(Configuration conf) { - String classpath = conf.get("mapred.job.classpath.archives"); - if (classpath == null) - return null; - ArrayList list = Collections.list(new StringTokenizer(classpath, System - .getProperty("path.separator"))); - Path[] paths = new Path[list.size()]; - for (int i = 0; i < list.size(); i++) { - paths[i] = new Path((String) list.get(i)); - } - return paths; - } + /** + * Get the file entries in classpath as an array of Path + * + * @param conf Configuration that contains the classpath setting + */ + public static Path[] getFileClassPaths(Configuration conf) { + String classpath = conf.get("mapred.job.classpath.files"); + if (classpath == null) + return null; + ArrayList list = Collections.list(new StringTokenizer(classpath, System + .getProperty("path.separator"))); + Path[] paths = new Path[list.size()]; + for (int i = 0; i < list.size(); i++) { + paths[i] = new Path((String) list.get(i)); + } + return paths; + } + + /** + * Add an archive path to the current set of classpath entries. It adds the + * archive to cache as well. + * + * @param archive Path of the archive to be added + * @param conf Configuration that contains the classpath setting + */ + public static void addArchiveToClassPath(Path archive, Configuration conf) + throws IOException { + String classpath = conf.get("mapred.job.classpath.archives"); + conf.set("mapred.job.classpath.archives", classpath == null ? archive + .toString() : classpath + System.getProperty("path.separator") + + archive.toString()); + FileSystem fs = FileSystem.get(conf); + URI uri = fs.makeQualified(archive).toUri(); + + addCacheArchive(uri, conf); + } + + /** + * Get the archive entries in classpath as an array of Path + * + * @param conf Configuration that contains the classpath setting + */ + public static Path[] getArchiveClassPaths(Configuration conf) { + String classpath = conf.get("mapred.job.classpath.archives"); + if (classpath == null) + return null; + ArrayList list = Collections.list(new StringTokenizer(classpath, System + .getProperty("path.separator"))); + Path[] paths = new Path[list.size()]; + for (int i = 0; i < list.size(); i++) { + paths[i] = new Path((String) list.get(i)); + } + return paths; + } /** * This method allows you to create symlinks in the current working directory @@ -655,7 +655,7 @@ if (frag3 == null) return false; if (frag2.equalsIgnoreCase(frag3)) - return false; + return false; } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Mon Apr 16 14:44:35 2007 @@ -217,7 +217,7 @@ LOG.info("Found checksum error: "+StringUtils.stringifyException(ce)); long errPos = ce.getPos(); boolean shouldRetry = fs.reportChecksumFailure( - file, datas, errPos, sums, errPos/bytesPerSum); + file, datas, errPos, sums, errPos/bytesPerSum); if (!shouldRetry || retriesLeft == 0) { throw ce; } @@ -226,14 +226,14 @@ datas.seek(oldPos); if (seekToNewSource(oldPos)) { - // Since at least one of the sources is different, - // the read might succeed, so we'll retry. - retry = true; + // Since at least one of the sources is different, + // the read might succeed, so we'll retry. + retry = true; } else { - // Neither the data stream nor the checksum stream are being read - // from different sources, meaning we'll still get a checksum error - // if we try to do the read again. We throw an exception instead. - throw ce; + // Neither the data stream nor the checksum stream are being read + // from different sources, meaning we'll still get a checksum error + // if we try to do the read again. We throw an exception instead. + throw ce; } } } @@ -636,7 +636,7 @@ * @return if retry is neccessary */ public boolean reportChecksumFailure(Path f, FSDataInputStream in, - long inPos, FSDataInputStream sums, long sumsPos) { + long inPos, FSDataInputStream sums, long sumsPos) { return false; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/DF.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/DF.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/DF.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/DF.java Mon Apr 16 14:44:35 2007 @@ -62,11 +62,11 @@ try { if (process.waitFor() != 0) { throw new IOException - (new BufferedReader(new InputStreamReader(process.getErrorStream())) - .readLine()); + (new BufferedReader(new InputStreamReader(process.getErrorStream())) + .readLine()); } parseExecResult( - new BufferedReader(new InputStreamReader(process.getInputStream()))); + new BufferedReader(new InputStreamReader(process.getInputStream()))); } catch (InterruptedException e) { throw new IOException(e.toString()); } finally { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java Mon Apr 16 14:44:35 2007 @@ -27,53 +27,53 @@ *****************************************************************/ public abstract class FSInputStream extends InputStream implements Seekable, PositionedReadable { - /** - * Seek to the given offset from the start of the file. - * The next read() will be from that location. Can't - * seek past the end of the file. - */ - public abstract void seek(long pos) throws IOException; + /** + * Seek to the given offset from the start of the file. + * The next read() will be from that location. Can't + * seek past the end of the file. + */ + public abstract void seek(long pos) throws IOException; - /** - * Return the current offset from the start of the file - */ - public abstract long getPos() throws IOException; + /** + * Return the current offset from the start of the file + */ + public abstract long getPos() throws IOException; - /** - * Seeks a different copy of the data. Returns true if - * found a new source, false otherwise. - */ - public abstract boolean seekToNewSource(long targetPos) throws IOException; + /** + * Seeks a different copy of the data. Returns true if + * found a new source, false otherwise. + */ + public abstract boolean seekToNewSource(long targetPos) throws IOException; - public int read(long position, byte[] buffer, int offset, int length) + public int read(long position, byte[] buffer, int offset, int length) throws IOException { - synchronized (this) { - long oldPos = getPos(); - int nread = -1; - try { - seek(position); - nread = read(buffer, offset, length); - } finally { - seek(oldPos); - } - return nread; + synchronized (this) { + long oldPos = getPos(); + int nread = -1; + try { + seek(position); + nread = read(buffer, offset, length); + } finally { + seek(oldPos); } + return nread; } + } - public void readFully(long position, byte[] buffer, int offset, int length) + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - int nread = 0; - while (nread < length) { - int nbytes = read(position+nread, buffer, offset+nread, length-nread); - if (nbytes < 0) { - throw new EOFException("End of file reached before reading fully."); - } - nread += nbytes; + int nread = 0; + while (nread < length) { + int nbytes = read(position+nread, buffer, offset+nread, length-nread); + if (nbytes < 0) { + throw new EOFException("End of file reached before reading fully."); } + nread += nbytes; } + } - public void readFully(long position, byte[] buffer) + public void readFully(long position, byte[] buffer) throws IOException { - readFully(position, buffer, 0, buffer.length); - } + readFully(position, buffer, 0, buffer.length); + } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Mon Apr 16 14:44:35 2007 @@ -68,42 +68,42 @@ FileSystem dstFS, Path dst, boolean deleteSource, Configuration conf ) throws IOException { - dst = checkDest(src.getName(), dstFS, dst); + dst = checkDest(src.getName(), dstFS, dst); - if (srcFS.isDirectory(src)) { - if (!dstFS.mkdirs(dst)) { - return false; - } - Path contents[] = srcFS.listPaths(src); - for (int i = 0; i < contents.length; i++) { - copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getName()), - deleteSource, conf); - } - } else if (srcFS.isFile(src)) { - InputStream in = srcFS.open(src); - try { - OutputStream out = dstFS.create(dst); - copyContent(in, out, conf); - } finally { - in.close(); - } - } else { - throw new IOException(src.toString() + ": No such file or directory"); + if (srcFS.isDirectory(src)) { + if (!dstFS.mkdirs(dst)) { + return false; } - if (deleteSource) { - return srcFS.delete(src); - } else { - return true; + Path contents[] = srcFS.listPaths(src); + for (int i = 0; i < contents.length; i++) { + copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getName()), + deleteSource, conf); } + } else if (srcFS.isFile(src)) { + InputStream in = srcFS.open(src); + try { + OutputStream out = dstFS.create(dst); + copyContent(in, out, conf); + } finally { + in.close(); + } + } else { + throw new IOException(src.toString() + ": No such file or directory"); + } + if (deleteSource) { + return srcFS.delete(src); + } else { + return true; + } } /** Copy all files in a directory to one output file (merge). */ public static boolean copyMerge(FileSystem srcFS, Path srcDir, - FileSystem dstFS, Path dstFile, - boolean deleteSource, - Configuration conf, String addString) throws IOException { - dstFile = checkDest(srcDir.getName(), dstFS, dstFile); + FileSystem dstFS, Path dstFile, + boolean deleteSource, + Configuration conf, String addString) throws IOException { + dstFile = checkDest(srcDir.getName(), dstFS, dstFile); if (!srcFS.isDirectory(srcDir)) return false; @@ -200,7 +200,7 @@ } private static void copyContent(InputStream in, OutputStream out, - Configuration conf) throws IOException { + Configuration conf) throws IOException { copyContent(in, out, conf, true); } @@ -274,7 +274,7 @@ } } - /** + /** * Given a File input it will unzip the file in a the unzip directory * passed as the second parameter * @param inFile The zip file as input @@ -363,10 +363,10 @@ try { if (process.waitFor() != 0) { String errMsg = new BufferedReader(new InputStreamReader( - process.getInputStream())).readLine(); + process.getInputStream())).readLine(); if( errMsg == null ) errMsg = ""; String inpMsg = new BufferedReader(new InputStreamReader( - process.getErrorStream())).readLine(); + process.getErrorStream())).readLine(); if( inpMsg == null ) inpMsg = ""; throw new IOException( errMsg + inpMsg ); } @@ -386,16 +386,16 @@ * @return value returned by the command */ public static int symLink(String target, String linkname) throws IOException{ - String cmd = "ln -s " + target + " " + linkname; - Process p = Runtime.getRuntime().exec( cmd, null ); - int returnVal = -1; - try{ - returnVal = p.waitFor(); - } catch(InterruptedException e){ - //do nothing as of yet - } - return returnVal; - } + String cmd = "ln -s " + target + " " + linkname; + Process p = Runtime.getRuntime().exec( cmd, null ); + int returnVal = -1; + try{ + returnVal = p.waitFor(); + } catch(InterruptedException e){ + //do nothing as of yet + } + return returnVal; + } /** * Change the permissions on a filename. Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java?view=diff&rev=529410&r1=529409&r2=529410 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FilterFileSystem.java Mon Apr 16 14:44:35 2007 @@ -92,7 +92,7 @@ * The FileSystem will simply return an elt containing 'localhost'. */ public String[][] getFileCacheHints(Path f, long start, long len) - throws IOException { + throws IOException { return fs.getFileCacheHints(f, start, len); } @@ -120,7 +120,7 @@ short replication, long blockSize, Progressable progress - ) throws IOException { + ) throws IOException { return fs.create(f, overwrite, bufferSize, replication, blockSize, progress); } @@ -217,7 +217,7 @@ * @deprecated FS does not support file locks anymore. */ @Deprecated - public void lock(Path f, boolean shared) throws IOException { + public void lock(Path f, boolean shared) throws IOException { fs.lock(f, shared); } @@ -227,7 +227,7 @@ * @deprecated FS does not support file locks anymore. */ @Deprecated - public void release(Path f) throws IOException { + public void release(Path f) throws IOException { fs.release(f); } @@ -237,7 +237,7 @@ * delSrc indicates if the source should be removed */ public void copyFromLocalFile(boolean delSrc, Path src, Path dst) - throws IOException { + throws IOException { fs.copyFromLocalFile(delSrc, src, dst); } @@ -247,7 +247,7 @@ * delSrc indicates if the src will be removed or not. */ public void copyToLocalFile(boolean delSrc, Path src, Path dst) - throws IOException { + throws IOException { fs.copyToLocalFile(delSrc, src, dst); } @@ -258,7 +258,7 @@ * the FS is remote, we write into the tmp local area. */ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) - throws IOException { + throws IOException { return fs.startLocalOutput(fsOutputFile, tmpLocalFile); } @@ -269,7 +269,7 @@ * fsOutputFile. */ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) - throws IOException { + throws IOException { fs.completeLocalOutput(fsOutputFile, tmpLocalFile); } @@ -296,7 +296,7 @@ } @Override - public Configuration getConf() { + public Configuration getConf() { return fs.getConf(); } }