incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1195959 [1/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/testjar/ examples/src/main/java/org/apache/hama/exampl...
Date Tue, 01 Nov 2011 12:34:15 GMT
Author: edwardyoon
Date: Tue Nov  1 12:34:14 2011
New Revision: 1195959

URL: http://svn.apache.org/viewvc?rev=1195959&view=rev
Log:
Add input output system.

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
    incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Nov  1 12:34:14 2011
@@ -4,6 +4,7 @@ Release 0.4 - Unreleased
 
   NEW FEATURES
   
+   HAMA-258: Add Input Output system (edwardyoon)
    HAMA-456: Add Message Combiner (edwardyoon)
    HAMA-456: Add getPeerName(int index) and getNumPeers() (edwardyoon)
    HAMA-431: MapReduce NG integration (tjungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Tue Nov  1 12:34:14 2011
@@ -25,43 +25,37 @@ import org.apache.zookeeper.KeeperExcept
 /**
  * This class provides an abstract implementation of the BSP interface.
  */
-public abstract class BSP implements BSPInterface {
+public abstract class BSP<K1, V1, K2, V2> implements BSPInterface<K1, V1, K2, V2> {
 
   protected Configuration conf;
 
   /**
-   * This method is called before the BSP method. It can be used for setup
-   * purposes.
+   * This method is your computation method, the main work of your BSP should be
+   * done here.
    * 
    * @param peer Your BSPPeer instance.
    */
-  public void setup(BSPPeer peer) throws IOException, KeeperException,
-      InterruptedException {
-
-  }
+  public abstract void bsp(BSPPeer peer, RecordReader<K1, V1> input,
+      OutputCollector<K2, V2> output) throws IOException, KeeperException,
+      InterruptedException;
 
   /**
-   * This method is called after the BSP method. It can be used for cleanup
-   * purposes. Cleanup is guranteed to be called after the BSP runs, even in
-   * case of exceptions.
+   * This method is called before the BSP method. It can be used for setup
+   * purposes.
    * 
    * @param peer Your BSPPeer instance.
    */
-  public void cleanup(BSPPeer peer) {
-
-  }
+  public abstract void setup(BSPPeer peer) throws IOException, KeeperException,
+      InterruptedException;
 
   /**
-   * This method is your computation method, the main work of your BSP should be
-   * done here.
+   * This method is called after the BSP method. It can be used for cleanup
+   * purposes. Cleanup is guaranteed to be called after the BSP runs, even in
+   * case of exceptions.
    * 
    * @param peer Your BSPPeer instance.
    */
-  @Override
-  public void bsp(BSPPeer peer) throws IOException, KeeperException,
-      InterruptedException {
-
-  }
+  public abstract void cleanup(BSPPeer peer);
 
   /**
    * Returns the configuration of this BSP Job.
@@ -85,5 +79,4 @@ public abstract class BSP implements BSP
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
-
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java Tue Nov  1 12:34:14 2011
@@ -26,7 +26,7 @@ import org.apache.zookeeper.KeeperExcept
  * Interface BSP defines the basic operations needed to implement the BSP
  * algorithm.
  */
-public interface BSPInterface extends Configurable {
+public interface BSPInterface<K1, V1, K2, V2> extends Configurable {
 
   /**
    * A user defined function for programming in the BSP style.
@@ -39,6 +39,7 @@ public interface BSPInterface extends Co
    * @throws KeeperException
    * @throws InterruptedException
    */
-  public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+  public void bsp(BSPPeer bspPeer, RecordReader<K1, V1> input,
+      OutputCollector<K2, V2> output) throws IOException, KeeperException,
       InterruptedException;
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Nov  1 12:34:14 2011
@@ -23,6 +23,9 @@ import java.net.URLDecoder;
 import java.util.Enumeration;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.HamaConfiguration;
 
 /**
@@ -48,11 +51,11 @@ public class BSPJob extends BSPJobContex
     super(conf, null);
     jobClient = new BSPJobClient(conf);
   }
-  
+
   public BSPJob(HamaConfiguration conf, BSPJobID jobID) throws IOException {
     super(conf, jobID);
   }
-  
+
   public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
     this(conf);
     setJobName(jobName);
@@ -95,6 +98,7 @@ public class BSPJob extends BSPJobContex
    * @param cls
    * @throws IllegalStateException
    */
+  @SuppressWarnings("unchecked")
   public void setBspClass(Class<? extends BSP> cls)
       throws IllegalStateException {
     ensureState(JobState.DEFINE);
@@ -113,9 +117,10 @@ public class BSPJob extends BSPJobContex
 
   @SuppressWarnings("unchecked")
   public Class<? extends Combiner> getCombinerClass() {
-    return (Class<? extends Combiner>) conf.getClass(COMBINER_CLASS_ATTR, Combiner.class);
+    return (Class<? extends Combiner>) conf.getClass(COMBINER_CLASS_ATTR,
+        Combiner.class);
   }
-  
+
   public void setJar(String jar) {
     conf.set("bsp.jar", jar);
   }
@@ -155,10 +160,6 @@ public class BSPJob extends BSPJobContex
     conf.set("bsp.job.name", name);
   }
 
-  public void setInputPath(HamaConfiguration conf, Path iNPUTPATH) {
-
-  }
-
   public void setUser(String user) {
     conf.set("user.name", user);
   }
@@ -214,9 +215,9 @@ public class BSPJob extends BSPJobContex
     }
     return isSuccessful();
   }
-  
+
   // for the testcase
-  BSPJobClient getJobClient(){
+  BSPJobClient getJobClient() {
     return jobClient;
   }
 
@@ -231,4 +232,64 @@ public class BSPJob extends BSPJobContex
   public int getNumBspTask() {
     return conf.getInt("bsp.peers.num", 0);
   }
+
+  @SuppressWarnings("unchecked")
+  public InputFormat getInputFormat() {
+    return (InputFormat) ReflectionUtils.newInstance(conf.getClass(
+        "bsp.input.format.class", TextInputFormat.class, InputFormat.class), conf);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void setInputFormat(Class<? extends InputFormat> cls) {
+    conf.setClass("bsp.input.format.class", cls, InputFormat.class);
+  }
+
+  /**
+   * Get the key class for the job output data.
+   * 
+   * @return the key class for the job output data.
+   */
+  public Class<?> getOutputKeyClass() {
+    return conf.getClass("bsp.output.key.class", LongWritable.class,
+        Object.class);
+  }
+
+  /**
+   * Set the key class for the job output data.
+   * 
+   * @param theClass the key class for the job output data.
+   */
+  public void setOutputKeyClass(Class<?> theClass) {
+    conf.setClass("bsp.output.key.class", theClass, Object.class);
+  }
+
+  /**
+   * Get the value class for job outputs.
+   * 
+   * @return the value class for job outputs.
+   */
+  public Class<?> getOutputValueClass() {
+    return conf.getClass("bsp.output.value.class", Text.class, Object.class);
+  }
+
+  /**
+   * Set the value class for job outputs.
+   * 
+   * @param theClass the value class for job outputs.
+   */
+  public void setOutputValueClass(Class<?> theClass) {
+    conf.setClass("bsp.output.value.class", theClass, Object.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void setOutputFormat(Class<? extends OutputFormat> theClass) {
+    conf.setClass("bsp.output.format.class", theClass, OutputFormat.class);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public OutputFormat getOutputFormat() {
+    return (OutputFormat) ReflectionUtils.newInstance(conf.getClass(
+        "bsp.output.format.class", TextOutputFormat.class, OutputFormat.class),
+        conf);
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Nov  1 12:34:14 2011
@@ -17,8 +17,13 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Random;
 
@@ -32,6 +37,11 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -200,8 +210,8 @@ public class BSPJobClient extends Config
     if (masterAdress != null && !masterAdress.equals("local")) {
       this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
           JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
-          BSPMaster.getAddress(conf), conf,
-          NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+          BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
+              JobSubmissionProtocol.class));
     } else {
       LOG.debug("Using local BSP runner.");
       this.jobSubmitClient = new LocalBSPRunner(conf);
@@ -283,6 +293,7 @@ public class BSPJobClient extends Config
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
+    Path submitSplitFile = new Path(submitJobDir, "job.split");
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitJobFile = new Path(submitJobDir, "job.xml");
     LOG.debug("BSPJobClient.submitJobDir: " + submitJobDir);
@@ -298,15 +309,12 @@ public class BSPJobClient extends Config
     // check the number of BSP tasks
     int tasks = job.getNumBspTask();
     int maxTasks = clusterStatus.getMaxTasks();
-
     if (tasks <= 0 || tasks > maxTasks) {
-      LOG.info("The number of tasks you've entered was invalid. Using default value of "
-          + maxTasks + "!");
       job.setNumBspTask(maxTasks);
     }
 
-    // Create a number of filenames in the BSPMaster's fs namespace
     FileSystem fs = getFs();
+    // Create a number of filenames in the BSPMaster's fs namespace
     fs.delete(submitJobDir, true);
     submitJobDir = fs.makeQualified(submitJobDir);
     submitJobDir = new Path(submitJobDir.toUri().getPath());
@@ -315,6 +323,11 @@ public class BSPJobClient extends Config
     fs.mkdirs(submitJobDir);
     short replication = (short) job.getInt("bsp.submit.replication", 10);
 
+    // Create the splits for the job
+    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+    job.setNumBspTask(writeSplits(job, submitSplitFile));
+    job.set("bsp.job.split.file", submitSplitFile.toString());
+
     String originalJarPath = job.getJar();
 
     if (originalJarPath != null) { // copy jar to BSPMaster's fs
@@ -354,8 +367,8 @@ public class BSPJobClient extends Config
     //
     // Now, actually submit the job (using the submit name)
     //
-    JobStatus status = jobSubmitClient.submitJob(jobId,
-        submitJobFile.makeQualified(fs).toString());
+    JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile
+        .makeQualified(fs).toString());
     if (status != null) {
       return new NetworkedJob(status);
     } else {
@@ -363,6 +376,90 @@ public class BSPJobClient extends Config
     }
   }
 
+  private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
+    InputSplit[] splits = job.getInputFormat().getSplits(job,
+        job.getNumBspTask());
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(splits, new Comparator<InputSplit>() {
+      public int compare(InputSplit a, InputSplit b) {
+        try {
+          long left = a.getLength();
+          long right = b.getLength();
+          if (left == right) {
+            return 0;
+          } else if (left < right) {
+            return 1;
+          } else {
+            return -1;
+          }
+        } catch (IOException ie) {
+          throw new RuntimeException("Problem getting input split size", ie);
+        }
+      }
+    });
+    DataOutputStream out = writeSplitsFileHeader(job.getConf(),
+        submitSplitFile, splits.length);
+
+    try {
+      DataOutputBuffer buffer = new DataOutputBuffer();
+      RawSplit rawSplit = new RawSplit();
+      for (InputSplit split : splits) {
+        rawSplit.setClassName(split.getClass().getName());
+        buffer.reset();
+        split.write(buffer);
+        rawSplit.setDataLength(split.getLength());
+        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+        rawSplit.setLocations(split.getLocations());
+        rawSplit.write(out);
+      }
+    } finally {
+      out.close();
+    }
+    return splits.length;
+  }
+
+  private static final int CURRENT_SPLIT_FILE_VERSION = 0;
+  private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
+
+  private DataOutputStream writeSplitsFileHeader(Configuration conf,
+      Path filename, int length) throws IOException {
+    // write the splits to a file for the bsp master
+    FileSystem fs = filename.getFileSystem(conf);
+    FSDataOutputStream out = FileSystem.create(fs, filename, new FsPermission(
+        JOB_FILE_PERMISSION));
+    out.write(SPLIT_FILE_HEADER);
+    WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
+    WritableUtils.writeVInt(out, length);
+    return out;
+  }
+
+  /**
+   * Read a splits file into a list of raw splits
+   * 
+   * @param in the stream to read from
+   * @return the complete list of splits
+   * @throws IOException
+   */
+  static RawSplit[] readSplitFile(DataInput in) throws IOException {
+    byte[] header = new byte[SPLIT_FILE_HEADER.length];
+    in.readFully(header);
+    if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
+      throw new IOException("Invalid header on split file");
+    }
+    int vers = WritableUtils.readVInt(in);
+    if (vers != CURRENT_SPLIT_FILE_VERSION) {
+      throw new IOException("Unsupported split version " + vers);
+    }
+    int len = WritableUtils.readVInt(in);
+    RawSplit[] result = new RawSplit[len];
+    for (int i = 0; i < len; ++i) {
+      result[i] = new RawSplit();
+      result[i].readFields(in);
+    }
+    return result;
+  }
+
   /**
    * Monitor a job and print status in real-time as progress is made and tasks
    * fail.
@@ -583,9 +680,8 @@ public class BSPJobClient extends Config
         System.out.println("Job name: " + job.getJobName());
         System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
             + "\tFailed : 3\tPrep : 4\n");
-        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(),
-            jobStatus.getRunState(), jobStatus.getStartTime(),
-            jobStatus.getUsername());
+        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), jobStatus
+            .getRunState(), jobStatus.getStartTime(), jobStatus.getUsername());
 
         exitCode = 0;
       }
@@ -674,10 +770,76 @@ public class BSPJobClient extends Config
     }
   }
 
+  static class RawSplit implements Writable {
+    private String splitClass;
+    private BytesWritable bytes = new BytesWritable();
+    private String[] locations;
+    long dataLength;
+
+    public void setBytes(byte[] data, int offset, int length) {
+      bytes.set(data, offset, length);
+    }
+
+    public void setClassName(String className) {
+      splitClass = className;
+    }
+
+    public String getClassName() {
+      return splitClass;
+    }
+
+    public BytesWritable getBytes() {
+      return bytes;
+    }
+
+    public void clearBytes() {
+      bytes = null;
+    }
+
+    public void setLocations(String[] locations) {
+      this.locations = locations;
+    }
+
+    public String[] getLocations() {
+      return locations;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      splitClass = Text.readString(in);
+      dataLength = in.readLong();
+      bytes.readFields(in);
+      int len = WritableUtils.readVInt(in);
+      locations = new String[len];
+      for (int i = 0; i < len; ++i) {
+        locations[i] = Text.readString(in);
+      }
+    }
+
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, splitClass);
+      out.writeLong(dataLength);
+      bytes.write(out);
+      WritableUtils.writeVInt(out, locations.length);
+      for (int i = 0; i < locations.length; i++) {
+        Text.writeString(out, locations[i]);
+      }
+    }
+
+    public long getDataLength() {
+      return dataLength;
+    }
+
+    public void setDataLength(long l) {
+      dataLength = l;
+    }
+
+  }
+
   /**
    */
   public static void main(String[] args) throws Exception {
     int res = ToolRunner.run(new BSPJobClient(), args);
     System.exit(res);
   }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Tue Nov  1 12:34:14 2011
@@ -56,7 +56,7 @@ public class BSPJobContext {
     return jobId;
   }
 
-  public Path getWorkingDirectory() throws IOException {
+  public Path getWorkingDirectory() {
     String name = conf.get(WORKING_DIR);
 
     if (name != null) {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Tue Nov  1 12:34:14 2011
@@ -17,10 +17,16 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.zookeeper.KeeperException;
@@ -33,16 +39,21 @@ public class BSPTask extends Task {
   public static final Log LOG = LogFactory.getLog(BSPTask.class);
 
   private BSPJob conf;
+  private BytesWritable split = new BytesWritable();
+  private String splitClass;
 
   public BSPTask() {
   }
 
   public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid,
-      int partition) {
+      int partition, String splitClass, BytesWritable split) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskid;
     this.partition = partition;
+
+    this.splitClass = splitClass;
+    this.split = split;
   }
 
   @Override
@@ -54,12 +65,62 @@ public class BSPTask extends Task {
   public void run(BSPJob job, BSPPeerImpl bspPeer, BSPPeerProtocol umbilical)
       throws IOException {
 
-    BSP bsp = (BSP) ReflectionUtils.newInstance(
-        job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+    try {
+      runBSP(job, bspPeer, split, umbilical);
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (ClassNotFoundException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+    done(umbilical);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <INK, INV, OUTK, OUTV> void runBSP(final BSPJob job, BSPPeerImpl bspPeer,
+      final BytesWritable rawSplit, final BSPPeerProtocol umbilical)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    InputSplit inputSplit = null;
+    // reinstantiate the split
+    try {
+      inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getConf()
+          .getClassByName(splitClass), job.getConf());
+    } catch (ClassNotFoundException exp) {
+      IOException wrap = new IOException("Split class " + splitClass
+          + " not found");
+      wrap.initCause(exp);
+      throw wrap;
+    }
+
+    DataInputBuffer splitBuffer = new DataInputBuffer();
+    splitBuffer.reset(split.getBytes(), 0, split.getLength());
+    inputSplit.readFields(splitBuffer);
+
+    RecordReader<INK, INV> in = job.getInputFormat().getRecordReader(
+        inputSplit, job);
+    
+    FileSystem fs = FileSystem.get(job.getConf());
+    String finalName = getOutputName(getPartition());
+    
+    final RecordWriter<OUTK, OUTV> out = 
+      job.getOutputFormat().getRecordWriter(fs, job, finalName);
+    
+    OutputCollector<OUTK,OUTV> collector = 
+      new OutputCollector<OUTK,OUTV>() {
+        public void collect(OUTK key, OUTV value)
+          throws IOException {
+          out.write(key, value);
+        }
+      };
+      
+    BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass(
+        "bsp.work.class", BSP.class), job.getConf());
 
     try {
       bsp.setup(bspPeer);
-      bsp.bsp(bspPeer);
+      bsp.bsp(bspPeer, in, collector);
     } catch (IOException e) {
       LOG.error("Exception during BSP execution!", e);
     } catch (KeeperException e) {
@@ -68,9 +129,9 @@ public class BSPTask extends Task {
       LOG.error("Exception during BSP execution!", e);
     } finally {
       bsp.cleanup(bspPeer);
+      out.close();
     }
 
-    done(umbilical);
   }
 
   public BSPJob getConf() {
@@ -81,4 +142,19 @@ public class BSPTask extends Task {
     this.conf = conf;
   }
 
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, splitClass);
+    split.write(out);
+    split = null;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    splitClass = Text.readString(in);
+    split.readFields(in);
+  }
+
 }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,557 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
+  public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+
+  private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+  private long minSplitSize = 1;
+  private static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  protected void setMinSplitSize(long minSplitSize) {
+    this.minSplitSize = minSplitSize;
+  }
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by the listPaths() to apply the built-in
+   * hiddenFileFilter together with a user provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * @param fs the file system that the file is on
+   * @param filename the file name to check
+   * @return is this file splitable?
+   */
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    return true;
+  }
+
+  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
+      BSPJob job) throws IOException;
+
+  /**
+   * Set a PathFilter to be applied to the input paths for the BSP job.
+   * 
+   * @param filter the PathFilter class to use for filtering the input paths.
+   */
+  public static void setInputPathFilter(BSPJob conf,
+      Class<? extends PathFilter> filter) {
+    conf.getConf().setClass("bsp.input.pathFilter.class", filter,
+        PathFilter.class);
+  }
+
+  /**
+   * Get a PathFilter instance of the filter set for the input paths.
+   * 
+   * @return the PathFilter instance set for the job, NULL if none has been set.
+   */
+  public static PathFilter getInputPathFilter(BSPJob conf) {
+    Class<? extends PathFilter> filterClass = conf.getConf().getClass(
+        "bsp.input.pathFilter.class", null, PathFilter.class);
+    return (filterClass != null) ? ReflectionUtils.newInstance(filterClass,
+        conf.getConf()) : null;
+  }
+
+  /**
+   * List input directories. Subclasses may override to, e.g., select only files
+   * matching a regular expression.
+   * 
+   * @param job the job to list input paths for
+   * @return array of FileStatus objects
+   * @throws IOException if no input paths are set or a path matches no files.
+   */
+  protected FileStatus[] listStatus(BSPJob job) throws IOException {
+    Path[] dirs = getInputPaths(job);
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the hiddenFileFilter and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(hiddenFileFilter);
+    PathFilter jobFilter = getInputPathFilter(job);
+    if (jobFilter != null) {
+      filters.add(jobFilter);
+    }
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (Path p : dirs) {
+      FileSystem fs = p.getFileSystem(job.getConf());
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat : matches) {
+          if (globStat.isDir()) {
+            for (FileStatus stat : fs.listStatus(globStat.getPath(),
+                inputFilter)) {
+              result.add(stat);
+            }
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result.toArray(new FileStatus[result.size()]);
+  }
+
+  /**
+   * Splits files returned by {@link #listStatus(BSPJob)} when they're too big.
+   */
+  public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
+    FileStatus[] files = listStatus(job);
+
+    long totalSize = 0; // compute total size
+    for (FileStatus file : files) { // check we have valid files
+      if (file.isDir()) {
+        throw new IOException("Not a file: " + file.getPath());
+      }
+      totalSize += file.getLen();
+    }
+
+    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
+    long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
+        minSplitSize);
+
+    // generate splits
+    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+    NetworkTopology clusterMap = new NetworkTopology();
+    for (FileStatus file : files) {
+      Path path = file.getPath();
+      FileSystem fs = path.getFileSystem(job.getConf());
+      long length = file.getLen();
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+      if ((length != 0) && isSplitable(fs, path)) {
+        long blockSize = file.getBlockSize();
+        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+          String[] splitHosts = getSplitHosts(blkLocations, length
+              - bytesRemaining, splitSize, clusterMap);
+          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+              splitHosts));
+          bytesRemaining -= splitSize;
+        }
+
+        if (bytesRemaining != 0) {
+          splits
+              .add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
+                  blkLocations[blkLocations.length - 1].getHosts()));
+        }
+      } else if (length != 0) {
+        String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
+        splits.add(new FileSplit(path, 0, length, splitHosts));
+      } else {
+        // Create empty hosts array for zero length files
+        splits.add(new FileSplit(path, 0, length, new String[0]));
+      }
+    }
+    LOG.info("Total # of splits: " + splits.size());
+    return splits.toArray(new FileSplit[splits.size()]);
+  }
+
+  protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
+    return Math.max(minSize, Math.min(goalSize, blockSize));
+  }
+
+  protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+    for (int i = 0; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset)
+          && (offset < blkLocations[i].getOffset()
+              + blkLocations[i].getLength())) {
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset
+        + " is outside of file (0.." + fileLength + ")");
+  }
+
+  /**
+   * Sets the given comma separated paths as the list of inputs for the
+   * BSP job.
+   * 
+   * @param conf Configuration of the job
+   * @param commaSeparatedPaths Comma separated paths to be set as the list of
+   *          inputs for the BSP job.
+   */
+  public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) {
+    setInputPaths(conf, StringUtils
+        .stringToPath(getPathStrings(commaSeparatedPaths)));
+  }
+
+  /**
+   * Add the given comma separated paths to the list of inputs for the
+   * BSP job.
+   * 
+   * @param conf The configuration of the job
+   * @param commaSeparatedPaths Comma separated paths to be added to the list of
+   *          inputs for the BSP job.
+   */
+  public static void addInputPaths(BSPJob conf, String commaSeparatedPaths) {
+    for (String str : getPathStrings(commaSeparatedPaths)) {
+      addInputPath(conf, new Path(str));
+    }
+  }
+
+  /**
+   * Set the array of {@link Path}s as the list of inputs for the BSP
+   * job.
+   * 
+   * @param conf Configuration of the job.
+   * @param inputPaths the {@link Path}s of the input directories/files for the
+   *          BSP job.
+   */
+  public static void setInputPaths(BSPJob conf, Path... inputPaths) {
+    Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
+    StringBuffer str = new StringBuffer(StringUtils.escapeString(path
+        .toString()));
+    for (int i = 1; i < inputPaths.length; i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+    conf.set("bsp.input.dir", str.toString());
+  }
+
+  /**
+   * Add a {@link Path} to the list of inputs for the BSP job.
+   * 
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the
+   *          BSP job.
+   */
+  public static void addInputPath(BSPJob conf, Path path) {
+    path = new Path(conf.getWorkingDirectory(), path);
+    String dirStr = StringUtils.escapeString(path.toString());
+    String dirs = conf.get("bsp.input.dir");
+    conf.set("bsp.input.dir", dirs == null ? dirStr : dirs
+        + StringUtils.COMMA_STR + dirStr);
+  }
+
+  // This method escapes commas in the glob pattern of the given paths.
+  private static String[] getPathStrings(String commaSeparatedPaths) {
+    int length = commaSeparatedPaths.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+
+    for (int i = 0; i < length; i++) {
+      char ch = commaSeparatedPaths.charAt(i);
+      switch (ch) {
+        case '{': {
+          curlyOpen++;
+          if (!globPattern) {
+            globPattern = true;
+          }
+          break;
+        }
+        case '}': {
+          curlyOpen--;
+          if (curlyOpen == 0 && globPattern) {
+            globPattern = false;
+          }
+          break;
+        }
+        case ',': {
+          if (!globPattern) {
+            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+            pathStart = i + 1;
+          }
+          break;
+        }
+      }
+    }
+    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+
+    return pathStrings.toArray(new String[0]);
+  }
+
+  /**
+   * Get the list of input {@link Path}s for the BSP job.
+   * 
+   * @param conf The configuration of the job
+   * @return the list of input {@link Path}s for the BSP job.
+   */
+  public static Path[] getInputPaths(BSPJob conf) {
+    String dirs = conf.getConf().get("bsp.input.dir", "");
+    String[] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
+  private void sortInDescendingOrder(List<NodeInfo> mylist) {
+    Collections.sort(mylist, new Comparator<NodeInfo>() {
+      public int compare(NodeInfo obj1, NodeInfo obj2) {
+
+        if (obj1 == null || obj2 == null)
+          return -1;
+
+        if (obj1.getValue() == obj2.getValue()) {
+          return 0;
+        } else {
+          return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
+        }
+      }
+    });
+  }
+
+  /**
+   * This function identifies and returns the hosts that contribute most for a
+   * given split. For calculating the contribution, rack locality is treated on
+   * par with host locality, so hosts from racks that contribute the most are
+   * preferred over hosts on racks that contribute less
+   * 
+   * @param blkLocations The list of block locations
+   * @param offset
+   * @param splitSize
+   * @return array of hosts that contribute most to this split
+   * @throws IOException
+   */
+  protected String[] getSplitHosts(BlockLocation[] blkLocations, long offset,
+      long splitSize, NetworkTopology clusterMap) throws IOException {
+
+    int startIndex = getBlockIndex(blkLocations, offset);
+
+    long bytesInThisBlock = blkLocations[startIndex].getOffset()
+        + blkLocations[startIndex].getLength() - offset;
+
+    // If the whole split fits inside this block, just return its hosts
+    if (bytesInThisBlock >= splitSize) {
+      return blkLocations[startIndex].getHosts();
+    }
+
+    long bytesInFirstBlock = bytesInThisBlock;
+    int index = startIndex + 1;
+    splitSize -= bytesInThisBlock;
+
+    while (splitSize > 0) {
+      bytesInThisBlock = Math.min(splitSize, blkLocations[index++].getLength());
+      splitSize -= bytesInThisBlock;
+    }
+
+    long bytesInLastBlock = bytesInThisBlock;
+    int endIndex = index - 1;
+
+    Map<Node, NodeInfo> hostsMap = new IdentityHashMap<Node, NodeInfo>();
+    Map<Node, NodeInfo> racksMap = new IdentityHashMap<Node, NodeInfo>();
+    String[] allTopos = new String[0];
+
+    // Build the hierarchy and aggregate the contribution of
+    // bytes at each level. See TestGetSplitHosts.java
+
+    for (index = startIndex; index <= endIndex; index++) {
+
+      // Establish the bytes in this block
+      if (index == startIndex) {
+        bytesInThisBlock = bytesInFirstBlock;
+      } else if (index == endIndex) {
+        bytesInThisBlock = bytesInLastBlock;
+      } else {
+        bytesInThisBlock = blkLocations[index].getLength();
+      }
+
+      allTopos = blkLocations[index].getTopologyPaths();
+
+      // If no topology information is available, just
+      // prefix a fakeRack
+      if (allTopos.length == 0) {
+        allTopos = fakeRacks(blkLocations, index);
+      }
+
+      // NOTE: This code currently works only for one level of
+      // hierarchy (rack/host). However, it is relatively easy
+      // to extend this to support aggregation at different
+      // levels
+
+      for (String topo : allTopos) {
+
+        Node node, parentNode;
+        NodeInfo nodeInfo, parentNodeInfo;
+
+        node = clusterMap.getNode(topo);
+
+        if (node == null) {
+          node = new NodeBase(topo);
+          clusterMap.add(node);
+        }
+
+        nodeInfo = hostsMap.get(node);
+
+        if (nodeInfo == null) {
+          nodeInfo = new NodeInfo(node);
+          hostsMap.put(node, nodeInfo);
+          parentNode = node.getParent();
+          parentNodeInfo = racksMap.get(parentNode);
+          if (parentNodeInfo == null) {
+            parentNodeInfo = new NodeInfo(parentNode);
+            racksMap.put(parentNode, parentNodeInfo);
+          }
+          parentNodeInfo.addLeaf(nodeInfo);
+        } else {
+          nodeInfo = hostsMap.get(node);
+          parentNode = node.getParent();
+          parentNodeInfo = racksMap.get(parentNode);
+        }
+
+        nodeInfo.addValue(index, bytesInThisBlock);
+        parentNodeInfo.addValue(index, bytesInThisBlock);
+
+      } // for all topos
+
+    } // for all indices
+
+    return identifyHosts(allTopos.length, racksMap);
+  }
+
+  private String[] identifyHosts(int replicationFactor,
+      Map<Node, NodeInfo> racksMap) {
+
+    String[] retVal = new String[replicationFactor];
+
+    List<NodeInfo> rackList = new LinkedList<NodeInfo>();
+
+    rackList.addAll(racksMap.values());
+
+    // Sort the racks based on their contribution to this split
+    sortInDescendingOrder(rackList);
+
+    boolean done = false;
+    int index = 0;
+
+    // Get the host list for all our aggregated items, sort
+    // them and return the top entries
+    for (NodeInfo ni : rackList) {
+
+      Set<NodeInfo> hostSet = ni.getLeaves();
+
+      List<NodeInfo> hostList = new LinkedList<NodeInfo>();
+      hostList.addAll(hostSet);
+
+      // Sort the hosts in this rack based on their contribution
+      sortInDescendingOrder(hostList);
+
+      for (NodeInfo host : hostList) {
+        // Strip out the port number from the host name
+        retVal[index++] = host.node.getName().split(":")[0];
+        if (index == replicationFactor) {
+          done = true;
+          break;
+        }
+      }
+
+      if (done == true) {
+        break;
+      }
+    }
+    return retVal;
+  }
+
+  private String[] fakeRacks(BlockLocation[] blkLocations, int index)
+      throws IOException {
+    String[] allHosts = blkLocations[index].getHosts();
+    String[] allTopos = new String[allHosts.length];
+    for (int i = 0; i < allHosts.length; i++) {
+      allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
+    }
+    return allTopos;
+  }
+
+  private static class NodeInfo {
+    final Node node;
+    final Set<Integer> blockIds;
+    final Set<NodeInfo> leaves;
+
+    private long value;
+
+    NodeInfo(Node node) {
+      this.node = node;
+      blockIds = new HashSet<Integer>();
+      leaves = new HashSet<NodeInfo>();
+    }
+
+    long getValue() {
+      return value;
+    }
+
+    void addValue(int blockIndex, long value) {
+      if (blockIds.add(blockIndex) == true) {
+        this.value += value;
+      }
+    }
+
+    Set<NodeInfo> getLeaves() {
+      return leaves;
+    }
+
+    void addLeaf(NodeInfo nodeInfo) {
+      leaves.add(nodeInfo);
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,198 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+
+public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
+
+  /**
+   * Set whether the output of the job is compressed.
+   * 
+   * @param conf the {@link BSPJob} to modify
+   * @param compress should the output of the job be compressed?
+   */
+  public static void setCompressOutput(BSPJob conf, boolean compress) {
+    conf.getConf().setBoolean("bsp.output.compress", compress);
+  }
+
+  /**
+   * Is the job output compressed?
+   * 
+   * @param conf the {@link BSPJob} to look in
+   * @return <code>true</code> if the job output should be compressed,
+   *         <code>false</code> otherwise
+   */
+  public static boolean getCompressOutput(BSPJob conf) {
+    return conf.getConf().getBoolean("bsp.output.compress", false);
+  }
+
+  /**
+   * Set the {@link CompressionCodec} to be used to compress job outputs.
+   * 
+   * @param conf the {@link BSPJob} to modify
+   * @param codecClass the {@link CompressionCodec} to be used to compress the
+   *          job outputs
+   */
+  public static void setOutputCompressorClass(BSPJob conf,
+      Class<? extends CompressionCodec> codecClass) {
+    setCompressOutput(conf, true);
+    conf.getConf().setClass("bsp.output.compression.codec", codecClass,
+        CompressionCodec.class);
+  }
+
+  /**
+   * Get the {@link CompressionCodec} for compressing the job outputs.
+   * 
+   * @param conf the {@link BSPJob} to look in
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} to be used to compress the job outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  public static Class<? extends CompressionCodec> getOutputCompressorClass(
+      BSPJob conf, Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+
+    String name = conf.get("bsp.output.compression.codec");
+    if (name != null) {
+      try {
+        codecClass = conf.getConf().getClassByName(name).asSubclass(
+            CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name
+            + " was not found.", e);
+      }
+    }
+    return codecClass;
+  }
+
+  public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,
+      BSPJob job, String name) throws IOException;
+
+  public void checkOutputSpecs(FileSystem ignored, BSPJob job)
+      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
+    // Ensure that the output directory is set and not already there
+    Path outDir = getOutputPath(job);
+    if (outDir == null && job.getNumBspTask() != 0) {
+      throw new InvalidJobConfException("Output directory not set in JobConf.");
+    }
+    if (outDir != null) {
+      FileSystem fs = outDir.getFileSystem(job.getConf());
+      // normalize the output directory
+      outDir = fs.makeQualified(outDir);
+      setOutputPath(job, outDir);
+      // check its existence
+      if (fs.exists(outDir)) {
+        throw new FileAlreadyExistsException("Output directory " + outDir
+            + " already exists");
+      }
+    }
+  }
+
+  /**
+   * Set the {@link Path} of the output directory for the BSP job.
+   * 
+   * @param conf The configuration of the job.
+   * @param outputDir the {@link Path} of the output directory for the
+   *          BSP job.
+   */
+  public static void setOutputPath(BSPJob conf, Path outputDir) {
+    outputDir = new Path(conf.getWorkingDirectory(), outputDir);
+    conf.set("bsp.output.dir", outputDir.toString());
+  }
+
+  /**
+   * Set the {@link Path} of the task's temporary output directory for the
+   * BSP job.
+   * 
+   * <p>
+   * <i>Note</i>: Task output path is set by the framework.
+   * </p>
+   * 
+   * @param conf The configuration of the job.
+   * @param outputDir the {@link Path} of the task's temporary output
+   *          directory for the BSP job.
+   */
+
+  static void setWorkOutputPath(BSPJob conf, Path outputDir) {
+    outputDir = new Path(conf.getWorkingDirectory(), outputDir);
+    conf.set("bsp.work.output.dir", outputDir.toString());
+  }
+
+  /**
+   * Get the {@link Path} to the output directory for the BSP job.
+   * 
+   * @return the {@link Path} to the output directory for the BSP job.
+   * @see FileOutputFormat#getWorkOutputPath(BSPJob)
+   */
+  public static Path getOutputPath(BSPJob conf) {
+    String name = conf.get("bsp.output.dir");
+    return name == null ? null : new Path(name);
+  }
+
+  public static Path getWorkOutputPath(BSPJob conf) {
+    String name = conf.get("bsp.work.output.dir");
+    return name == null ? null : new Path(name);
+  }
+
+  /**
+   * Helper function that returns the path to the task's output file inside
+   * the job output directory; no directory is created by this method.
+   * 
+   * @param conf job-configuration
+   * @param name temporary task-output filename
+   * @return path to the task's temporary output file
+   * @throws IOException
+   */
+  public static Path getTaskOutputPath(BSPJob conf, String name)
+      throws IOException {
+    // ${bsp.output.dir}
+    Path outputPath = getOutputPath(conf);
+    if (outputPath == null) {
+      throw new IOException("Undefined job output-path");
+    }
+
+    Path workPath = outputPath;
+
+    // ${bsp.output.dir}/${name} (no _temporary/_${taskid} level is used here)
+    return new Path(workPath, name);
+  }
+
+  /**
+   * Helper function to generate a name that is unique for the task.
+   * 
+   * @param conf the configuration for the job.
+   * @param name the name to make unique.
+   * @return a unique name across all tasks of the job.
+   */
+  public static String getUniqueName(BSPJob conf, String name) {
+    int partition = conf.getInt("bsp.task.partition", -1);
+    if (partition == -1) {
+      throw new IllegalArgumentException(
+          "This method can only be called from within a Job");
+    }
+
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+
+    return name + "-" + numberFormat.format(partition);
+  }
+
+  /**
+   * Helper function to generate a {@link Path} for a file that is unique for
+   * the task within the job output directory.
+   * 
+   * @param conf the configuration for the job.
+   * @param name the name for the file.
+   * @return a unique path across all tasks of the job.
+   */
+  public static Path getPathForCustomFile(BSPJob conf, String name) {
+    return new Path(getWorkOutputPath(conf), getUniqueName(conf, name));
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,89 @@
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+public class FileSplit implements InputSplit {
+  private Path file;
+  private long start;
+  private long length;
+  private String[] hosts;
+
+  FileSplit() {
+  }
+
+  /**
+   * Constructs a split.
+   * 
+   * @deprecated conf is unused; use {@link #FileSplit(Path, long, long, String[])}
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   */
+  public FileSplit(Path file, long start, long length, BSPJob conf) {
+    this(file, start, length, (String[]) null);
+  }
+
+  /**
+   * Constructs a split with host information
+   * 
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   * @param hosts the list of hosts containing the block, possibly null
+   */
+  public FileSplit(Path file, long start, long length, String[] hosts) {
+    this.file = file;
+    this.start = start;
+    this.length = length;
+    this.hosts = hosts;
+  }
+
+  /** The file containing this split's data. */
+  public Path getPath() {
+    return file;
+  }
+
+  /** The position of the first byte in the file to process. */
+  public long getStart() {
+    return start;
+  }
+
+  /** The number of bytes in the file to process. */
+  public long getLength() {
+    return length;
+  }
+
+  public String toString() {
+    return file + ":" + start + "+" + length;
+  }
+
+  // //////////////////////////////////////////
+  // Writable methods
+  // //////////////////////////////////////////
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, file.toString());
+    out.writeLong(start);
+    out.writeLong(length);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    file = new Path(Text.readString(in));
+    start = in.readLong();
+    length = in.readLong();
+    hosts = null;
+  }
+
+  public String[] getLocations() throws IOException {
+    if (this.hosts == null) {
+      return new String[] {};
+    } else {
+      return this.hosts;
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,11 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+public interface InputFormat<K, V> {
+
+  InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException;
+
+  RecordReader<K, V> getRecordReader(InputSplit split, BSPJob job) throws IOException;
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,25 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public interface InputSplit extends Writable {
+
+  /**
+   * Get the total number of bytes in the data of the <code>InputSplit</code>.
+   * 
+   * @return the number of bytes in the input split.
+   * @throws IOException
+   */
+  long getLength() throws IOException;
+  
+  /**
+   * Get the list of hostnames where the input split is located.
+   * 
+   * @return list of hostnames where data of the <code>InputSplit</code> is
+   *         located as an array of <code>String</code>s.
+   * @throws IOException
+   */
+  String[] getLocations() throws IOException;
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Tue Nov  1 12:34:14 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -72,6 +73,7 @@ class JobInProgress {
 
   int numBSPTasks = 0;
   int clusterSize;
+  String jobSplit;
 
   public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
       Configuration conf) throws IOException {
@@ -80,9 +82,9 @@ class JobInProgress {
     this.localFs = FileSystem.getLocal(conf);
     this.jobFile = jobFile;
     this.master = master;
-    
-    this.status = new JobStatus(jobId, null, 0L, 0L,
-        JobStatus.State.PREP.value());
+
+    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
+        .value());
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -96,10 +98,10 @@ class JobInProgress {
     FileSystem fs = jobDir.getFileSystem(conf);
     fs.copyToLocalFile(jobFile, localJobFile);
     BSPJob job = new BSPJob(jobId, localJobFile.toString());
-    this.numBSPTasks = job.getNumBspTask();
+    this.jobSplit = job.getConf().get("bsp.job.split.file");
 
-    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
-        job.getJobName());
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
+        .getJobName());
 
     this.setJobName(job.getJobName());
 
@@ -134,9 +136,9 @@ class JobInProgress {
   }
 
   public int getNumOfTasks() {
-    return tasks.length;  
+    return tasks.length;
   }
-  
+
   /**
    * @return the number of desired tasks.
    */
@@ -181,25 +183,34 @@ class JobInProgress {
       return;
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("numBSPTasks: " + numBSPTasks);
+    Path sysDir = new Path(this.master.getSystemDir());
+    FileSystem fs = sysDir.getFileSystem(conf);
+    DataInputStream splitFile = fs.open(new Path(this.jobSplit));
+
+    BSPJobClient.RawSplit[] splits;
+    try {
+      splits = BSPJobClient.readSplitFile(splitFile);
+    } finally {
+      splitFile.close();
     }
+    numBSPTasks = splits.length;
+    LOG.info("num BSPTasks: " + numBSPTasks);
 
     // adjust number of BSP tasks to actual number of splits
     this.tasks = new TaskInProgress[numBSPTasks];
     for (int i = 0; i < numBSPTasks; i++) {
       tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
-          this.master, this.conf, this, i);
+          splits[i], this.master, this.conf, this, i);
     }
 
     // Update job status
     this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
         0L, 0L, JobStatus.RUNNING);
 
-    // delete all nodes before start 
+    // delete all nodes before start
     master.clearZKNodes();
     master.createJobRoot(this.getJobID().toString());
-    
+
     tasksInited = true;
     LOG.info("Job is initialized.");
   }
@@ -247,17 +258,17 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(),
-          this.profile.getUser(), superstepCounter, superstepCounter,
-          superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(), this.profile
+          .getUser(), superstepCounter, superstepCounter, superstepCounter,
+          JobStatus.SUCCEEDED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
       LOG.info("Job successfully done.");
-      
+
       // delete job root
       master.deleteJobRoot(this.getJobID().toString());
-      
+
       garbageCollect();
     }
   }
@@ -281,9 +292,9 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(),
-          this.profile.getUser(), superstepCounter, superstepCounter,
-          superstepCounter, JobStatus.FAILED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(), this.profile
+          .getUser(), superstepCounter, superstepCounter, superstepCounter,
+          JobStatus.FAILED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,150 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+public class LineRecordReader implements RecordReader<LongWritable, Text> {
+  private static final Log LOG = LogFactory.getLog(LineRecordReader.class
+      .getName());
+
+  private CompressionCodecFactory compressionCodecs = null;
+  private long start;
+  private long pos;
+  private long end;
+  private LineReader in;
+  int maxLineLength;
+
+  /**
+   * A class that provides a line reader from an input stream.
+   */
+  public static class LineReader extends org.apache.hadoop.util.LineReader {
+    LineReader(InputStream in) {
+      super(in);
+    }
+
+    LineReader(InputStream in, int bufferSize) {
+      super(in, bufferSize);
+    }
+
+    public LineReader(InputStream in, Configuration conf) throws IOException {
+      super(in, conf);
+    }
+  }
+
+  public LineRecordReader(Configuration job, FileSplit split)
+      throws IOException {
+    this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength",
+        Integer.MAX_VALUE);
+    start = split.getStart();
+    end = start + split.getLength();
+    final Path file = split.getPath();
+    compressionCodecs = new CompressionCodecFactory(job);
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+
+    // open the file and seek to the start of the split
+    FileSystem fs = file.getFileSystem(job);
+    FSDataInputStream fileIn = fs.open(split.getPath());
+    boolean skipFirstLine = false;
+    if (codec != null) {
+      in = new LineReader(codec.createInputStream(fileIn), job);
+      end = Long.MAX_VALUE;
+    } else {
+      if (start != 0) {
+        skipFirstLine = true;
+        --start;
+        fileIn.seek(start);
+      }
+      in = new LineReader(fileIn, job);
+    }
+    if (skipFirstLine) { // skip first line and re-establish "start".
+      start += in.readLine(new Text(), 0, (int) Math.min(
+          (long) Integer.MAX_VALUE, end - start));
+    }
+    this.pos = start;
+  }
+
+  public LineRecordReader(InputStream in, long offset, long endOffset,
+      int maxLineLength) {
+    this.maxLineLength = maxLineLength;
+    this.in = new LineReader(in);
+    this.start = offset;
+    this.pos = offset;
+    this.end = endOffset;
+  }
+
+  public LineRecordReader(InputStream in, long offset, long endOffset,
+      Configuration job) throws IOException {
+    this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength",
+        Integer.MAX_VALUE);
+    this.in = new LineReader(in, job);
+    this.start = offset;
+    this.pos = offset;
+    this.end = endOffset;
+  }
+
+  public LongWritable createKey() {
+    return new LongWritable();
+  }
+
+  public Text createValue() {
+    return new Text();
+  }
+
+  /** Read a line. */
+  public synchronized boolean next(LongWritable key, Text value)
+      throws IOException {
+
+    while (pos < end) {
+      key.set(pos);
+
+      int newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(
+          Integer.MAX_VALUE, end - pos), maxLineLength));
+      if (newSize == 0) {
+        return false;
+      }
+      pos += newSize;
+      if (newSize < maxLineLength) {
+        return true;
+      }
+
+      // line too long. try again
+      LOG
+          .info("Skipped line of size " + newSize + " at pos "
+              + (pos - newSize));
+    }
+
+    return false;
+  }
+
+  /**
+   * Get the progress within the split
+   */
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (pos - start) / (float) (end - start));
+    }
+  }
+
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  public synchronized void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Nov  1 12:34:14 2011
@@ -209,7 +209,8 @@ public class LocalBSPRunner implements J
       bsp.setConf(conf);
       try {
         bsp.setup(groom);
-        bsp.bsp(groom);
+        // TODO 
+        bsp.bsp(groom, null, null);
       } catch (Exception e) {
         LOG.error("Exception during BSP execution!", e);
       }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,85 @@
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.NullWritable;
+
+public class NullInputFormat implements InputFormat<NullWritable, NullWritable> {
+
+  @Override
+  public RecordReader<NullWritable, NullWritable> getRecordReader(
+      InputSplit split, BSPJob job) throws IOException {
+    return new NullRecordReader();
+  }
+
+  @Override
+  public InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException {
+    InputSplit[] splits = new InputSplit[numBspTask];
+    for (int i = 0; i < numBspTask; i++) {
+      splits[i] = new NullInputSplit();
+    }
+
+    return splits;
+  }
+
+  public static class NullRecordReader implements
+      RecordReader<NullWritable, NullWritable> {
+    private boolean returnRecord = true;
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public NullWritable createValue() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return 0;
+    }
+
+    @Override
+    public boolean next(NullWritable key, NullWritable value)
+        throws IOException {
+      if (returnRecord == true) {
+        returnRecord = false;
+        return true;
+      }
+
+      return returnRecord;
+    }
+
+  }
+
+  public static class NullInputSplit implements InputSplit {
+    public long getLength() {
+      return 0;
+    }
+
+    public String[] getLocations() {
+      String[] locs = {};
+      return locs;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    public void write(DataOutput out) throws IOException {
+    }
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,20 @@
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.fs.FileSystem;
+
+public class NullOutputFormat<K, V> implements OutputFormat<K, V> {
+
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job,
+      String name) {
+    return new RecordWriter<K, V>() {
+      public void write(K key, V value) {
+      }
+
+      public void close() {
+      }
+    };
+  }
+
+  public void checkOutputSpecs(FileSystem ignored, BSPJob job) {
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,15 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+public interface OutputCollector<K, V> {
+
+  /**
+   * Adds a key/value pair to the output.
+   * 
+   * @param key the key to collect.
+   * @param value the value to collect.
+   * @throws IOException
+   */
+  void collect(K key, V value) throws IOException;
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,35 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+
+public interface OutputFormat<K, V> {
+
+  /**
+   * Get the {@link RecordWriter} for the given job.
+   * 
+   * @param ignored
+   * @param job configuration for the job whose output is being written.
+   * @param name the unique name for this part of the output.
+   * @return a {@link RecordWriter} to write the output for the job.
+   * @throws IOException
+   */
+  RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job, String name)
+      throws IOException;
+
+  /**
+   * Check for validity of the output-specification for the job.
+   * 
+   * <p>
+   * This is to validate the output specification for the job when the job is
+   * submitted. Typically checks that it does not already exist, throwing an
+   * exception when it already exists, so that output is not overwritten.
+   * </p>
+   * 
+   * @param ignored
+   * @param job job configuration.
+   * @throws IOException when output should not be attempted
+   */
+  void checkOutputSpecs(FileSystem ignored, BSPJob job) throws IOException;
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,54 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+public interface RecordReader<K, V> {
+  
+  /**
+   * Reads the next key/value pair from the input for processing.
+   * 
+   * @param key the key to read data into
+   * @param value the value to read data into
+   * @return true iff a key/value was read, false if at EOF
+   */
+  boolean next(K key, V value) throws IOException;
+
+  /**
+   * Create an object of the appropriate type to be used as a key.
+   * 
+   * @return a new key object.
+   */
+  K createKey();
+
+  /**
+   * Create an object of the appropriate type to be used as a value.
+   * 
+   * @return a new value object.
+   */
+  V createValue();
+
+  /**
+   * Returns the current position in the input.
+   * 
+   * @return the current position in the input.
+   * @throws IOException
+   */
+  long getPos() throws IOException;
+
+  /**
+   * Close this {@link RecordReader} to future operations.
+   * 
+   * @throws IOException
+   */
+  public void close() throws IOException;
+
+  /**
+   * How much of the input has the {@link RecordReader} consumed, i.e. how much
+   * of it has been processed so far?
+   * 
+   * @return progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   */
+  float getProgress() throws IOException;
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,22 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+public interface RecordWriter<K, V> {
+  /**
+   * Writes a key/value pair.
+   * 
+   * @param key the key to write.
+   * @param value the value to write.
+   * @throws IOException
+   */
+  void write(K key, V value) throws IOException;
+
+  /**
+   * Close this <code>RecordWriter</code> to future operations.
+   * 
+   * @param reporter facility to report progress.
+   * @throws IOException
+   */
+  void close() throws IOException;
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Tue Nov  1 12:34:14 2011
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.text.NumberFormat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -93,6 +94,18 @@ public abstract class Task implements Wr
   public int getPartition() {
     return partition;
   }
+  
+  /** Construct output file names so that, when an output directory listing is
+   * sorted lexicographically, positions correspond to output partitions.*/
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+  
+  static synchronized String getOutputName(int partition) {
+    return "part-" + NUMBER_FORMAT.format(partition);
+  }
 
   @Override
   public String toString() {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue Nov  1 12:34:14 2011
@@ -24,7 +24,8 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
 
 /**
  *TaskInProgress maintains all the info needed for a Task in the lifetime of
@@ -75,6 +76,8 @@ class TaskInProgress {
 
   private BSPJobID jobId;
 
+  private RawSplit rawSplit;
+  
   /**
    * Constructor for new nexus between BSPMaster and GroomServer.
    * 
@@ -90,10 +93,11 @@ class TaskInProgress {
     init(jobId);
   }
 
-  public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
+  public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit, BSPMaster master,
       Configuration conf, JobInProgress job, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
+    this.rawSplit = rawSplit;
     this.setBspMaster(master);
     this.job = job;
     this.setConf(conf);
@@ -102,7 +106,7 @@ class TaskInProgress {
     init(jobId);
   }
 
-  private void init(BSPJobID jobId2) {
+  private void init(BSPJobID jobId) {
     this.id = new TaskID(jobId, partition);
     this.startTime = System.currentTimeMillis();
   }
@@ -125,7 +129,10 @@ class TaskInProgress {
       return null;
     }
 
-    t = new BSPTask(jobId, jobFile, taskid, partition);
+    String splitClass = rawSplit.getClassName();
+    BytesWritable split = rawSplit.getBytes();
+    
+    t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
     activeTasks.put(taskid, status.getGroomName());
 
     return t;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue Nov  1 12:34:14 2011
@@ -102,7 +102,7 @@ public class TaskLog {
   public static synchronized void cleanup(int logsRetainHours)
       throws IOException {
     // Purge logs of tasks on this tasktracker if their
-    // mtime has exceeded "mapred.task.log.retain" hours
+    // mtime has exceeded "bsp.task.log.retain" hours
     long purgeTimeStamp = System.currentTimeMillis()
         - (logsRetainHours * 60L * 60 * 1000);
     File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
@@ -199,7 +199,7 @@ public class TaskLog {
    * @return the number of bytes to cap the log files at
    */
   public static long getTaskLogLength(HamaConfiguration conf) {
-    return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+    return conf.getLong("bsp.userlog.limit.kb", 100) * 1024;
   }
 
   /**

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,15 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
+
+  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, BSPJob job)
+      throws IOException {
+    return new LineRecordReader(job.getConf(), (FileSplit) split);
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java?rev=1195959&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java Tue Nov  1 12:34:14 2011
@@ -0,0 +1,109 @@
+package org.apache.hama.bsp;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+
+  protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
+    private static final String utf8 = "UTF-8";
+    private static final byte[] newline;
+    static {
+      try {
+        newline = "\n".getBytes(utf8);
+      } catch (UnsupportedEncodingException uee) {
+        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
+      }
+    }
+
+    protected DataOutputStream out;
+    private final byte[] keyValueSeparator;
+
+    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
+      this.out = out;
+      try {
+        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+      } catch (UnsupportedEncodingException uee) {
+        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
+      }
+    }
+
+    public LineRecordWriter(DataOutputStream out) {
+      this(out, "\t");
+    }
+
+    /**
+     * Write the object to the byte stream, handling Text as a special case.
+     * 
+     * @param o the object to print
+     * @throws IOException if the write throws, we pass it on
+     */
+    private void writeObject(Object o) throws IOException {
+      if (o instanceof Text) {
+        Text to = (Text) o;
+        out.write(to.getBytes(), 0, to.getLength());
+      } else {
+        out.write(o.toString().getBytes(utf8));
+      }
+    }
+
+    public synchronized void write(K key, V value) throws IOException {
+
+      boolean nullKey = key == null || key instanceof NullWritable;
+      boolean nullValue = value == null || value instanceof NullWritable;
+      if (nullKey && nullValue) {
+        return;
+      }
+      if (!nullKey) {
+        writeObject(key);
+      }
+      if (!(nullKey || nullValue)) {
+        out.write(keyValueSeparator);
+      }
+      if (!nullValue) {
+        writeObject(value);
+      }
+      out.write(newline);
+    }
+
+    public synchronized void close() throws IOException {
+      out.close();
+    }
+  }
+
+  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job,
+      String name) throws IOException {
+    boolean isCompressed = getCompressOutput(job);
+    String keyValueSeparator = job.getConf().get(
+        "bsp.textoutputformat.separator", "\t");
+    if (!isCompressed) {
+      Path file = FileOutputFormat.getTaskOutputPath(job, name);
+      FileSystem fs = file.getFileSystem(job.getConf());
+      FSDataOutputStream fileOut = fs.create(file);
+      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
+    } else {
+      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
+          job, GzipCodec.class);
+      // create the named codec
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job
+          .getConf());
+      // build the filename including the extension
+      Path file = FileOutputFormat.getTaskOutputPath(job, name
+          + codec.getDefaultExtension());
+      FileSystem fs = file.getFileSystem(job.getConf());
+      FSDataOutputStream fileOut = fs.create(file);
+      return new LineRecordWriter<K, V>(new DataOutputStream(codec
+          .createOutputStream(fileOut)), keyValueSeparator);
+    }
+  }
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java Tue Nov  1 12:34:14 2011
@@ -1010,7 +1010,6 @@ public class Bytes {
 
   /**
    * Split passed range. Expensive operation relatively. Uses BigInteger math.
-   * Useful splitting ranges for MapReduce jobs.
    * 
    * @param a Beginning of range
    * @param b End of range

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Tue Nov  1 12:34:14 2011
@@ -66,7 +66,8 @@ public class TestBSPMasterGroomServer ex
     ClusterStatus cluster = jobClient.getClusterStatus(false);
     assertEquals(this.numOfGroom, cluster.getGroomServers());
     bsp.setNumBspTask(2);
-
+    bsp.setInputFormat(NullInputFormat.class);
+    
     FileSystem fileSys = FileSystem.get(conf);
 
     if (bsp.waitForCompletion(true)) {



Mime
View raw message