incubator-hama-commits mailing list archives

From tjungb...@apache.org
Subject svn commit: r1199686 - in /incubator/hama/trunk/core/src: main/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/
Date Wed, 09 Nov 2011 10:28:07 GMT
Author: tjungblut
Date: Wed Nov  9 10:28:06 2011
New Revision: 1199686

URL: http://svn.apache.org/viewvc?rev=1199686&view=rev
Log:
Added partitioning

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java   (with props)
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1199686&r1=1199685&r2=1199686&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Nov  9 10:28:06 2011
@@ -290,21 +290,38 @@ public class BSPJob extends BSPJobContex
   public void setOutputPath(Path path) {
     conf.set("bsp.output.dir", path.toString());
   }
-  
+
   /**
    * Sets the input path for the job.
    * 
-   * @param path where the output gets written.
    */
   public void setInputPath(Path path) {
     conf.set("bsp.input.dir", path.toString());
   }
 
+  /**
+   * Sets the output format for the job.
+   */
   @SuppressWarnings("rawtypes")
   public void setOutputFormat(Class<? extends OutputFormat> theClass) {
     conf.setClass("bsp.output.format.class", theClass, OutputFormat.class);
   }
 
+  /**
+   * Sets the partitioner for the input of the job.
+   */
+  @SuppressWarnings("rawtypes")
+  public void setPartitioner(Class<? extends Partitioner> theClass) {
+    conf.setClass("bsp.input.partitioner.class", theClass, Partitioner.class);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Partitioner getPartitioner() {
+    return (Partitioner) ReflectionUtils.newInstance(conf
+        .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+            Partitioner.class), conf);
+  }
+
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
     return (OutputFormat) ReflectionUtils.newInstance(conf.getClass(

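A quick note on the new API: setPartitioner() stores the class under "bsp.input.partitioner.class", and getPartitioner() instantiates it reflectively, falling back to HashPartitioner when nothing is configured. A minimal usage sketch, not part of this commit (the input path is made up for the example):

    import org.apache.hadoop.fs.Path;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.BSPJob;
    import org.apache.hama.bsp.HashPartitioner;

    public class PartitionerUsageSketch {
      public static void main(String[] args) throws Exception {
        BSPJob job = new BSPJob(new HamaConfiguration());
        job.setJobName("partitioned job");
        job.setInputPath(new Path("/user/hama/input")); // illustrative path
        // Spread the input key/value pairs across tasks by key hash code.
        job.setPartitioner(HashPartitioner.class);
        // Reflects the configured class, defaulting to HashPartitioner.
        System.out.println(job.getPartitioner().getClass().getName());
      }
    }
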
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1199686&r1=1199685&r2=1199686&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Nov  9 10:28:06 2011
@@ -22,8 +22,9 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -39,12 +40,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.HamaConfiguration;
@@ -290,6 +296,8 @@ public class BSPJobClient extends Config
 
   public RunningJob submitJobInternal(BSPJob job) throws IOException {
     BSPJobID jobId = jobSubmitClient.getNewJobId();
+    
+    job.setJobID(jobId);
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
@@ -327,6 +335,7 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+      job = partition(job);
       job.setNumBspTask(writeSplits(job, submitSplitFile));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -379,31 +388,135 @@ public class BSPJobClient extends Config
     }
   }
 
-  private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(job,
-        job.getNumBspTask());
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(splits, new Comparator<InputSplit>() {
-      public int compare(InputSplit a, InputSplit b) {
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private BSPJob partition(BSPJob job) throws IOException {
+    if (job.getConf().get("bsp.input.partitioner.class") != null) {
+      InputSplit[] splits = job.getInputFormat().getSplits(job,
+          job.getNumBspTask());
+      int numOfTasks = job.getNumBspTask();
+      String input = job.getConf().get("bsp.input.dir");
+
+      if (input != null) {
+        InputFormat<?, ?> inputFormat = job.getInputFormat();
+
+        Path partitionedPath = new Path(input, "hama-partitions");
+        Path inputPath = new Path(input);
+        if (fs.isFile(inputPath)) {
+          partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
+        }
+
+        String alternatePart = job.get("bsp.partitioning.dir");
+        if (alternatePart != null) {
+          partitionedPath = new Path(alternatePart, job.getJobID().toString());
+        }
+
+        if (fs.exists(partitionedPath)) {
+          fs.delete(partitionedPath, true);
+        } else {
+          fs.mkdirs(partitionedPath);
+        }
+        // FIXME this is soo unsafe
+        RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
+
+        List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
+            numOfTasks);
+
+        CompressionType compressionType = getOutputCompressionType(job);
+        Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
+            job, null);
+        CompressionCodec codec = null;
+        if (outputCompressorClass != null) {
+          codec = ReflectionUtils.newInstance(outputCompressorClass,
+              job.getConf());
+        }
+
         try {
-          long left = a.getLength();
-          long right = b.getLength();
-          if (left == right) {
-            return 0;
-          } else if (left < right) {
-            return 1;
-          } else {
-            return -1;
+          for (int i = 0; i < numOfTasks; i++) {
+            Path p = new Path(partitionedPath, "part-" + i);
+            if (codec == null) {
+              writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+                  sampleReader.createKey().getClass(), sampleReader
+                      .createValue().getClass(), CompressionType.NONE));
+            } else {
+              writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+                  sampleReader.createKey().getClass(), sampleReader
+                      .createValue().getClass(), compressionType, codec));
+            }
+          }
+
+          Partitioner partitioner = job.getPartitioner();
+          for (int i = 0; i < splits.length; i++) {
+            InputSplit split = splits[i];
+            RecordReader recordReader = inputFormat.getRecordReader(split, job);
+            Object key = recordReader.createKey();
+            Object value = recordReader.createValue();
+            while (recordReader.next(key, value)) {
+              int index = partitioner.getPartition(key, value, numOfTasks);
+              writers.get(index).append(key, value);
+            }
+            LOG.debug("Done with split " + i);
+          }
+        } finally {
+          for (SequenceFile.Writer wr : writers) {
+            wr.close();
           }
-        } catch (IOException ie) {
-          throw new RuntimeException("Problem getting input split size", ie);
         }
+
+        job.setInputFormat(SequenceFileInputFormat.class);
+        job.setInputPath(partitionedPath);
       }
-    });
-    DataOutputStream out = writeSplitsFileHeader(job.getConf(),
-        submitSplitFile, splits.length);
 
+    }
+    return job;
+  }
+
+  /**
+   * Get the {@link CompressionType} for the output {@link SequenceFile}.
+   * 
+   * @param job the {@link Job}
+   * @return the {@link CompressionType} for the output {@link SequenceFile},
+   *         defaulting to {@link CompressionType#NONE}
+   */
+  static CompressionType getOutputCompressionType(BSPJob job) {
+    String val = job.get("bsp.partitioning.compression.type");
+    if (val != null) {
+      return CompressionType.valueOf(val);
+    } else {
+      return CompressionType.NONE;
+    }
+  }
+
+  /**
+   * Get the {@link CompressionCodec} for compressing the job outputs.
+   * 
+   * @param job the {@link Job} to look in
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} to be used to compress the job outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  static Class<? extends CompressionCodec> getOutputCompressorClass(BSPJob job,
+      Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    Configuration conf = job.getConf();
+    String name = conf.get("bsp.partitioning.compression.codec");
+    if (name != null) {
+      try {
+        codecClass = conf.getClassByName(name).asSubclass(
+            CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name
+            + " was not found.", e);
+      }
+    }
+    return codecClass;
+  }
+
+  private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
+    InputSplit[] splits = job.getInputFormat().getSplits(job,
+        job.getNumBspTask());
+
+    final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
+        submitSplitFile, splits.length);
     try {
       DataOutputBuffer buffer = new DataOutputBuffer();
       RawSplit rawSplit = new RawSplit();

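In short, when a partitioner class is configured, submitJobInternal() now rewrites the input before writing splits: every record of every original split is routed through the Partitioner and appended to one SequenceFile per task (part-0, part-1, ...) under a "hama-partitions" directory, or under the directory named by "bsp.partitioning.dir" plus the job id when that property is set, and the job is then switched to SequenceFileInputFormat over that directory. Compression of the rewritten files is driven by the two new properties read above; a configuration sketch, assuming the stock Hadoop GzipCodec (any CompressionCodec on the classpath works the same way):

    import org.apache.hadoop.conf.Configuration;

    public class PartitionCompressionConfigSketch {
      public static Configuration configure(Configuration conf) {
        // Name of a SequenceFile.CompressionType constant (NONE, RECORD, BLOCK).
        conf.set("bsp.partitioning.compression.type", "BLOCK");
        // Fully qualified codec class, resolved via Configuration.getClassByName().
        conf.set("bsp.partitioning.compression.codec",
            "org.apache.hadoop.io.compress.GzipCodec");
        return conf;
      }
    }
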
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1199686&r1=1199685&r2=1199686&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Wed Nov  9 10:28:06 2011
@@ -31,21 +31,21 @@ import org.apache.hama.HamaConfiguration
  */
 public class BSPJobContext {
   // Put all of the attribute names in here so that BSPJob and JobContext are
-  // consistent.  
+  // consistent.
   protected static final String WORK_CLASS_ATTR = "bsp.work.class";
   protected static final String COMBINER_CLASS_ATTR = "bsp.combiner.class";
   protected static final String INPUT_FORMAT_CLASS_ATTR = "bsp.inputformat.class";
   protected static final String OUTPUT_FORMAT_CLASS_ATTR = "bsp.outputformat.class";
   protected static final String WORKING_DIR = "bsp.working.dir";
-  
+
   protected final Configuration conf;
-  private final BSPJobID jobId;
+  private BSPJobID jobId;
 
   public BSPJobContext(Configuration conf, BSPJobID jobId) {
     this.conf = conf;
     this.jobId = jobId;
   }
-  
+
   public BSPJobContext(Path config, BSPJobID jobId) throws IOException {
     this.conf = new HamaConfiguration();
     this.jobId = jobId;
@@ -56,6 +56,10 @@ public class BSPJobContext {
     return jobId;
   }
 
+  void setJobID(BSPJobID id) {
+    this.jobId = id;
+  }
+
   public Path getWorkingDirectory() {
     String name = conf.get(WORKING_DIR);
 
@@ -79,32 +83,32 @@ public class BSPJobContext {
   public String getJar() {
     return conf.get("bsp.jar");
   }
-  
-  /** 
-   * Constructs a local file name. Files are distributed among configured
-   * local directories.
+
+  /**
+   * Constructs a local file name. Files are distributed among configured local
+   * directories.
    */
   public Path getLocalPath(String pathString) throws IOException {
     return conf.getLocalPath("bsp.local.dir", pathString);
   }
-  
+
   public String getUser() {
     return conf.get("user.name");
   }
-  
+
   public void writeXml(OutputStream out) throws IOException {
     conf.writeXml(out);
   }
-  
+
   public Configuration getConf() {
     return this.conf;
   }
-  
+
   public String get(String name) {
     return conf.get(name);
   }
-  
+
   public int getInt(String name, int defaultValue) {
-    return conf.getInt(name, defaultValue);    
+    return conf.getInt(name, defaultValue);
   }
 }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+
+/**
+ * An abstract {@link org.apache.hadoop.mapred.InputFormat}. Splits are
+ * constructed from the files under the input paths. A split cannot have files
+ * from different pools. Each split returned may contain blocks from different
+ * files. If a maxSplitSize is specified, then blocks on the same node are
+ * combined to form a single split. Blocks that are left over are then combined
+ * with other blocks in the same rack. If maxSplitSize is not specified, then
+ * blocks from the same rack are combined in a single split; no attempt is made
+ * to create node-local splits. If the maxSplitSize is equal to the block size,
+ * then this class is similar to the default splitting behaviour in Hadoop: each
+ * block is a locally processed split. Subclasses implement
+ * {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
+ * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
+ * 
+ * @see CombineFileSplit
+ */
+public abstract class CombineFileInputFormat<K, V> extends
+    FileInputFormat<K, V> {
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // A pool of input paths filters. A split cannot have blocks from files
+  // across multiple pools.
+  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
+
+  // mapping from a rack name to the set of Nodes in the rack
+  private static HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
+
+  /**
+   * This has to be overridden by concrete formats; we provide a SequenceFile
+   * version of it for partitioning.
+   * 
+   * @param split
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
+  /**
+   * Specify the maximum size (in bytes) of each split. Each split is
+   * approximately equal to the specified size.
+   */
+  protected void setMaxSplitSize(long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per node. This applies to
+   * data that is left over after combining data on a single node into splits
+   * that are of maximum size specified by maxSplitSize. This leftover data will
+   * be combined into its own split if its size exceeds minSplitSizeNode.
+   */
+  protected void setMinSplitSizeNode(long minSplitSizeNode) {
+    this.minSplitSizeNode = minSplitSizeNode;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per rack. This applies to
+   * data that is left over after combining data on a single rack into splits
+   * that are of maximum size specified by maxSplitSize. This leftover data will
+   * be combined into its own split if its size exceeds minSplitSizeRack.
+   */
+  protected void setMinSplitSizeRack(long minSplitSizeRack) {
+    this.minSplitSizeRack = minSplitSizeRack;
+  }
+
+  /**
+   * Create a new pool and add the filters to it. A split cannot have files from
+   * different pools.
+   */
+  protected void createPool(Configuration conf, List<PathFilter> filters) {
+    pools.add(new MultiPathFilter(filters));
+  }
+
+  /**
+   * Create a new pool and add the filters to it. A pathname can satisfy any one
+   * of the specified filters. A split cannot have files from different pools.
+   */
+  protected void createPool(Configuration conf, PathFilter... filters) {
+    MultiPathFilter multi = new MultiPathFilter();
+    for (PathFilter f : filters) {
+      multi.add(f);
+    }
+    pools.add(multi);
+  }
+
+  /**
+   * default constructor
+   */
+  public CombineFileInputFormat() {
+  }
+
+  @Override
+  public InputSplit[] getSplits(BSPJob bspJob, int numSplits)
+      throws IOException {
+
+    Configuration job = bspJob.getConf();
+
+    long minSizeNode = 0;
+    long minSizeRack = 0;
+    long maxSize = 0;
+
+    // the values specified by setXxxSplitSize() take precedence over the
+    // values that might have been specified in the config
+    if (minSplitSizeNode != 0) {
+      minSizeNode = minSplitSizeNode;
+    } else {
+      minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
+    }
+    if (minSplitSizeRack != 0) {
+      minSizeRack = minSplitSizeRack;
+    } else {
+      minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
+    }
+    if (maxSplitSize != 0) {
+      maxSize = maxSplitSize;
+    } else {
+      maxSize = job.getLong("mapred.max.split.size", 0);
+    }
+    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+      throw new IOException("Minimum split size per node " + minSizeNode
+          + " cannot be larger than maximum split size " + maxSize);
+    }
+    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+      throw new IOException("Minimum split size per rack " + minSizeRack
+          + " cannot be larger than maximum split size " + maxSize);
+    }
+    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+      throw new IOException("Minimum split size per node " + minSizeNode
+          + " cannot be smaller than minimum split size per rack "
+          + minSizeRack);
+    }
+
+    // all the files in input set
+    Path[] paths = FileUtil.stat2Paths(listStatus(bspJob));
+    List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
+    if (paths.length == 0) {
+      return splits.toArray(new CombineFileSplit[splits.size()]);
+    }
+
+    // In one single iteration, process all the paths in a single pool.
+    // Processing one pool at a time ensures that a split contains paths
+    // from a single pool only.
+    for (MultiPathFilter onepool : pools) {
+      ArrayList<Path> myPaths = new ArrayList<Path>();
+
+      // pick one input path. If it matches all the filters in a pool,
+      // add it to the output set
+      for (int i = 0; i < paths.length; i++) {
+        if (paths[i] == null) { // already processed
+          continue;
+        }
+        Path p = new Path(paths[i].toUri().getPath());
+        if (onepool.accept(p)) {
+          myPaths.add(paths[i]); // add it to my output set
+          paths[i] = null; // already processed
+        }
+      }
+      // create splits for all files in this pool.
+      getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
+          minSizeNode, minSizeRack, splits);
+    }
+
+    // Finally, process all paths that do not belong to any pool.
+    ArrayList<Path> myPaths = new ArrayList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      if (paths[i] == null) { // already processed
+        continue;
+      }
+      myPaths.add(paths[i]);
+    }
+    // create splits for all files that are not in any pool.
+    getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
+        minSizeNode, minSizeRack, splits);
+
+    // free up rackToNodes map
+    rackToNodes.clear();
+    return splits.toArray(new CombineFileSplit[splits.size()]);
+  }
+
+  /**
+   * Return all the splits in the specified set of paths
+   */
+  private void getMoreSplits(BSPJob job, Path[] paths, long maxSize,
+      long minSizeNode, long minSizeRack, List<CombineFileSplit> splits)
+      throws IOException {
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>();
+
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return;
+    }
+
+    // populate all the blocks for all files
+    long totLength = 0;
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], job, rackToBlocks, blockToNodes,
+          nodeToBlocks);
+      totLength += files[i].getLength();
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> nodes = new ArrayList<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node.
+    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+        .entrySet().iterator(); iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from
+      // blockToNodes so that the same block does not appear in
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(job, splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSplitNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely
+      // that they will be combined with other blocks from the same rack later
+      // on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these
+    // overflow blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> racks = new ArrayList<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack.
+      // Come back to this rack after creating a single split for each of the
+      // remaining racks.
+      // Process one rack location at a time, combining all possible blocks
+      // that reside on this rack into one split (constrained by the minimum
+      // and maximum split size).
+
+      // iterate over all racks
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = rackToBlocks
+          .entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split;
+            // otherwise, store these blocks in the overflow data structure
+            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that remained to be processed.
+            // Keep them in the 'overflow' block list. These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks, if any.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+    }
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks and
+   * add this new split to splitList.
+   */
+  private void addCreatedSplit(BSPJob job, List<CombineFileSplit> splitList,
+      List<String> locations, ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] fl = new Path[validBlocks.size()];
+    long[] offset = new long[validBlocks.size()];
+    long[] length = new long[validBlocks.size()];
+    for (int i = 0; i < validBlocks.size(); i++) {
+      fl[i] = validBlocks.get(i).onepath;
+      offset[i] = validBlocks.get(i).offset;
+      length[i] = validBlocks.get(i).length;
+    }
+
+    // add this split to the list that is returned
+    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, length,
+        locations.toArray(new String[0]));
+    splitList.add(thissplit);
+  }
+
+  /**
+   * information about one file from the File System
+   */
+  private static class OneFileInfo {
+    private long fileSize; // size of the file
+    private OneBlockInfo[] blocks; // all blocks in this file
+
+    OneFileInfo(Path path, BSPJob job,
+        HashMap<String, List<OneBlockInfo>> rackToBlocks,
+        HashMap<OneBlockInfo, String[]> blockToNodes,
+        HashMap<String, List<OneBlockInfo>> nodeToBlocks) throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(job.getConf());
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
+          stat.getLen());
+      // create a list of all block and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        blocks = new OneBlockInfo[locations.length];
+        for (int i = 0; i < locations.length; i++) {
+
+          fileSize += locations[i].getLength();
+          OneBlockInfo oneblock = new OneBlockInfo(path,
+              locations[i].getOffset(), locations[i].getLength(),
+              locations[i].getHosts(), locations[i].getTopologyPaths());
+          blocks[i] = oneblock;
+
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < oneblock.racks.length; j++) {
+            String rack = oneblock.racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            // Add this host to rackToNodes map
+            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+    long getLength() {
+      return fileSize;
+    }
+  }
+
+  /**
+   * information about one block from the File System
+   */
+  private static class OneBlockInfo {
+    Path onepath; // name of this file
+    long offset; // offset in file
+    long length; // length of this block
+    String[] hosts; // nodes on which this block resides
+    String[] racks; // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len, String[] hosts,
+        String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length || topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use a dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i],
+              NetworkTopology.DEFAULT_RACK)).toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+  private static void addHostToRack(String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+
+  private static List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
+    for (String rack : racks) {
+      hosts.addAll(rackToNodes.get(rack));
+    }
+    return hosts;
+  }
+
+  /**
+   * Accept a path only if any one of the filters given in the constructor does.
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter() {
+      this.filters = new ArrayList<PathFilter>();
+    }
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public void add(PathFilter one) {
+      filters.add(one);
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (filter.accept(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("[");
+      for (PathFilter f : filters) {
+        buf.append(f);
+        buf.append(",");
+      }
+      buf.append("]");
+      return buf.toString();
+    }
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
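
The split-size knobs mirror Hadoop's combine-file behaviour: values passed to setMaxSplitSize(), setMinSplitSizeNode() and setMinSplitSizeRack() take precedence, otherwise getSplits() falls back to the configuration keys shown above. A configuration-side sketch (the byte values are arbitrary examples):

    import org.apache.hadoop.conf.Configuration;

    public class CombineSplitSizeConfigSketch {
      public static void configure(Configuration conf) {
        // Upper bound for a combined split; setting it to the block size roughly
        // reproduces Hadoop's default one-block-per-split behaviour.
        conf.setLong("mapred.max.split.size", 64L * 1024 * 1024);
        // Leftover blocks on a node are combined once they reach this size ...
        conf.setLong("mapred.min.split.size.per.node", 32L * 1024 * 1024);
        // ... otherwise they fall through to rack-level combining.
        conf.setLong("mapred.min.split.size.per.rack", 16L * 1024 * 1024);
      }
    }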

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+
+/**
+ * A sub-collection of input files. Unlike
+ * {@link org.apache.hadoop.mapred.FileSplit}, CombineFileSplit class does not
+ * represent a split of a file, but a split of input files into smaller sets. A
+ * split may contain blocks from different files, but all the blocks in the same
+ * split are probably local to some rack.<br>
+ * CombineFileSplit can be used to implement
+ * {@link org.apache.hadoop.mapred.RecordReader}'s, with reading one record per
+ * file.
+ * 
+ * @see org.apache.hadoop.mapred.FileSplit
+ * @see CombineFileInputFormat
+ */
+public class CombineFileSplit implements InputSplit {
+
+  private Path[] paths;
+  private long[] startoffset;
+  private long[] lengths;
+  private String[] locations;
+  private long totLength;
+  private BSPJob job;
+
+  /**
+   * default constructor
+   */
+  public CombineFileSplit() {
+  }
+
+  public CombineFileSplit(BSPJob job, Path[] files, long[] start,
+      long[] lengths, String[] locations) {
+    initSplit(job, files, start, lengths, locations);
+  }
+
+  public CombineFileSplit(BSPJob job, Path[] files, long[] lengths) {
+    long[] startoffset = new long[files.length];
+    for (int i = 0; i < startoffset.length; i++) {
+      startoffset[i] = 0;
+    }
+    String[] locations = new String[files.length];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "";
+    }
+    initSplit(job, files, startoffset, lengths, locations);
+  }
+
+  private void initSplit(BSPJob job, Path[] files, long[] start,
+      long[] lengths, String[] locations) {
+    this.job = job;
+    this.startoffset = start;
+    this.lengths = lengths;
+    this.paths = files;
+    this.totLength = 0;
+    this.locations = locations;
+    for (long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  public Configuration getJob() {
+    return job.getConf();
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+
+  /** Returns an array containing the start offsets of the files in the split */
+  public long[] getStartOffsets() {
+    return startoffset;
+  }
+
+  /** Returns an array containing the lengths of the files in the split */
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  /** Returns the start offset of the i<sup>th</sup> Path */
+  public long getOffset(int i) {
+    return startoffset[i];
+  }
+
+  /** Returns the length of the i<sup>th</sup> Path */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+
+  /** Returns the number of Paths in the split */
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the i<sup>th</sup> Path */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+
+  /** Returns all the Paths in the split */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  /** Returns all the Paths where this input-split resides */
+  public String[] getLocations() throws IOException {
+    return locations;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    totLength = in.readLong();
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    for (int i = 0; i < arrLength; i++) {
+      lengths[i] = in.readLong();
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for (int i = 0; i < filesLength; i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+    arrLength = in.readInt();
+    startoffset = new long[arrLength];
+    for (int i = 0; i < arrLength; i++) {
+      startoffset[i] = in.readLong();
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(totLength);
+    out.writeInt(lengths.length);
+    for (long length : lengths) {
+      out.writeLong(length);
+    }
+    out.writeInt(paths.length);
+    for (Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+    out.writeInt(startoffset.length);
+    for (long length : startoffset) {
+      out.writeLong(length);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < paths.length; i++) {
+      if (i == 0) {
+        sb.append("Paths:");
+      }
+      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] + "+"
+          + lengths[i]);
+      if (i < paths.length - 1) {
+        sb.append(",");
+      }
+    }
+    if (locations != null) {
+      String locs = "";
+      StringBuffer locsb = new StringBuffer();
+      for (int i = 0; i < locations.length; i++) {
+        locsb.append(locations[i] + ":");
+      }
+      locs = locsb.toString();
+      sb.append(" Locations:" + locs + "; ");
+    }
+    return sb.toString();
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
------------------------------------------------------------------------------
    svn:eol-style = native
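
Since CombineFileSplit implements InputSplit, it travels as a Writable; note that write()/readFields() carry only the total length, per-file lengths, paths and start offsets, not the job reference or the locations. A round-trip sketch, with paths and lengths made up for the example:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.BSPJob;
    import org.apache.hama.bsp.CombineFileSplit;

    public class CombineFileSplitRoundTripSketch {
      public static void main(String[] args) throws Exception {
        BSPJob job = new BSPJob(new HamaConfiguration());
        Path[] files = { new Path("/tmp/a"), new Path("/tmp/b") }; // example paths
        long[] lengths = { 100L, 200L };
        CombineFileSplit split = new CombineFileSplit(job, files, lengths);

        // Serialize and deserialize the split as the framework would.
        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);
        CombineFileSplit copy = new CombineFileSplit();
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);

        System.out.println(copy); // Paths:/tmp/a:0+100,/tmp/b:0+200
      }
    }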

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * HashPartitioner partitions key/value pairs by the hash code of the key.
+ * 
+ */
+public class HashPartitioner<K, V> implements Partitioner<K, V> {
+
+  @Override
+  public int getPartition(K key, V value, int numTasks) {
+    return key.hashCode() % numTasks;
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native
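
One caveat worth noting: key.hashCode() % numTasks is negative whenever the key's hash code is negative, and the pre-partitioning loop in BSPJobClient feeds that index straight into writers.get(). A sign-safe variant, sketched here and not part of this commit, masks the sign bit the way Hadoop's own HashPartitioner does:

    import org.apache.hama.bsp.Partitioner;

    // Sketch only: mask the sign bit so negative hash codes still map into
    // the range [0, numTasks).
    public class SafeHashPartitioner<K, V> implements Partitioner<K, V> {
      @Override
      public int getPartition(K key, V value, int numTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numTasks;
      }
    }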

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Partitioning interface used to assign key/value pairs to a specific
+ * partition.
+ * 
+ * @param <K>
+ * @param <V>
+ */
+public interface Partitioner<K, V> {
+
+  /**
+   * Partitions a specific key/value pair into a bucket.
+   * 
+   * @param key
+   * @param value
+   * @param numTasks
+   * @return a number between 0 (inclusive) and numTasks (exclusive) that tells
+   *         which partition the pair belongs to.
+   */
+  public int getPartition(K key, V value, int numTasks);
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native
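
An implementation only has to map a key/value pair to a bucket index; the bucket count is the number of BSP tasks. An illustrative partitioner (class name and key type are assumptions for the example, not part of this commit) that keeps Text keys with the same leading character in the same partition:

    import org.apache.hadoop.io.Text;
    import org.apache.hama.bsp.Partitioner;

    // Illustrative only: routes Text keys by their first character so keys
    // sharing a leading character land in the same partition file.
    public class FirstCharPartitioner<V> implements Partitioner<Text, V> {
      @Override
      public int getPartition(Text key, V value, int numTasks) {
        if (key.getLength() == 0) {
          return 0;
        }
        // Text.charAt returns the Unicode code point at the given byte offset.
        return (key.charAt(0) & Integer.MAX_VALUE) % numTasks;
      }
    }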

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.SequenceFile;
+
+public class SequenceFileInputFormat<K,V> extends FileInputFormat<K, V> {
+
+  public SequenceFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+  }
+
+  @Override
+  public RecordReader<K, V> getRecordReader(InputSplit split, BSPJob job)
+      throws IOException {
+    return new SequenceFileRecordReader<K,V>(job.getConf(),(FileSplit) split);
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
+
+  private SequenceFile.Reader in;
+  private long start;
+  private long end;
+  private boolean more = true;
+  protected Configuration conf;
+
+  public SequenceFileRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new SequenceFile.Reader(fs, path, conf);
+    this.end = split.getStart() + split.getLength();
+    this.conf = conf;
+
+    if (split.getStart() > in.getPosition()) {
+      in.sync(split.getStart()); // sync to start
+    }
+
+    this.start = in.getPosition();
+    more = start < end;
+  }
+
+  /**
+   * The class of key that must be passed to {@link #next(Object, Object)}.
+   */
+  @SuppressWarnings("rawtypes")
+  public Class getKeyClass() {
+    return in.getKeyClass();
+  }
+
+  /**
+   * The class of value that must be passed to {@link #next(Object, Object)}.
+   */
+  @SuppressWarnings("rawtypes")
+  public Class getValueClass() {
+    return in.getValueClass();
+  }
+
+  @SuppressWarnings("unchecked")
+  public K createKey() {
+    return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
+  }
+
+  @SuppressWarnings("unchecked")
+  public V createValue() {
+    return (V) ReflectionUtils.newInstance(getValueClass(), conf);
+  }
+
+  public synchronized boolean next(K key, V value) throws IOException {
+    if (!more)
+      return false;
+    long pos = in.getPosition();
+    boolean remaining = (in.next(key) != null);
+    if (remaining) {
+      getCurrentValue(value);
+    }
+    if (pos >= end && in.syncSeen()) {
+      more = false;
+    } else {
+      more = remaining;
+    }
+    return more;
+  }
+
+  protected synchronized boolean next(K key) throws IOException {
+    if (!more)
+      return false;
+    long pos = in.getPosition();
+    boolean remaining = (in.next(key) != null);
+    if (pos >= end && in.syncSeen()) {
+      more = false;
+    } else {
+      more = remaining;
+    }
+    return more;
+  }
+
+  protected synchronized void getCurrentValue(V value) throws IOException {
+    in.getCurrentValue(value);
+  }
+
+  /**
+   * Return the progress within the input split
+   * 
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+    }
+  }
+
+  public synchronized long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  protected synchronized void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
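
The reader follows the usual createKey()/createValue()/next() loop that the new partitioning code in BSPJobClient relies on. A sketch of reading one partition file back, assuming Hama's FileSplit exposes the same (path, start, length, hosts) constructor as Hadoop's; the arguments are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hama.bsp.FileSplit;
    import org.apache.hama.bsp.SequenceFileRecordReader;

    public class RecordReaderLoopSketch {
      // Counts the records of a single partition file; the split covers the
      // whole file, so fileLength should be the file's size in bytes.
      public static long countRecords(Configuration conf, Path part, long fileLength)
          throws Exception {
        FileSplit split = new FileSplit(part, 0, fileLength, new String[0]);
        SequenceFileRecordReader<Object, Object> reader =
            new SequenceFileRecordReader<Object, Object>(conf, split);
        Object key = reader.createKey();
        Object value = reader.createValue();
        long count = 0;
        while (reader.next(key, value)) {
          count++;
        }
        reader.close();
        return count;
      }
    }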

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.util.KeyValuePair;
+import org.apache.zookeeper.KeeperException;
+
+public class TestPartitioning extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
+
+  public void testPartitioner() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
+    conf.set("bsp.partitioning.dir", "/tmp/hama-test/partitioning/localtest");
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setJobName("Test partitioning with input");
+    bsp.setBspClass(PartionedBSP.class);
+    bsp.setNumBspTask(2);
+    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    bsp.setInputFormat(TextInputFormat.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setInputPath(new Path("../CHANGES.txt"));
+    bsp.setPartitioner(HashPartitioner.class);
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
+  public static class PartionedBSP extends
+      BSP<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void bsp(BSPPeer<LongWritable, Text, NullWritable, NullWritable> peer)
+        throws IOException, KeeperException, InterruptedException {
+      long numOfPairs = 0;
+      KeyValuePair<LongWritable, Text> readNext = null;
+      while ((readNext = peer.readNext()) != null) {
+        LOG.debug(readNext.getKey().get() + " / " + readNext.getValue().toString());
+        numOfPairs++;
+      }
+
+      assertTrue(numOfPairs > 2);
+    }
+
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
------------------------------------------------------------------------------
    svn:eol-style = native


