Subject: svn commit: r1411326 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/hama/graph/ ml/src/test/java/org/apache/hama/ml/kmeans/ yarn/src/main/java/org/apache/hama/bsp/
Date: Mon, 19 Nov 2012 17:35:57 -0000
From: tjungblut@apache.org
To: commits@hama.apache.org
Reply-To: dev@hama.apache.org
Message-Id: <20121119173559.F339E23888E7@eris.apache.org>

Author: tjungblut
Date: Mon Nov 19 17:35:55 2012
New Revision: 1411326

URL: http://svn.apache.org/viewvc?rev=1411326&view=rev
Log:
[HAMA-678]: fix input splitting

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Nov 19 17:35:55 2012
@@ -10,6 +10,7 @@ Release 0.6 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-678: Fix input splitting (tjungblut)
    HAMA-613: Scheduler kills job too silently when out of slots (Yuesheng Hu via edwardyoon)
    HAMA-647: Make the input spliter robustly (Yuesheng Hu via edwardyoon)
    HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu via tjungblut)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Mon Nov 19 17:35:55 2012
@@ -31,12 +31,14 @@ public abstract class BSP
       peer) throws IOException, SyncException, InterruptedException;
 
   /**
    * {@inheritDoc}
    */
+  @Override
   public void setup(BSPPeer peer) throws IOException, SyncException,
       InterruptedException {
 
@@ -45,6 +47,7 @@ public abstract class BSP
+  @Override
   public void cleanup(BSPPeer peer) throws IOException {
 
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Nov 19 17:35:55 2012
@@ -301,20 +301,8 @@ public class BSPJobClient extends Config
       throws IOException {
     BSPJob job = pJob;
     job.setJobID(jobId);
-    int maxTasks = 0;
-    int limitTasks = job.getConf().getInt(Constants.MAX_TASKS_PER_JOB, 0);
-
-    ClusterStatus clusterStatus = getClusterStatus(true);
-
-    if(limitTasks > 0) {
-      maxTasks = limitTasks;
-    } else {
-      maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
-    }
-
-    if (maxTasks < job.getNumBspTask()) {
-      throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
-    }
+    int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+    int maxTasks = checkTaskLimits(job, limitTasks);
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
 
@@ -337,8 +325,8 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
-      if (job.getConf().get("bsp.input.partitioner.class") != null
-          && !job.getConf()
+      if (job.getConfiguration().get("bsp.input.partitioner.class") != null
+          && !job.getConfiguration()
               .getBoolean("hama.graph.runtime.partitioning", false)) {
         job = partition(job, maxTasks);
         maxTasks = job.getInt("hama.partition.count", maxTasks);
@@ -384,6 +372,22 @@ public class BSPJobClient extends Config
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
+  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+    int maxTasks;
+    ClusterStatus clusterStatus = getClusterStatus(true);
+
+    if(limitTasks > 0) {
+      maxTasks = limitTasks;
+    } else {
+      maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
+    }
+
+    if (maxTasks < job.getNumBspTask()) {
+      throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
+    }
+    return maxTasks;
+  }
+
   protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
       Path submitJobFile, FileSystem fs) throws IOException {
     //
@@ -405,7 +409,7 @@ public class BSPJobClient extends Config
         (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
             : maxTasks);
 
-    String input = job.getConf().get("bsp.input.dir");
+    String input = job.getConfiguration().get("bsp.input.dir");
     if (input != null) {
       InputFormat inputFormat = job.getInputFormat();
 
@@ -438,18 +442,18 @@ public class BSPJobClient extends Config
       CompressionCodec codec = null;
       if (outputCompressorClass != null) {
         codec = ReflectionUtils.newInstance(outputCompressorClass,
-            job.getConf());
+            job.getConfiguration());
       }
 
       try {
         for (int i = 0; i < splits.length; i++) {
           Path p = new Path(partitionedPath, getPartitionName(i));
           if (codec == null) {
-            writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
                 sampleReader.createKey().getClass(), sampleReader.createValue()
                     .getClass(), CompressionType.NONE));
           } else {
-            writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+            writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
                 sampleReader.createKey().getClass(), sampleReader.createValue()
                     .getClass(), compressionType, codec));
           }
@@ -516,7 +520,7 @@ public class BSPJobClient extends Config
   static Class getOutputCompressorClass(BSPJob job,
       Class defaultValue) {
     Class codecClass = defaultValue;
-    Configuration conf = job.getConf();
+    Configuration conf = job.getConfiguration();
     String name = conf.get("bsp.partitioning.compression.codec");
     if (name != null) {
       try {
@@ -537,7 +541,7 @@ public class BSPJobClient extends Config
         (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
             : maxTasks);
-    final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
+    final DataOutputStream out = writeSplitsFileHeader(job.getConfiguration(),
         submitSplitFile, splits.length);
     try {
       DataOutputBuffer buffer = new DataOutputBuffer();
@@ -706,7 +710,7 @@ public class BSPJobClient extends Config
 
   public static void runJob(BSPJob job) throws FileNotFoundException,
       IOException {
-    BSPJobClient jc = new BSPJobClient(job.getConf());
+    BSPJobClient jc = new BSPJobClient(job.getConfiguration());
 
     if (job.getNumBspTask() == 0
         || job.getNumBspTask() > jc.getClusterStatus(false).getMaxTasks()) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Mon Nov 19 17:35:55 2012
@@ -102,10 +102,18 @@ public class BSPJobContext {
     conf.writeXml(out);
   }
 
+  /**
+   * @deprecated use {@link #getConfiguration()} instead.
+   */
+  @Deprecated
   public Configuration getConf() {
     return this.conf;
   }
 
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
   public String get(String name) {
     return conf.get(name);
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Mon Nov 19 17:35:55 2012
@@ -114,7 +114,7 @@ public final class BSPTask extends Task
 
   private void startPingingGroom(BSPJob job, BSPPeerProtocol umbilical) {
-    long pingPeriod = job.getConf().getLong(Constants.GROOM_PING_PERIOD,
+    long pingPeriod = job.getConfiguration().getLong(Constants.GROOM_PING_PERIOD,
         Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
 
     try {
@@ -156,8 +156,8 @@ public final class BSPTask extends Task
       throws Exception {
 
     BSP bsp = (BSP) ReflectionUtils
-        .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
-            job.getConf());
+        .newInstance(job.getConfiguration().getClass("bsp.work.class", BSP.class),
+            job.getConfiguration());
 
     // The policy is to throw the first exception and log the remaining.
     Exception firstException = null;
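BSPJobContext keeps getConf() for source compatibility but marks it deprecated; the rest of the commit moves every call site to the new getConfiguration() accessor. A minimal sketch of the migrated call pattern follows; the helper class and method names are illustrative, not part of the commit, while the accessor and the "bsp.min.split.size" key come from the diff itself:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hama.bsp.BSPJob;

    public class ConfigurationAccess {

      // Reads a job setting the way call sites in this commit now do it.
      static long minSplitSize(BSPJob job) {
        // Before this commit: Configuration conf = job.getConf();  (now @Deprecated)
        Configuration conf = job.getConfiguration();
        return conf.getLong("bsp.min.split.size", 1L);
      }
    }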
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -140,7 +140,7 @@ public abstract class CombineFileInputFo
 
   public InputSplit[] getSplits(BSPJob bspJob, int numSplits)
       throws IOException {
-    Configuration job = bspJob.getConf();
+    Configuration job = bspJob.getConfiguration();
 
     long minSizeNode = 0;
     long minSizeRack = 0;
@@ -446,7 +446,7 @@ public abstract class CombineFileInputFo
       this.fileSize = 0;
 
       // get block locations from file system
-      FileSystem fs = path.getFileSystem(job.getConf());
+      FileSystem fs = path.getFileSystem(job.getConfiguration());
       FileStatus stat = fs.getFileStatus(path);
       BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
           stat.getLen());

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Mon Nov 19 17:35:55 2012
@@ -86,7 +86,7 @@ public class CombineFileSplit implements
   }
 
   public Configuration getJob() {
-    return job.getConf();
+    return job.getConfiguration();
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
@@ -47,7 +48,6 @@ public abstract class FileInputFormat
       getRecordReader(InputSplit split, BSPJob job) throws IOException;
 
@@ -103,7 +90,7 @@ public abstract class FileInputFormat
       filter) {
-    conf.getConf().setClass("bsp.input.pathFilter.class", filter,
+    conf.getConfiguration().setClass("bsp.input.pathFilter.class", filter,
         PathFilter.class);
   }
 
@@ -113,10 +100,10 @@ public abstract class FileInputFormat
-    Class filterClass = conf.getConf().getClass(
+    Class filterClass = conf.getConfiguration().getClass(
         "bsp.input.pathFilter.class", null, PathFilter.class);
     return (filterClass != null) ? ReflectionUtils.newInstance(filterClass,
-        conf.getConf()) : null;
+        conf.getConfiguration()) : null;
   }
 
   /**
@@ -147,7 +134,7 @@ public abstract class FileInputFormat
+   * numSplits will be ignored by the framework.
    */
   @Override
   public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
-    FileStatus[] files = listStatus(job);
-
-    long totalSize = computeTotalSize(job, files);
-    long goalSize = computeGoalSize(numSplits, totalSize);
-
-    ArrayList splits = new ArrayList(numSplits);
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+    long maxSize = getMaxSplitSize(job);
 
-    // take the short circuit path if we have already partitioned
-    if (numSplits == files.length) {
-      for (FileStatus file : files) {
-        if (file != null) {
-          splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
-              new String[0]));
+    // generate splits
+    List splits = new ArrayList();
+    FileStatus[] files = listStatus(job);
+    for (FileStatus file : files) {
+      Path path = file.getPath();
+      FileSystem fs = path.getFileSystem(job.getConfiguration());
+      long length = file.getLen();
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+      if ((length != 0) && isSplitable(job, path)) {
+        long blockSize = file.getBlockSize();
+        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+              blkLocations[blkIndex].getHosts()));
+          bytesRemaining -= splitSize;
+        }
+
+        if (bytesRemaining != 0) {
+          splits
+              .add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
+                  blkLocations[blkLocations.length - 1].getHosts()));
         }
+      } else if (length != 0) {
+        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+      } else {
+        // Create empty hosts array for zero length files
+        splits.add(new FileSplit(path, 0, length, new String[0]));
       }
-      return splits.toArray(new FileSplit[splits.size()]);
     }
-    LOG.info("numSplits: " + numSplits);
-    long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
-        minSplitSize);
+    // Save the number of input files in the job-conf
+    job.getConfiguration().setLong("bsp.input.files", files.length);
 
-    // generate splits
-    NetworkTopology clusterMap = new NetworkTopology();
-    for (FileStatus file : files) {
-      if (file != null) {
-        Path path = file.getPath();
-        FileSystem fs = path.getFileSystem(job.getConf());
-        long length = file.getLen();
-        BlockLocation[] blkLocations = fs
-            .getFileBlockLocations(file, 0, length);
-        if ((length != 0) && isSplitable(fs, path)) {
-          long blockSize = file.getBlockSize();
-          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-          LOG.info("computeSplitSize: " + splitSize + " (" + goalSize + ", "
-              + minSize + ", " + blockSize + ")");
-
-          long bytesRemaining = length;
-          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-            String[] splitHosts = getSplitHosts(blkLocations, length
-                - bytesRemaining, splitSize, clusterMap);
-            splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
-                splitHosts));
-            bytesRemaining -= splitSize;
-          }
+    LOG.debug("Total # of splits: " + splits.size());
+    return splits.toArray(new InputSplit[splits.size()]);
+  }
 
-          if (bytesRemaining != 0) {
-            splits.add(new FileSplit(path, length - bytesRemaining,
-                bytesRemaining, blkLocations[blkLocations.length - 1]
-                    .getHosts()));
-          }
-        } else if (length != 0) {
-          String[] splitHosts = getSplitHosts(blkLocations, 0, length,
-              clusterMap);
-          splits.add(new FileSplit(path, 0, length, splitHosts));
-        } else {
-          // Create empty hosts array for zero length files
-          splits.add(new FileSplit(path, 0, length, new String[0]));
-        }
-      }
-    }
-    LOG.info("Total # of splits: " + splits.size());
-    return splits.toArray(new FileSplit[splits.size()]);
+  /**
+   * @return true if the file is splittable (default), false if not.
+   */
+  protected boolean isSplitable(BSPJob job, Path path) {
+    return true;
+  }
+
+  /**
+   * Get the lower bound on split size imposed by the format.
+   *
+   * @return the number of bytes of the minimal split for this format
+   */
+  protected long getFormatMinSplitSize() {
+    return 1;
+  }
+
+  /**
+   * Set the minimum input split size
+   *
+   * @param job the job to modify
+   * @param size the minimum size
+   */
+  public static void setMinInputSplitSize(Job job, long size) {
+    job.getConfiguration().setLong("bsp.min.split.size", size);
+  }
+
+  /**
+   * Get the minimum split size
+   *
+   * @param job the job
+   * @return the minimum number of bytes that can be in a split
+   */
+  public static long getMinSplitSize(BSPJob job) {
+    return job.getConfiguration().getLong("bsp.min.split.size", 1L);
+  }
+
+  /**
+   * Set the maximum split size
+   *
+   * @param job the job to modify
+   * @param size the maximum split size
+   */
+  public static void setMaxInputSplitSize(Job job, long size) {
+    job.getConfiguration().setLong("bsp.max.split.size", size);
+  }
+
+  /**
+   * Get the maximum split size.
+   *
+   * @param context the job to look at.
+   * @return the maximum number of bytes a split can include
+   */
+  public static long getMaxSplitSize(BSPJob context) {
+    return context.getConfiguration().getLong("bsp.max.split.size",
+        Long.MAX_VALUE);
+  }
 
   protected long computeTotalSize(BSPJob job, FileStatus[] files)
 
@@ -256,7 +279,7 @@ public abstract class FileInputFormat

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Mon Nov 19 17:35:55 2012
   *         false otherwise
   */
  public static boolean getCompressOutput(BSPJob conf) {
-    return conf.getConf().getBoolean("bsp.output.compress", false);
+    return conf.getConfiguration().getBoolean("bsp.output.compress", false);
  }
 
  /**
@@ -60,7 +60,7 @@ public abstract class FileOutputFormat
       codecClass) {
    setCompressOutput(conf, true);
-    conf.getConf().setClass("bsp.output.compression.codec", codecClass,
+    conf.getConfiguration().setClass("bsp.output.compression.codec", codecClass,
        CompressionCodec.class);
  }
 
@@ -79,7 +79,7 @@ public abstract class FileOutputFormat

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Nov 19 17:35:55 2012
         numBSPTasks + 10);
-    this.maxTaskAttempts = job.getConf().getInt(Constants.MAX_TASK_ATTEMPTS,
+    this.maxTaskAttempts = job.getConfiguration().getInt(Constants.MAX_TASK_ATTEMPTS,
         Constants.DEFAULT_MAX_TASK_ATTEMPTS);
 
     this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Nov 19 17:35:55 2012
@@ -230,7 +230,7 @@ public class LocalBSPRunner implements J
       conf.set(Constants.PEER_HOST, "local");
 
       bsp = (BSP) ReflectionUtils.newInstance(
-          job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+          job.getConfiguration().getClass("bsp.work.class", BSP.class), job.getConfiguration());
     }
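The new getSplits above sizes each split from the file's block size clamped between the configurable minimum and maximum, and keeps cutting full splits only while the remainder exceeds the slop factor, so a small tail never becomes its own split. A standalone sketch of that computation; the SPLIT_SLOP value of 1.1 and all the byte figures are illustrative assumptions, not taken from the Hama source:

    public class SplitSizeSketch {
      // Hadoop-style slop factor: the final split may be up to 10% larger than
      // splitSize so a tiny remainder is folded into the last split.
      private static final double SPLIT_SLOP = 1.1;

      // splitSize = max(minSize, min(maxSize, blockSize))
      static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }

      public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;  // assumed block size
        long minSize = 1L;                   // default of bsp.min.split.size
        long maxSize = Long.MAX_VALUE;       // default of bsp.max.split.size
        long length = 200L * 1024 * 1024;    // hypothetical input file length

        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        long bytesRemaining = length;
        int splits = 0;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          bytesRemaining -= splitSize;
          splits++;
        }
        if (bytesRemaining != 0) {
          splits++; // the final, possibly shorter split
        }
        // 200 MB with 64 MB splits -> 3 full splits plus an 8 MB tail = 4 splits
        System.out.println("splits = " + splits);
      }
    }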
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java Mon Nov 19 17:35:55 2012
@@ -50,12 +50,14 @@ public class RecoverTaskAction extends G
     return superstepNumber.get();
   }
 
+  @Override
   public void write(DataOutput out) throws IOException {
     task.write(out);
     superstepNumber.write(out);
   }
 
+  @Override
   public void readFields(DataInput in) throws IOException {
     task = new BSPTask();
     task.readFields(in);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -19,17 +19,41 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.SequenceFile;
 
 public class SequenceFileInputFormat extends FileInputFormat {
 
-  public SequenceFileInputFormat() {
-    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
-  }
-
   @Override
   public RecordReader getRecordReader(InputSplit split, BSPJob job)
       throws IOException {
-    return new SequenceFileRecordReader(job.getConf(), (FileSplit) split);
+    return new SequenceFileRecordReader(job.getConfiguration(),
+        (FileSplit) split);
+  }
+
+  @Override
+  protected long getFormatMinSplitSize() {
+    return SequenceFile.SYNC_INTERVAL;
   }
+
+  @Override
+  protected FileStatus[] listStatus(BSPJob job) throws IOException {
+
+    FileStatus[] files = super.listStatus(job);
+    int len = files.length;
+    for (int i = 0; i < len; ++i) {
+      FileStatus file = files[i];
+      if (file.isDir()) { // it's a MapFile
+        Path p = file.getPath();
+        FileSystem fs = p.getFileSystem(job.getConfiguration());
+        // use the data file
+        files[i] = fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME));
+      }
+    }
+    return files;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java Mon Nov 19 17:35:55 2012
@@ -35,7 +35,7 @@ public class SequenceFileRecordWriter

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Mon Nov 19 17:35:55 2012
   @Override
   public RecordReader getRecordReader(InputSplit split, BSPJob job)
       throws IOException {
-    return new LineRecordReader(job.getConf(), (FileSplit) split);
+    return new LineRecordReader(job.getConfiguration(), (FileSplit) split);
+  }
+
+  @Override
+  protected boolean isSplitable(BSPJob job, Path path) {
+    CompressionCodec codec = new CompressionCodecFactory(job.getConfiguration())
+        .getCodec(path);
+    return codec == null;
   }
 }
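With the getMinSplitSize/getMaxSplitSize helpers added to FileInputFormat above, split sizes can now be bounded per job through the "bsp.min.split.size" and "bsp.max.split.size" keys. A hedged sketch of how a job might pin both bounds before submission; the BSPJob constructor used here, the job name, and the concrete byte values are assumptions for illustration only:

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.BSPJob;

    public class SplitSizeConfigExample {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();
        BSPJob job = new BSPJob(conf, "split-size-demo"); // hypothetical job

        // Floor of 2000 bytes and ceiling of 128 MB, both arbitrary example values.
        job.getConfiguration().setLong("bsp.min.split.size", 2000L);
        job.getConfiguration().setLong("bsp.max.split.size", 128L * 1024 * 1024);
      }
    }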
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java Mon Nov 19 17:35:55 2012
@@ -104,11 +104,11 @@ public class TextOutputFormat exte
   public RecordWriter getRecordWriter(FileSystem ignored, BSPJob job,
       String name) throws IOException {
     boolean isCompressed = getCompressOutput(job);
-    String keyValueSeparator = job.getConf().get(
+    String keyValueSeparator = job.getConfiguration().get(
         "bsp.textoutputformat.separator", "\t");
     if (!isCompressed) {
       Path file = FileOutputFormat.getTaskOutputPath(job, name);
-      FileSystem fs = file.getFileSystem(job.getConf());
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
       FSDataOutputStream fileOut = fs.create(file);
       return new LineRecordWriter(fileOut, keyValueSeparator);
     } else {
@@ -116,11 +116,11 @@ public class TextOutputFormat exte
           job, GzipCodec.class);
       // create the named codec
       CompressionCodec codec = ReflectionUtils.newInstance(codecClass,
-          job.getConf());
+          job.getConfiguration());
       // build the filename including the extension
       Path file = FileOutputFormat.getTaskOutputPath(job,
           name + codec.getDefaultExtension());
-      FileSystem fs = file.getFileSystem(job.getConf());
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
       FSDataOutputStream fileOut = fs.create(file);
       return new LineRecordWriter(new DataOutputStream(
           codec.createOutputStream(fileOut)), keyValueSeparator);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Nov 19 17:35:55 2012
@@ -509,7 +509,7 @@ public final class GraphJobRunner

Modified: hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (original)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java Mon Nov 19 17:35:55 2012
-      assertTrue(doubleVector.get(0) > 50 && doubleVector.get(0) < 51);
-      assertTrue(doubleVector.get(1) > 50 && doubleVector.get(1) < 51);
+      assertTrue(doubleVector.get(0) >= 50 && doubleVector.get(0) < 51);
+      assertTrue(doubleVector.get(1) >= 50 && doubleVector.get(1) < 51);
     } finally {
       fs.delete(new Path("/tmp/clustering"), true);
     }

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Mon Nov 19 17:35:55 2012
@@ -190,6 +190,11 @@ public class YARNBSPJobClient extends BS
   }
 
   @Override
+  protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+    return Math.max(1, limitTasks);
+  }
+
+  @Override
   public Path getSystemDir() {
     return new Path(getConf().get("bsp.local.dir", "/tmp/hama-yarn/"));
   }
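Pulling the task-limit check out of submitJobInternal into the protected checkTaskLimits method (see the BSPJobClient hunk near the top) is what lets the YARN client above swap the cluster-status based check for a trivial one. A hypothetical subclass could cap jobs in the same way; the class name and the hard cap of 32 tasks are invented for illustration, only the overridden method signature and the error message come from the commit:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hama.bsp.BSPJob;
    import org.apache.hama.bsp.BSPJobClient;

    // Hypothetical client that never admits more than 32 BSP tasks per job.
    public class CappedBSPJobClient extends BSPJobClient {

      public CappedBSPJobClient(Configuration conf) throws IOException {
        super(conf);
      }

      @Override
      protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
        int maxTasks = (limitTasks > 0) ? Math.min(limitTasks, 32) : 32;
        if (maxTasks < job.getNumBspTask()) {
          throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
        }
        return maxTasks;
      }
    }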