hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r1032497 - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/test/org/apache/hadoop/hdfs/ src/contrib/raid/src/test/org/apache/hadoop/raid/
Date Mon, 08 Nov 2010 08:47:22 GMT
Author: dhruba
Date: Mon Nov  8 08:47:21 2010
New Revision: 1032497

URL: http://svn.apache.org/viewvc?rev=1032497&view=rev
Log:
MAPREDUCE-2142.  Refactor RaidNode so that the map-reduce component is
clearly separated out. (Patrick Kling via dhruba)


Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalRaidNode.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/README
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Nov  8 08:47:21 2010
@@ -372,7 +372,10 @@ Trunk (unreleased changes)
     via schen)
 
     MAPREDUCE-2173.  Fix race condition in TestBlockFixer that was
-    causing  intermittent failure (Ramkumar Vadali via dhruba)
+    causing  intermittent failure (Patrick Kling via dhruba)
+
+    MAPREDUCE-2142.  Refactor RaidNode so that the map-reduce component is
+    clearly separated out. (Patrick Kling via dhruba)
 
 Release 0.21.1 - Unreleased
 

Modified: hadoop/mapreduce/trunk/src/contrib/raid/README
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/README?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/README (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/README Mon Nov  8 08:47:21 2010
@@ -104,14 +104,12 @@ The following properties can be set in h
           </description>
         </property>
 
-    Specify RaidNode to not use a map-reduce cluster for raiding files in parallel.
+    Specify which implementation of RaidNode to use.
         <property>
-          <name>fs.raidnode.local</name>
-          <value>true</value>
-          <description>If you do not want to use your map-reduce cluster to
-          raid files in parallel, then specify "true". By default, this
-          value is false, which means that the RaidNode uses the default
-          map-reduce cluster to generate parity blocks.
+          <name>raid.classname</name>
+          <value>org.apache.hadoop.raid.DistRaidNode</value>
+          <description>Specify which implementation of RaidNode to use
+	  (class name).
           </description>
         </property>
 

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaidNode.java?rev=1032497&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaidNode.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaidNode.java
Mon Nov  8 08:47:21 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.util.Daemon;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+
+/**
+ * Implementation of {@link RaidNode} that uses map reduce jobs to raid files.
+ */
+public class DistRaidNode extends RaidNode {
+
+  public static final Log LOG = LogFactory.getLog(DistRaidNode.class);
+
+  /** Daemon thread to monitor raid job progress */
+  JobMonitor jobMonitor = null;
+  Daemon jobMonitorThread = null;
+
+  public DistRaidNode(Configuration conf) throws IOException {
+    super(conf);
+    this.jobMonitor = new JobMonitor(conf);
+    this.jobMonitorThread = new Daemon(this.jobMonitor);
+    this.jobMonitorThread.start();
+    
+    LOG.info("created");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void join() {
+    super.join();
+    try {
+      if (jobMonitorThread != null) jobMonitorThread.join();
+    } catch (InterruptedException ie) {
+      // do nothing
+    }
+  }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void stop() {
+    if (stopRequested) {
+      return;
+    }
+    super.stop();
+    if (jobMonitor != null) jobMonitor.running = false;
+    if (jobMonitorThread != null) jobMonitorThread.interrupt();
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  void raidFiles(PolicyInfo info, List<FileStatus> paths) throws IOException {
+    // We already checked that no job for this policy is running
+    // So we can start a new job.
+    DistRaid dr = new DistRaid(conf);
+    //add paths for distributed raiding
+    dr.addRaidPaths(info, paths);
+    boolean started = dr.startDistRaid();
+    if (started) {
+      jobMonitor.monitorJob(info.getName(), dr);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  int getRunningJobsForPolicy(String policyName) {
+    return jobMonitor.runningJobsCount(policyName);
+  }
+
+
+}
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java Mon
Nov  8 08:47:21 2010
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Progressable;
 
@@ -226,7 +225,8 @@ public abstract class Encoder {
     LOG.info("Starting recovery by using source stripe " +
               srcFile + ":" + stripeStart);
     // Read the data from the blocks and write to the parity file.
-    encodeStripe(blocks, stripeStart, blockSize, outs, Reporter.NULL);
+    encodeStripe(blocks, stripeStart, blockSize, outs, 
+                 new RaidUtils.DummyProgressable());
   }
 
   /**

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalRaidNode.java?rev=1032497&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalRaidNode.java
(added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/LocalRaidNode.java
Mon Nov  8 08:47:21 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+
+/**
+ * Implementation of {@link RaidNode} that performs raiding locally.
+ */
+public class LocalRaidNode extends RaidNode {
+
+  public static final Log LOG = LogFactory.getLog(LocalRaidNode.class);
+
+  public LocalRaidNode(Configuration conf) throws IOException {
+    super(conf);
+
+    LOG.info("created");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  void raidFiles(PolicyInfo info, List<FileStatus> paths) throws IOException {
+    doRaid(conf, info, paths);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  int getRunningJobsForPolicy(String policyName) {
+    return 0;
+  }
+}
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
Mon Nov  8 08:47:21 2010
@@ -36,6 +36,8 @@ import java.util.regex.Pattern;
 import java.lang.Thread;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 
 import org.xml.sax.SAXException;
 import javax.xml.parsers.ParserConfigurationException;
@@ -56,16 +58,18 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
 
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
 import org.apache.hadoop.raid.protocol.RaidProtocol;
 
 /**
- * A {@link RaidNode} that implements 
+ * A base class that implements {@link RaidProtocol}.
+ *
+ * Use raid.classname to specify which implementation to use.
  */
-public class RaidNode implements RaidProtocol {
+public abstract class RaidNode implements RaidProtocol {
 
   static{
     Configuration.addDefaultResource("hdfs-default.xml");
@@ -84,20 +88,21 @@ public class RaidNode implements RaidPro
   public static final String HAR_SUFFIX = "_raid.har";
   public static final Pattern PARITY_HAR_PARTFILE_PATTERN =
     Pattern.compile(".*" + HAR_SUFFIX + "/part-.*");
-
   
+  public static final String RAIDNODE_CLASSNAME_KEY = "raid.classname";  
+
   /** RPC server */
   private Server server;
   /** RPC server address */
   private InetSocketAddress serverAddress = null;
   /** only used for testing purposes  */
-  private boolean stopRequested = false;
+  protected boolean stopRequested = false;
 
   /** Configuration Manager */
   private ConfigManager configMgr;
 
   /** hadoop configuration */
-  private Configuration conf;
+  protected Configuration conf;
 
   protected boolean initialized;  // Are we initialized?
   protected volatile boolean running; // Are we running?
@@ -116,13 +121,6 @@ public class RaidNode implements RaidPro
   BlockFixer blockFixer = null;
   Daemon blockFixerThread = null;
 
-  /** Daemon thread to monitor distributed raid job progress */
-  JobMonitor jobMonitor = null;
-  Daemon jobMonitorThread = null;
-
-  /** Do do distributed raiding */
-  boolean isRaidLocal = false;
-  
   // statistics about RAW hdfs blocks. This counts all replicas of a block.
   public static class Statistics {
     long numProcessedBlocks; // total blocks encountered in namespace
@@ -214,7 +212,6 @@ public class RaidNode implements RaidPro
       if (server != null) server.join();
       if (triggerThread != null) triggerThread.join();
       if (blockFixerThread != null) blockFixerThread.join();
-      if (jobMonitorThread != null) jobMonitorThread.join();
       if (purgeThread != null) purgeThread.join();
     } catch (InterruptedException ie) {
       // do nothing
@@ -234,8 +231,6 @@ public class RaidNode implements RaidPro
     if (triggerThread != null) triggerThread.interrupt();
     if (blockFixer != null) blockFixer.running = false;
     if (blockFixerThread != null) blockFixerThread.interrupt();
-    if (jobMonitor != null) jobMonitor.running = false;
-    if (jobMonitorThread != null) jobMonitorThread.interrupt();
     if (purgeThread != null) purgeThread.interrupt();
   }
 
@@ -262,7 +257,6 @@ public class RaidNode implements RaidPro
     InetSocketAddress socAddr = RaidNode.getAddress(conf);
     int handlerCount = conf.getInt("fs.raidnode.handler.count", 10);
 
-    isRaidLocal = conf.getBoolean("fs.raidnode.local", false);
     // read in the configuration
     configMgr = new ConfigManager(conf);
 
@@ -282,10 +276,6 @@ public class RaidNode implements RaidPro
     this.blockFixerThread = new Daemon(this.blockFixer);
     this.blockFixerThread.start();
 
-    this.jobMonitor = new JobMonitor(conf);
-    this.jobMonitorThread = new Daemon(this.jobMonitor);
-    this.jobMonitorThread.start();
-
     // start the deamon thread to fire polcies appropriately
     this.triggerThread = new Daemon(new TriggerMonitor());
     this.triggerThread.start();
@@ -331,6 +321,11 @@ public class RaidNode implements RaidPro
   }
 
   /**
+   * returns the number of raid jobs running for a particular policy
+   */
+  abstract int getRunningJobsForPolicy(String policyName);
+
+  /**
    * Periodically checks to see which policies should be fired.
    */
   class TriggerMonitor implements Runnable {
@@ -369,7 +364,7 @@ public class RaidNode implements RaidPro
     private boolean shouldSelectFiles(PolicyInfo info) {
       String policyName = info.getName();
       ScanState scanState = scanStateMap.get(policyName);
-      int runningJobsCount = jobMonitor.runningJobsCount(policyName);
+      int runningJobsCount = getRunningJobsForPolicy(policyName);
       // Is there a scan in progress for this policy?
       if (scanState.pendingTraversal != null) {
         int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
@@ -515,20 +510,7 @@ public class RaidNode implements RaidPro
           LOG.info("Triggering Policy Action " + info.getName() +
                    " " + info.getSrcPath());
           try {
-            if (isRaidLocal){
-              doRaid(conf, info, filteredPaths);
-            }
-            else{
-              // We already checked that no job for this policy is running
-              // So we can start a new job.
-              DistRaid dr = new DistRaid(conf);
-              //add paths for distributed raiding
-              dr.addRaidPaths(info, filteredPaths);
-              boolean started = dr.startDistRaid();
-              if (started) {
-                jobMonitor.monitorJob(info.getName(), dr);
-              }
-            }
+            raidFiles(info, filteredPaths);
           } catch (Exception e) {
             LOG.info("Exception while invoking action on policy " + info.getName() +
                      " srcPath " + info.getSrcPath() + 
@@ -548,6 +530,12 @@ public class RaidNode implements RaidPro
     }
   }
 
+  /**
+   * raid a list of files, this will be overridden by subclasses of RaidNode
+   */
+  abstract void raidFiles(PolicyInfo info, List<FileStatus> paths) 
+    throws IOException;
+
   static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
     return new Path(destPathPrefix, makeRelative(srcPath));
   }
@@ -659,8 +647,8 @@ public class RaidNode implements RaidPro
     int count = 0;
 
     for (FileStatus s : paths) {
-      doRaid(conf, s, destPref, statistics, Reporter.NULL, doSimulate, targetRepl,
-             metaRepl, stripeLength);
+      doRaid(conf, s, destPref, statistics, new RaidUtils.DummyProgressable(),
+             doSimulate, targetRepl, metaRepl, stripeLength);
       if (count % 1000 == 0) {
         LOG.info("RAID statistics " + statistics.toString());
       }
@@ -675,7 +663,7 @@ public class RaidNode implements RaidPro
    */
 
   static public void doRaid(Configuration conf, PolicyInfo info,
-      FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
+      FileStatus src, Statistics statistics, Progressable reporter) throws IOException {
     int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
     int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
     int stripeLength = getStripeLength(conf);
@@ -692,7 +680,7 @@ public class RaidNode implements RaidPro
    * RAID an individual file
    */
   static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
-                      Statistics statistics, Reporter reporter, boolean doSimulate,
+                      Statistics statistics, Progressable reporter, boolean doSimulate,
                       int targetRepl, int metaRepl, int stripeLength) 
     throws IOException {
     Path p = stat.getPath();
@@ -749,7 +737,7 @@ public class RaidNode implements RaidPro
    * Create the parity file.
    */
   static private void generateParityFile(Configuration conf, FileStatus stat,
-                                  Reporter reporter,
+                                  Progressable reporter,
                                   FileSystem inFs,
                                   Path destPathPrefix, BlockLocation[] locations,
                                   int metaRepl, int stripeLength) throws IOException {
@@ -1273,10 +1261,36 @@ public class RaidNode implements RaidPro
   }
 
   /**
+   * Create an instance of the appropriate subclass of RaidNode 
+   */
+  public static RaidNode createRaidNode(Configuration conf)
+    throws ClassNotFoundException {
+    try {
+      // default to distributed raid node
+      Class<?> raidNodeClass =
+        conf.getClass(RAIDNODE_CLASSNAME_KEY, DistRaidNode.class);
+      if (!RaidNode.class.isAssignableFrom(raidNodeClass)) {
+        throw new ClassNotFoundException("not an implementation of RaidNode");
+      }
+      Constructor<?> constructor =
+        raidNodeClass.getConstructor(new Class[] {Configuration.class} );
+      return (RaidNode) constructor.newInstance(conf);
+    } catch (NoSuchMethodException e) {
+      throw new ClassNotFoundException("cannot construct raidnode", e);
+    } catch (InstantiationException e) {
+      throw new ClassNotFoundException("cannot construct raidnode", e);
+    } catch (IllegalAccessException e) {
+      throw new ClassNotFoundException("cannot construct raidnode", e);
+    } catch (InvocationTargetException e) {
+      throw new ClassNotFoundException("cannot construct raidnode", e);
+    }
+  }
+
+  /**
    * Create an instance of the RaidNode 
    */
-  public static RaidNode createRaidNode(String argv[],
-                                        Configuration conf) throws IOException {
+  public static RaidNode createRaidNode(String argv[], Configuration conf)
+    throws IOException, ClassNotFoundException {
     if (conf == null) {
       conf = new Configuration();
     }
@@ -1286,7 +1300,7 @@ public class RaidNode implements RaidPro
       return null;
     }
     setStartupOption(conf, startOpt);
-    RaidNode node = new RaidNode(conf);
+    RaidNode node = createRaidNode(conf);
     return node;
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
Mon Nov  8 08:47:21 2010
@@ -31,9 +31,25 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Progressable;
 
 public class RaidUtils {
   /**
+   * A {@link Progressable} that does nothing.
+   *
+   * We could have used Reporter.NULL here but that would introduce
+   * a dependency on mapreduce.
+   */ 
+  public static class DummyProgressable implements Progressable {
+    /**
+     * Do nothing.
+     */
+    @Override
+    public void progress() {
+    }
+  }
+
+  /**
    * Removes files matching the trash file pattern.
    */
   public static void filterTrash(Configuration conf, List<Path> files) {

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
Mon Nov  8 08:47:21 2010
@@ -89,7 +89,8 @@ public class TestRaidDfs extends TestCas
     conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
 
     // do not use map-reduce cluster for Raiding
-    conf.setBoolean("fs.raidnode.local", true);
+    conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+
     conf.set("raid.server.address", "localhost:0");
     conf.setInt("hdfs.raid.stripeLength", 3);
     conf.set("hdfs.raid.locations", "/destraid");

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
Mon Nov  8 08:47:21 2010
@@ -434,7 +434,7 @@ public class TestBlockFixer extends Test
     conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
 
     // do not use map-reduce cluster for Raiding
-    conf.setBoolean("fs.raidnode.local", true);
+    conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
     conf.set("raid.server.address", "localhost:0");
     conf.setInt("hdfs.raid.stripeLength", stripeLength);
     conf.set("hdfs.raid.locs", "/destraid");

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
Mon Nov  8 08:47:21 2010
@@ -79,7 +79,11 @@ public class TestRaidHar extends TestCas
     conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
 
     // the RaidNode does the raiding inline (instead of submitting to map/reduce)
-    conf.setBoolean("fs.raidnode.local", local);
+    if (local) {
+      conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+    } else {
+      conf.set("raid.classname", "org.apache.hadoop.raid.DistRaidNode");
+    }
 
     conf.set("raid.server.address", "localhost:0");
     conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
Mon Nov  8 08:47:21 2010
@@ -91,7 +91,11 @@ public class TestRaidNode extends TestCa
     conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
 
     // the RaidNode does the raiding inline (instead of submitting to map/reduce)
-    conf.setBoolean("fs.raidnode.local", local);
+    if (local) {
+      conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+    } else {
+      conf.set("raid.classname", "org.apache.hadoop.raid.DistRaidNode");
+    }
 
     conf.set("raid.server.address", "localhost:0");
 
@@ -514,18 +518,22 @@ public class TestRaidNode extends TestCa
 
       long start = System.currentTimeMillis();
       final int MAX_WAITTIME = 300000;
-      while (cnode.jobMonitor.jobsMonitored() < 2 &&
+      
+      assertTrue("cnode is not DistRaidNode", cnode instanceof DistRaidNode);
+      DistRaidNode dcnode = (DistRaidNode) cnode;
+
+      while (dcnode.jobMonitor.jobsMonitored() < 2 &&
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      this.assertEquals(cnode.jobMonitor.jobsMonitored(), 2);
+      this.assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
 
       start = System.currentTimeMillis();
-      while (cnode.jobMonitor.jobsSucceeded() < 2 &&
+      while (dcnode.jobMonitor.jobsSucceeded() < 2 &&
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      this.assertEquals(cnode.jobMonitor.jobsSucceeded(), 2);
+      this.assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
 
       LOG.info("Test testDistRaid successful.");
       
@@ -636,13 +644,16 @@ public class TestRaidNode extends TestCa
       long start = System.currentTimeMillis();
       final int MAX_WAITTIME = 300000;
 
+      assertTrue("cnode is not DistRaidNode", cnode instanceof DistRaidNode);
+      DistRaidNode dcnode = (DistRaidNode) cnode;
+
       start = System.currentTimeMillis();
-      while (cnode.jobMonitor.jobsSucceeded() < numJobsExpected &&
+      while (dcnode.jobMonitor.jobsSucceeded() < numJobsExpected &&
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      this.assertEquals(cnode.jobMonitor.jobsMonitored(), numJobsExpected);
-      this.assertEquals(cnode.jobMonitor.jobsSucceeded(), numJobsExpected);
+      this.assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
+      this.assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
 
       LOG.info("Test testSuspendTraversal successful.");
 

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
Mon Nov  8 08:47:21 2010
@@ -90,7 +90,11 @@ public class TestRaidPurge extends TestC
     conf.set("fs.shell.delete.classname", "org.apache.hadoop.dfs.DFSClient");
 
     // the RaidNode does the raiding inline (instead of submitting to map/reduce)
-    conf.setBoolean("fs.raidnode.local", local);
+    if (local) {
+      conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
+    } else {
+      conf.set("raid.classname", "org.apache.hadoop.raid.DistRaidNode");
+    }
 
     conf.set("raid.server.address", "localhost:0");
     

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java?rev=1032497&r1=1032496&r2=1032497&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
Mon Nov  8 08:47:21 2010
@@ -100,7 +100,7 @@ public class TestRaidShell extends TestC
     localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
     localConf.setInt("raid.blockfix.interval", 1000);
     // the RaidNode does the raiding inline (instead of submitting to map/reduce)
-    conf.setBoolean("fs.raidnode.local", true);
+    conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
     cnode = RaidNode.createRaidNode(null, localConf);
 
     try {
@@ -220,7 +220,7 @@ public class TestRaidShell extends TestC
     conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
 
     // do not use map-reduce cluster for Raiding
-    conf.setBoolean("fs.raidnode.local", true);
+    conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
     conf.set("raid.server.address", "localhost:0");
     conf.setInt("hdfs.raid.stripeLength", stripeLength);
     conf.set("hdfs.raid.locations", "/destraid");



Mime
View raw message