hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ap...@apache.org
Subject svn commit: r1430555 - in /hama/trunk/core/src: main/java/org/apache/hama/ main/java/org/apache/hama/bsp/ main/java/org/apache/hama/pipes/ test/java/org/apache/hama/bsp/
Date Tue, 08 Jan 2013 21:45:43 GMT
Author: apurv
Date: Tue Jan  8 21:45:42 2013
New Revision: 1430555

URL: http://svn.apache.org/viewvc?rev=1430555&view=rev
Log:
fixed partitioning

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Tue Jan  8 21:45:42 2013
@@ -60,25 +60,27 @@ public interface Constants {
   public static final String UTF8_ENCODING = "UTF-8";
 
   public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
-  
+
   public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
 
   public static final String MAX_TASKS_PER_JOB = "bsp.max.tasks.per.job";
   
+  public static final String COMBINER_CLASS = "bsp.combiner.class";
+
   public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
 
   ////////////////////////////////////////
   // Task scheduler related constants
   // //////////////////////////////////////
-  
+
   public static final String TASK_ALLOCATOR_CLASS = "bsp.taskalloc.class";
-  
+
   // //////////////////////////////////////
   // Fault tolerance related constants
   // //////////////////////////////////////
 
   public static final String FAULT_TOLERANCE_FLAG = "bsp.ft.enabled";
-  
+
   public static final String FAULT_TOLERANCE_CLASS = "bsp.ft.class";
 
   // //////////////////////////////////////
@@ -92,6 +94,25 @@ public interface Constants {
   // By default checkpointing when enabled would checkpoint on every superstep
   public static final short DEFAULT_CHECKPOINT_INTERVAL = 1;
 
+  // /////////////////////////////////////////////
+  // Job configuration related parameters.
+  // /////////////////////////////////////////////
+  public static final String JOB_INPUT_DIR      = "bsp.input.dir";
+  public static final String JOB_PEERS_COUNT    = "bsp.peers.num";
+  public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class"; 
+  public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";
+  
+
+  // /////////////////////////////////////////////
+  // Constants related to partitioning
+  // /////////////////////////////////////////////
+  public static final String RUNTIME_PARTITIONING_DIR = "bsp.partitioning.dir";
+  public static final String ENABLE_RUNTIME_PARTITIONING = "bsp.input.runtime.partitioning";
+  public static final String RUNTIME_PARTITIONING_CLASS = "bsp.input.partitioner.class";
+  public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
+  public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
+
+
   // /////////////////////////////////////
   // Constants for ZooKeeper
   // /////////////////////////////////////

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Jan  8 21:45:42 2013
@@ -47,7 +47,6 @@ public class BSPJob extends BSPJobContex
   private JobState state = JobState.DEFINE;
   private BSPJobClient jobClient;
   private RunningJob info;
-  private boolean isPartitioned = false; //TODO: If input is already partitioned init to true
 
   public BSPJob() throws IOException {
     this(new HamaConfiguration());
@@ -130,13 +129,13 @@ public class BSPJob extends BSPJobContex
 
   public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
     ensureState(JobState.DEFINE);
-    conf.setClass(COMBINER_CLASS_ATTR, cls, Combiner.class);
+    conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
   }
 
   @SuppressWarnings("unchecked")
   public Class<? extends Combiner<? extends Writable>> getCombinerClass() {
     return (Class<? extends Combiner<? extends Writable>>) conf.getClass(
-        COMBINER_CLASS_ATTR, Combiner.class);
+        Constants.COMBINER_CLASS, Combiner.class);
   }
 
   public void setJar(String jar) {
@@ -223,44 +222,6 @@ public class BSPJob extends BSPJobContex
 
   public boolean waitForCompletion(boolean verbose) throws IOException,
       InterruptedException, ClassNotFoundException {
-    if (this.getConfiguration().get("bsp.input.partitioner.class") != null
-        && !isPartitioned) {
-      FileSystem fs = FileSystem.get(conf);
-      Path inputDir = new Path(conf.get("bsp.input.dir"));
-      if (fs.isFile(inputDir)) {
-        inputDir = inputDir.getParent();
-      }
-      Path partitionDir = new Path(inputDir + "/partitions");
-
-      if (fs.exists(partitionDir)) {
-        fs.delete(partitionDir, true);
-      }
-
-      HamaConfiguration conf = new HamaConfiguration();
-      conf.setInt("desired.num.of.tasks",
-          Integer.parseInt(this.getConfiguration().get("bsp.peers.num")));
-      if(this.getConfiguration().get("bsp.partitioning.dir") != null) {
-        conf.set("bsp.partitioning.dir", this.getConfiguration().get("bsp.partitioning.dir"));
-       }
-      BSPJob partitioningJob = new BSPJob(conf);
-      partitioningJob.setInputPath(new Path(this.getConfiguration().get(
-          "bsp.input.dir")));
-      partitioningJob.setInputFormat(this.getInputFormat().getClass());
-      partitioningJob.setInputKeyClass(this.getInputKeyClass());
-      partitioningJob.setInputValueClass(getInputValueClass());
-      partitioningJob.setOutputFormat(NullOutputFormat.class);
-      partitioningJob.setBspClass(PartitioningRunner.class);
-
-      isPartitioned = partitioningJob.waitForCompletion(true);
-      if (isPartitioned) {
-        if(conf.get("bsp.partitioning.dir") != null) {
-          this.setInputPath(new Path(conf.get("bsp.partitioning.dir")));
-        } else {
-          this.setInputPath(new Path(inputDir + "/partitions"));
-         }
-      }
-    }
-
     if (state == JobState.DEFINE) {
       submit();
     }
@@ -292,13 +253,13 @@ public class BSPJob extends BSPJobContex
 
   @SuppressWarnings({ "rawtypes" })
   public InputFormat getInputFormat() {
-    return ReflectionUtils.newInstance(conf.getClass("bsp.input.format.class",
+    return ReflectionUtils.newInstance(conf.getClass(Constants.INPUT_FORMAT_CLASS,
         TextInputFormat.class, InputFormat.class), conf);
   }
 
   @SuppressWarnings({ "rawtypes" })
   public void setInputFormat(Class<? extends InputFormat> cls) {
-    conf.setClass("bsp.input.format.class", cls, InputFormat.class);
+    conf.setClass(Constants.INPUT_FORMAT_CLASS, cls, InputFormat.class);
   }
 
   /**
@@ -406,7 +367,7 @@ public class BSPJob extends BSPJobContex
    */
   @SuppressWarnings("rawtypes")
   public void setOutputFormat(Class<? extends OutputFormat> theClass) {
-    conf.setClass("bsp.output.format.class", theClass, OutputFormat.class);
+    conf.setClass(Constants.OUTPUT_FORMAT_CLASS, theClass, OutputFormat.class);
   }
 
   /**
@@ -414,19 +375,18 @@ public class BSPJob extends BSPJobContex
    */
   @SuppressWarnings("rawtypes")
   public void setPartitioner(Class<? extends Partitioner> theClass) {
-    conf.setClass("bsp.input.partitioner.class", theClass, Partitioner.class);
+    conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS, theClass, Partitioner.class);
   }
 
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
-    return ReflectionUtils.newInstance(conf
-        .getClass("bsp.input.partitioner.class", HashPartitioner.class,
-            Partitioner.class), conf);
+    return ReflectionUtils.newInstance(
+        conf.getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,Partitioner.class), conf);
   }
 
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
-    return ReflectionUtils.newInstance(conf.getClass("bsp.output.format.class",
+    return ReflectionUtils.newInstance(conf.getClass(Constants.OUTPUT_FORMAT_CLASS,
         TextOutputFormat.class, OutputFormat.class), conf);
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Jan  8 21:45:42 2013
@@ -288,7 +288,7 @@ public class BSPJobClient extends Config
    * @throws IOException
    */
   public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
-      IOException {
+  IOException {
     return submitJobInternal(job, jobSubmitClient.getNewJobId());
   }
 
@@ -323,6 +323,10 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+
+      job = partition(job, maxTasks);
+      maxTasks = job.getInt("hama.partition.count", maxTasks);
+
       job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -364,6 +368,79 @@ public class BSPJobClient extends Config
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
+
+  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+    
+    if(job.get("bsp.partitioning.runner.job") != null){return job;}//Early exit for the partitioner job.
+    
+    InputSplit[] splits = job.getInputFormat().getSplits(job,
+        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+            : maxTasks);
+
+    String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
+    Path inputDir = new Path(inputPath);
+    if (fs.isFile(inputDir)) {
+      inputDir = inputDir.getParent();
+    }
+
+    if (inputPath != null) {
+      int numSplits = splits.length;
+      int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
+
+      if ((numTasks > 0 && numTasks != numSplits)
+          || (job.getConfiguration().getBoolean(
+              Constants.ENABLE_RUNTIME_PARTITIONING, false) && job
+              .getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS) != null)) {
+
+        Path partitionDir = new Path(inputDir + "/partitions");
+
+        if (fs.exists(partitionDir)) {
+          fs.delete(partitionDir, true);
+        }
+
+        HamaConfiguration conf = new HamaConfiguration();
+        conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT,
+            Integer.parseInt(job.getConfiguration().get("bsp.peers.num")));
+        if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+          conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
+              .get(Constants.RUNTIME_PARTITIONING_DIR));
+        }
+        conf.set(Constants.RUNTIME_PARTITIONING_CLASS, job.get(Constants.RUNTIME_PARTITIONING_CLASS));
+        BSPJob partitioningJob = new BSPJob(conf);
+        partitioningJob.setInputPath(new Path(job.getConfiguration().get(
+            Constants.JOB_INPUT_DIR)));
+        partitioningJob.setInputFormat(job.getInputFormat().getClass());
+        partitioningJob.setInputKeyClass(job.getInputKeyClass());
+        partitioningJob.setInputValueClass(job.getInputValueClass());
+        partitioningJob.setOutputFormat(NullOutputFormat.class);
+        partitioningJob.setBspClass(PartitioningRunner.class);
+        partitioningJob.set("bsp.partitioning.runner.job", "true");
+
+        boolean isPartitioned = false;
+        try {
+          isPartitioned = partitioningJob.waitForCompletion(true);
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted partitioning run-time.", e);
+        } catch (ClassNotFoundException e) {
+          LOG.error("Class not found error partitioning run-time.", e);
+        }
+        if (isPartitioned) {
+          if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
+            job.setInputPath(new Path(conf
+                .get(Constants.RUNTIME_PARTITIONING_DIR)));
+          } else {
+            job.setInputPath(new Path(inputDir + "/partitions"));
+          }
+        } else {
+          LOG.error("Error partitioning the input path.");
+          throw new IOException("Runtime partition failed for the job.");
+        }
+      }
+    }
+    return job;
+  }
+
+
   protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
     int maxTasks;
     ClusterStatus clusterStatus = getClusterStatus(true);
@@ -551,9 +628,9 @@ public class BSPJobClient extends Config
     if (job.isSuccessful()) {
       LOG.info("The total number of supersteps: " + info.getSuperstepCount());
       info.getStatus()
-          .getCounter()
-          .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
-              info.getSuperstepCount());
+      .getCounter()
+      .incrCounter(JobInProgress.JobCounter.SUPERSTEPS,
+          info.getSuperstepCount());
       info.getStatus().getCounter().log(LOG);
     } else {
       LOG.info("Job failed.");
@@ -615,7 +692,7 @@ public class BSPJobClient extends Config
   }
 
   public static void runJob(BSPJob job) throws FileNotFoundException,
-      IOException {
+  IOException {
     BSPJobClient jc = new BSPJobClient(job.getConfiguration());
 
     if (job.getNumBspTask() == 0

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Tue Jan  8 21:45:42 2013
@@ -33,9 +33,6 @@ public class BSPJobContext {
   // Put all of the attribute names in here so that BSPJob and JobContext are
   // consistent.
   protected static final String WORK_CLASS_ATTR = "bsp.work.class";
-  protected static final String COMBINER_CLASS_ATTR = "bsp.combiner.class";
-  protected static final String INPUT_FORMAT_CLASS_ATTR = "bsp.inputformat.class";
-  protected static final String OUTPUT_FORMAT_CLASS_ATTR = "bsp.outputformat.class";
   protected static final String WORKING_DIR = "bsp.working.dir";
 
   protected final Configuration conf;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Jan  8 21:45:42 2013
@@ -183,7 +183,7 @@ public final class BSPPeerImpl<K1, V1, K
       LOG.debug("Initialized Messaging service.");
     }
 
-    final String combinerName = conf.get("bsp.combiner.class");
+    final String combinerName = conf.get(Constants.COMBINER_CLASS);
     if (combinerName != null) {
       combiner = (Combiner<M>) ReflectionUtils.newInstance(
           conf.getClassByName(combinerName), conf);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Tue Jan  8 21:45:42 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
 
@@ -139,7 +140,7 @@ public class PartitioningRunner extends
   @SuppressWarnings("rawtypes")
   public Partitioner getPartitioner() {
     return ReflectionUtils.newInstance(conf
-        .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+        .getClass(Constants.RUNTIME_PARTITIONING_CLASS, HashPartitioner.class,
             Partitioner.class), conf);
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/pipes/BinaryProtocol.java Tue Jan  8 21:45:42 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
@@ -297,8 +298,8 @@ public class BinaryProtocol<K1 extends W
     }
 
     public void readKeyValue() throws IOException {
-      boolean nullinput = peer.getConfiguration().get("bsp.input.format.class") == null
-          || peer.getConfiguration().get("bsp.input.format.class")
+      boolean nullinput = peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS) == null
+          || peer.getConfiguration().get(Constants.INPUT_FORMAT_CLASS)
               .equals("org.apache.hama.bsp.NullInputFormat");
 
       if (!nullinput) {

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1430555&r1=1430554&r2=1430555&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Tue Jan  8 21:45:42 2013
@@ -43,6 +43,7 @@ public class TestPartitioning extends Te
     Configuration conf = new Configuration();
     conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
     conf.set("bsp.partitioning.dir", "/tmp/hama-test/partitioning/localtest");
+    conf.setBoolean("bsp.input.runtime.partitioning", true);
     BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
     bsp.setJobName("Test partitioning with input");
     bsp.setBspClass(PartionedBSP.class);



Mime
View raw message