Mailing-List: contact hama-commits-help@incubator.apache.org; run by ezmlm
Reply-To: hama-dev@incubator.apache.org
Subject: svn commit: r1195959 [1/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/testjar/ examples/src/main/java/org/apache/hama/exampl...
Date: Tue, 01 Nov 2011 12:34:15 -0000
To: hama-commits@incubator.apache.org
From: edwardyoon@apache.org
X-Mailer: svnmailer-1.0.8-patched
Message-Id: <20111101123417.73670238890B@eris.apache.org>

Author: edwardyoon
Date: Tue Nov 1 12:34:14 2011
New Revision: 1195959

URL: http://svn.apache.org/viewvc?rev=1195959&view=rev
Log:
Add input output system.

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
    incubator/hama/trunk/core/src/test/java/testjar/ClassSerializePrinting.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1195959&r1=1195958&r2=1195959&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Nov 1 12:34:14 2011
@@ -4,6 +4,7 @@ Release 0.4 - Unreleased
 NEW FEATURES
+ HAMA-258: Add Input Output system (edwardyoon)
 HAMA-456: Add Message Combiner (edwardyoon)
 HAMA-456: Add getPeerName(int index) and getNumPeers() (edwardyoon)
 HAMA-431: MapReduce NG integration (tjungblut)

Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Tue Nov 1 12:34:14 2011 @@ -25,43 +25,37 @@ import org.apache.zookeeper.KeeperExcept /** * This class provides an abstract implementation of the BSP interface. */ -public abstract class BSP implements BSPInterface { +public abstract class BSP implements BSPInterface { protected Configuration conf; /** - * This method is called before the BSP method. It can be used for setup - * purposes. + * This method is your computation method, the main work of your BSP should be + * done here. * * @param peer Your BSPPeer instance. */ - public void setup(BSPPeer peer) throws IOException, KeeperException, - InterruptedException { - - } + public abstract void bsp(BSPPeer peer, RecordReader input, + OutputCollector output) throws IOException, KeeperException, + InterruptedException; /** - * This method is called after the BSP method. It can be used for cleanup - * purposes. Cleanup is guranteed to be called after the BSP runs, even in - * case of exceptions. + * This method is called before the BSP method. It can be used for setup + * purposes. * * @param peer Your BSPPeer instance. */ - public void cleanup(BSPPeer peer) { - - } + public abstract void setup(BSPPeer peer) throws IOException, KeeperException, + InterruptedException; /** - * This method is your computation method, the main work of your BSP should be - * done here. + * This method is called after the BSP method. It can be used for cleanup + * purposes. Cleanup is guranteed to be called after the BSP runs, even in + * case of exceptions. * * @param peer Your BSPPeer instance. 
*/ - @Override - public void bsp(BSPPeer peer) throws IOException, KeeperException, - InterruptedException { - - } + public abstract void cleanup(BSPPeer peer); /** * Returns the configuration of this BSP Job. @@ -85,5 +79,4 @@ public abstract class BSP implements BSP public void setConf(Configuration conf) { this.conf = conf; } - } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPInterface.java Tue Nov 1 12:34:14 2011 @@ -26,7 +26,7 @@ import org.apache.zookeeper.KeeperExcept * Interface BSP defines the basic operations needed to implement the BSP * algorithm. */ -public interface BSPInterface extends Configurable { +public interface BSPInterface extends Configurable { /** * A user defined function for programming in the BSP style. 
@@ -39,6 +39,7 @@ public interface BSPInterface extends Co * @throws KeeperException * @throws InterruptedException */ - public void bsp(BSPPeer bspPeer) throws IOException, KeeperException, + public void bsp(BSPPeer bspPeer, RecordReader input, + OutputCollector output) throws IOException, KeeperException, InterruptedException; } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Tue Nov 1 12:34:14 2011 @@ -23,6 +23,9 @@ import java.net.URLDecoder; import java.util.Enumeration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.HamaConfiguration; /** @@ -48,11 +51,11 @@ public class BSPJob extends BSPJobContex super(conf, null); jobClient = new BSPJobClient(conf); } - + public BSPJob(HamaConfiguration conf, BSPJobID jobID) throws IOException { super(conf, jobID); } - + public BSPJob(HamaConfiguration conf, String jobName) throws IOException { this(conf); setJobName(jobName); @@ -95,6 +98,7 @@ public class BSPJob extends BSPJobContex * @param cls * @throws IllegalStateException */ + @SuppressWarnings("unchecked") public void setBspClass(Class cls) throws IllegalStateException { ensureState(JobState.DEFINE); @@ -113,9 +117,10 @@ public class BSPJob extends BSPJobContex @SuppressWarnings("unchecked") public Class getCombinerClass() { - return (Class) conf.getClass(COMBINER_CLASS_ATTR, Combiner.class); + return (Class) conf.getClass(COMBINER_CLASS_ATTR, + Combiner.class); } - + public void setJar(String jar) { 
conf.set("bsp.jar", jar); } @@ -155,10 +160,6 @@ public class BSPJob extends BSPJobContex conf.set("bsp.job.name", name); } - public void setInputPath(HamaConfiguration conf, Path iNPUTPATH) { - - } - public void setUser(String user) { conf.set("user.name", user); } @@ -214,9 +215,9 @@ public class BSPJob extends BSPJobContex } return isSuccessful(); } - + // for the testcase - BSPJobClient getJobClient(){ + BSPJobClient getJobClient() { return jobClient; } @@ -231,4 +232,64 @@ public class BSPJob extends BSPJobContex public int getNumBspTask() { return conf.getInt("bsp.peers.num", 0); } + + @SuppressWarnings("unchecked") + public InputFormat getInputFormat() { + return (InputFormat) ReflectionUtils.newInstance(conf.getClass( + "bsp.input.format.class", TextInputFormat.class, InputFormat.class), conf); + } + + @SuppressWarnings("unchecked") + public void setInputFormat(Class cls) { + conf.setClass("bsp.input.format.class", cls, InputFormat.class); + } + + /** + * Get the key class for the job output data. + * + * @return the key class for the job output data. + */ + public Class getOutputKeyClass() { + return conf.getClass("bsp.output.key.class", LongWritable.class, + Object.class); + } + + /** + * Set the key class for the job output data. + * + * @param theClass the key class for the job output data. + */ + public void setOutputKeyClass(Class theClass) { + conf.setClass("bsp.output.key.class", theClass, Object.class); + } + + /** + * Get the value class for job outputs. + * + * @return the value class for job outputs. + */ + public Class getOutputValueClass() { + return conf.getClass("bsp.output.value.class", Text.class, Object.class); + } + + /** + * Set the value class for job outputs. + * + * @param theClass the value class for job outputs. 
+ */ + public void setOutputValueClass(Class theClass) { + conf.setClass("bsp.output.value.class", theClass, Object.class); + } + + @SuppressWarnings("unchecked") + public void setOutputFormat(Class theClass) { + conf.setClass("bsp.output.format.class", theClass, OutputFormat.class); + } + + @SuppressWarnings("unchecked") + public OutputFormat getOutputFormat() { + return (OutputFormat) ReflectionUtils.newInstance(conf.getClass( + "bsp.output.format.class", TextOutputFormat.class, OutputFormat.class), + conf); + } } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Tue Nov 1 12:34:14 2011 @@ -17,8 +17,13 @@ */ package org.apache.hama.bsp; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; import java.util.Map; import java.util.Random; @@ -32,6 +37,11 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UnixUserGroupInformation; @@ -200,8 +210,8 @@ public class BSPJobClient extends Config if (masterAdress != null && 
!masterAdress.equals("local")) { this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy( JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, - BSPMaster.getAddress(conf), conf, - NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); + BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf, + JobSubmissionProtocol.class)); } else { LOG.debug("Using local BSP runner."); this.jobSubmitClient = new LocalBSPRunner(conf); @@ -283,6 +293,7 @@ public class BSPJobClient extends Config Path submitJobDir = new Path(getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36)); + Path submitSplitFile = new Path(submitJobDir, "job.split"); Path submitJarFile = new Path(submitJobDir, "job.jar"); Path submitJobFile = new Path(submitJobDir, "job.xml"); LOG.debug("BSPJobClient.submitJobDir: " + submitJobDir); @@ -298,15 +309,12 @@ public class BSPJobClient extends Config // check the number of BSP tasks int tasks = job.getNumBspTask(); int maxTasks = clusterStatus.getMaxTasks(); - if (tasks <= 0 || tasks > maxTasks) { - LOG.info("The number of tasks you've entered was invalid. 
Using default value of " - + maxTasks + "!"); job.setNumBspTask(maxTasks); } - // Create a number of filenames in the BSPMaster's fs namespace FileSystem fs = getFs(); + // Create a number of filenames in the BSPMaster's fs namespace fs.delete(submitJobDir, true); submitJobDir = fs.makeQualified(submitJobDir); submitJobDir = new Path(submitJobDir.toUri().getPath()); @@ -315,6 +323,11 @@ public class BSPJobClient extends Config fs.mkdirs(submitJobDir); short replication = (short) job.getInt("bsp.submit.replication", 10); + // Create the splits for the job + LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile)); + job.setNumBspTask(writeSplits(job, submitSplitFile)); + job.set("bsp.job.split.file", submitSplitFile.toString()); + String originalJarPath = job.getJar(); if (originalJarPath != null) { // copy jar to BSPMaster's fs @@ -354,8 +367,8 @@ public class BSPJobClient extends Config // // Now, actually submit the job (using the submit name) // - JobStatus status = jobSubmitClient.submitJob(jobId, - submitJobFile.makeQualified(fs).toString()); + JobStatus status = jobSubmitClient.submitJob(jobId, submitJobFile + .makeQualified(fs).toString()); if (status != null) { return new NetworkedJob(status); } else { @@ -363,6 +376,90 @@ public class BSPJobClient extends Config } } + private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException { + InputSplit[] splits = job.getInputFormat().getSplits(job, + job.getNumBspTask()); + // sort the splits into order based on size, so that the biggest + // go first + Arrays.sort(splits, new Comparator() { + public int compare(InputSplit a, InputSplit b) { + try { + long left = a.getLength(); + long right = b.getLength(); + if (left == right) { + return 0; + } else if (left < right) { + return 1; + } else { + return -1; + } + } catch (IOException ie) { + throw new RuntimeException("Problem getting input split size", ie); + } + } + }); + DataOutputStream out = writeSplitsFileHeader(job.getConf(), + 
submitSplitFile, splits.length); + + try { + DataOutputBuffer buffer = new DataOutputBuffer(); + RawSplit rawSplit = new RawSplit(); + for (InputSplit split : splits) { + rawSplit.setClassName(split.getClass().getName()); + buffer.reset(); + split.write(buffer); + rawSplit.setDataLength(split.getLength()); + rawSplit.setBytes(buffer.getData(), 0, buffer.getLength()); + rawSplit.setLocations(split.getLocations()); + rawSplit.write(out); + } + } finally { + out.close(); + } + return splits.length; + } + + private static final int CURRENT_SPLIT_FILE_VERSION = 0; + private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes(); + + private DataOutputStream writeSplitsFileHeader(Configuration conf, + Path filename, int length) throws IOException { + // write the splits to a file for the bsp master + FileSystem fs = filename.getFileSystem(conf); + FSDataOutputStream out = FileSystem.create(fs, filename, new FsPermission( + JOB_FILE_PERMISSION)); + out.write(SPLIT_FILE_HEADER); + WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION); + WritableUtils.writeVInt(out, length); + return out; + } + + /** + * Read a splits file into a list of raw splits + * + * @param in the stream to read from + * @return the complete list of splits + * @throws IOException + */ + static RawSplit[] readSplitFile(DataInput in) throws IOException { + byte[] header = new byte[SPLIT_FILE_HEADER.length]; + in.readFully(header); + if (!Arrays.equals(SPLIT_FILE_HEADER, header)) { + throw new IOException("Invalid header on split file"); + } + int vers = WritableUtils.readVInt(in); + if (vers != CURRENT_SPLIT_FILE_VERSION) { + throw new IOException("Unsupported split version " + vers); + } + int len = WritableUtils.readVInt(in); + RawSplit[] result = new RawSplit[len]; + for (int i = 0; i < len; ++i) { + result[i] = new RawSplit(); + result[i].readFields(in); + } + return result; + } + /** * Monitor a job and print status in real-time as progress is made and tasks * fail. 
@@ -583,9 +680,8 @@ public class BSPJobClient extends Config System.out.println("Job name: " + job.getJobName()); System.out.printf("States are:\n\tRunning : 1\tSucceded : 2" + "\tFailed : 3\tPrep : 4\n"); - System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), - jobStatus.getRunState(), jobStatus.getStartTime(), - jobStatus.getUsername()); + System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), jobStatus + .getRunState(), jobStatus.getStartTime(), jobStatus.getUsername()); exitCode = 0; } @@ -674,10 +770,76 @@ public class BSPJobClient extends Config } } + static class RawSplit implements Writable { + private String splitClass; + private BytesWritable bytes = new BytesWritable(); + private String[] locations; + long dataLength; + + public void setBytes(byte[] data, int offset, int length) { + bytes.set(data, offset, length); + } + + public void setClassName(String className) { + splitClass = className; + } + + public String getClassName() { + return splitClass; + } + + public BytesWritable getBytes() { + return bytes; + } + + public void clearBytes() { + bytes = null; + } + + public void setLocations(String[] locations) { + this.locations = locations; + } + + public String[] getLocations() { + return locations; + } + + public void readFields(DataInput in) throws IOException { + splitClass = Text.readString(in); + dataLength = in.readLong(); + bytes.readFields(in); + int len = WritableUtils.readVInt(in); + locations = new String[len]; + for (int i = 0; i < len; ++i) { + locations[i] = Text.readString(in); + } + } + + public void write(DataOutput out) throws IOException { + Text.writeString(out, splitClass); + out.writeLong(dataLength); + bytes.write(out); + WritableUtils.writeVInt(out, locations.length); + for (int i = 0; i < locations.length; i++) { + Text.writeString(out, locations[i]); + } + } + + public long getDataLength() { + return dataLength; + } + + public void setDataLength(long l) { + dataLength = l; + } + + } + /** */ public static void 
main(String[] args) throws Exception { int res = ToolRunner.run(new BSPJobClient(), args); System.exit(res); } + } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Tue Nov 1 12:34:14 2011 @@ -56,7 +56,7 @@ public class BSPJobContext { return jobId; } - public Path getWorkingDirectory() throws IOException { + public Path getWorkingDirectory() { String name = conf.get(WORKING_DIR); if (name != null) { Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Tue Nov 1 12:34:14 2011 @@ -17,10 +17,16 @@ */ package org.apache.hama.bsp; +import java.io.DataInput; +import java.io.DataOutput; import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Text; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hama.ipc.BSPPeerProtocol; import org.apache.zookeeper.KeeperException; @@ -33,16 +39,21 @@ public class BSPTask extends Task { public static final Log LOG = LogFactory.getLog(BSPTask.class); 
private BSPJob conf; + private BytesWritable split = new BytesWritable(); + private String splitClass; public BSPTask() { } public BSPTask(BSPJobID jobId, String jobFile, TaskAttemptID taskid, - int partition) { + int partition, String splitClass, BytesWritable split) { this.jobId = jobId; this.jobFile = jobFile; this.taskId = taskid; this.partition = partition; + + this.splitClass = splitClass; + this.split = split; } @Override @@ -54,12 +65,62 @@ public class BSPTask extends Task { public void run(BSPJob job, BSPPeerImpl bspPeer, BSPPeerProtocol umbilical) throws IOException { - BSP bsp = (BSP) ReflectionUtils.newInstance( - job.getConf().getClass("bsp.work.class", BSP.class), job.getConf()); + try { + runBSP(job, bspPeer, split, umbilical); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (ClassNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + done(umbilical); + } + + @SuppressWarnings("unchecked") + private void runBSP(final BSPJob job, BSPPeerImpl bspPeer, + final BytesWritable rawSplit, final BSPPeerProtocol umbilical) + throws IOException, InterruptedException, ClassNotFoundException { + InputSplit inputSplit = null; + // reinstantiate the split + try { + inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getConf() + .getClassByName(splitClass), job.getConf()); + } catch (ClassNotFoundException exp) { + IOException wrap = new IOException("Split class " + splitClass + + " not found"); + wrap.initCause(exp); + throw wrap; + } + + DataInputBuffer splitBuffer = new DataInputBuffer(); + splitBuffer.reset(split.getBytes(), 0, split.getLength()); + inputSplit.readFields(splitBuffer); + + RecordReader in = job.getInputFormat().getRecordReader( + inputSplit, job); + + FileSystem fs = FileSystem.get(job.getConf()); + String finalName = getOutputName(getPartition()); + + final RecordWriter out = + job.getOutputFormat().getRecordWriter(fs, job, finalName); + + 
OutputCollector collector = + new OutputCollector() { + public void collect(OUTK key, OUTV value) + throws IOException { + out.write(key, value); + } + }; + + BSP bsp = (BSP) ReflectionUtils.newInstance(job.getConf().getClass( + "bsp.work.class", BSP.class), job.getConf()); try { bsp.setup(bspPeer); - bsp.bsp(bspPeer); + bsp.bsp(bspPeer, in, collector); } catch (IOException e) { LOG.error("Exception during BSP execution!", e); } catch (KeeperException e) { @@ -68,9 +129,9 @@ public class BSPTask extends Task { LOG.error("Exception during BSP execution!", e); } finally { bsp.cleanup(bspPeer); + out.close(); } - done(umbilical); } public BSPJob getConf() { @@ -81,4 +142,19 @@ public class BSPTask extends Task { this.conf = conf; } + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, splitClass); + split.write(out); + split = null; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + splitClass = Text.readString(in); + split.readFields(in); + } + } Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,557 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; 
+import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapred.InvalidInputException; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; + +public abstract class FileInputFormat implements InputFormat { + public static final Log LOG = LogFactory.getLog(FileInputFormat.class); + + private static final double SPLIT_SLOP = 1.1; // 10% slop + + private long minSplitSize = 1; + private static final PathFilter hiddenFileFilter = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + protected void setMinSplitSize(long minSplitSize) { + this.minSplitSize = minSplitSize; + } + + /** + * Proxy PathFilter that accepts a path only if all filters given in the + * constructor do. Used by the listPaths() to apply the built-in + * hiddenFileFilter together with a user provided one (if any). + */ + private static class MultiPathFilter implements PathFilter { + private List filters; + + public MultiPathFilter(List filters) { + this.filters = filters; + } + + public boolean accept(Path path) { + for (PathFilter filter : filters) { + if (!filter.accept(path)) { + return false; + } + } + return true; + } + } + + /** + * @param fs the file system that the file is on + * @param filename the file name to check + * @return is this file splitable? + */ + protected boolean isSplitable(FileSystem fs, Path filename) { + return true; + } + + public abstract RecordReader getRecordReader(InputSplit split, + BSPJob job) throws IOException; + + /** + * Set a PathFilter to be applied to the input paths for the map-reduce job. 
+ * + * @param filter the PathFilter class use for filtering the input paths. + */ + public static void setInputPathFilter(BSPJob conf, + Class filter) { + conf.getConf().setClass("bsp.input.pathFilter.class", filter, + PathFilter.class); + } + + /** + * Get a PathFilter instance of the filter set for the input paths. + * + * @return the PathFilter instance set for the job, NULL if none has been set. + */ + public static PathFilter getInputPathFilter(BSPJob conf) { + Class filterClass = conf.getConf().getClass( + "bsp.input.pathFilter.class", null, PathFilter.class); + return (filterClass != null) ? ReflectionUtils.newInstance(filterClass, + conf.getConf()) : null; + } + + /** + * List input directories. Subclasses may override to, e.g., select only files + * matching a regular expression. + * + * @param job the job to list input paths for + * @return array of FileStatus objects + * @throws IOException if zero items. + */ + protected FileStatus[] listStatus(BSPJob job) throws IOException { + Path[] dirs = getInputPaths(job); + if (dirs.length == 0) { + throw new IOException("No input paths specified in job"); + } + + List result = new ArrayList(); + List errors = new ArrayList(); + + // creates a MultiPathFilter with the hiddenFileFilter and the + // user provided one (if any). 
+ List filters = new ArrayList(); + filters.add(hiddenFileFilter); + PathFilter jobFilter = getInputPathFilter(job); + if (jobFilter != null) { + filters.add(jobFilter); + } + PathFilter inputFilter = new MultiPathFilter(filters); + + for (Path p : dirs) { + FileSystem fs = p.getFileSystem(job.getConf()); + FileStatus[] matches = fs.globStatus(p, inputFilter); + if (matches == null) { + errors.add(new IOException("Input path does not exist: " + p)); + } else if (matches.length == 0) { + errors.add(new IOException("Input Pattern " + p + " matches 0 files")); + } else { + for (FileStatus globStat : matches) { + if (globStat.isDir()) { + for (FileStatus stat : fs.listStatus(globStat.getPath(), + inputFilter)) { + result.add(stat); + } + } else { + result.add(globStat); + } + } + } + } + + if (!errors.isEmpty()) { + throw new InvalidInputException(errors); + } + LOG.info("Total input paths to process : " + result.size()); + return result.toArray(new FileStatus[result.size()]); + } + + /** + * Splits files returned by {@link #listStatus(JobConf)} when they're too big. + */ + public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException { + FileStatus[] files = listStatus(job); + + long totalSize = 0; // compute total size + for (FileStatus file : files) { // check we have valid files + if (file.isDir()) { + throw new IOException("Not a file: " + file.getPath()); + } + totalSize += file.getLen(); + } + + long goalSize = totalSize / (numSplits == 0 ? 
1 : numSplits); + long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1), + minSplitSize); + + // generate splits + ArrayList splits = new ArrayList(numSplits); + NetworkTopology clusterMap = new NetworkTopology(); + for (FileStatus file : files) { + Path path = file.getPath(); + FileSystem fs = path.getFileSystem(job.getConf()); + long length = file.getLen(); + BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); + if ((length != 0) && isSplitable(fs, path)) { + long blockSize = file.getBlockSize(); + long splitSize = computeSplitSize(goalSize, minSize, blockSize); + + long bytesRemaining = length; + while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { + String[] splitHosts = getSplitHosts(blkLocations, length + - bytesRemaining, splitSize, clusterMap); + splits.add(new FileSplit(path, length - bytesRemaining, splitSize, + splitHosts)); + bytesRemaining -= splitSize; + } + + if (bytesRemaining != 0) { + splits + .add(new FileSplit(path, length - bytesRemaining, bytesRemaining, + blkLocations[blkLocations.length - 1].getHosts())); + } + } else if (length != 0) { + String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap); + splits.add(new FileSplit(path, 0, length, splitHosts)); + } else { + // Create empty hosts array for zero length files + splits.add(new FileSplit(path, 0, length, new String[0])); + } + } + LOG.info("Total # of splits: " + splits.size()); + return splits.toArray(new FileSplit[splits.size()]); + } + + protected long computeSplitSize(long goalSize, long minSize, long blockSize) { + return Math.max(minSize, Math.min(goalSize, blockSize)); + } + + protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { + for (int i = 0; i < blkLocations.length; i++) { + // is the offset inside this block? 
+ if ((blkLocations[i].getOffset() <= offset) + && (offset < blkLocations[i].getOffset() + + blkLocations[i].getLength())) { + return i; + } + } + BlockLocation last = blkLocations[blkLocations.length - 1]; + long fileLength = last.getOffset() + last.getLength() - 1; + throw new IllegalArgumentException("Offset " + offset + + " is outside of file (0.." + fileLength + ")"); + } + + /** + * Sets the given comma separated paths as the list of inputs for the + * map-reduce job. + * + * @param conf Configuration of the job + * @param commaSeparatedPaths Comma separated paths to be set as the list of + * inputs for the map-reduce job. + */ + public static void setInputPaths(BSPJob conf, String commaSeparatedPaths) { + setInputPaths(conf, StringUtils + .stringToPath(getPathStrings(commaSeparatedPaths))); + } + + /** + * Add the given comma separated paths to the list of inputs for the + * map-reduce job. + * + * @param conf The configuration of the job + * @param commaSeparatedPaths Comma separated paths to be added to the list of + * inputs for the map-reduce job. + */ + public static void addInputPaths(BSPJob conf, String commaSeparatedPaths) { + for (String str : getPathStrings(commaSeparatedPaths)) { + addInputPath(conf, new Path(str)); + } + } + + /** + * Set the array of {@link Path}s as the list of inputs for the map-reduce + * job. + * + * @param conf Configuration of the job. + * @param inputPaths the {@link Path}s of the input directories/files for the + * map-reduce job. + */ + public static void setInputPaths(BSPJob conf, Path... 
inputPaths) { + Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]); + StringBuffer str = new StringBuffer(StringUtils.escapeString(path + .toString())); + for (int i = 1; i < inputPaths.length; i++) { + str.append(StringUtils.COMMA_STR); + path = new Path(conf.getWorkingDirectory(), inputPaths[i]); + str.append(StringUtils.escapeString(path.toString())); + } + conf.set("bsp.input.dir", str.toString()); + } + + /** + * Add a {@link Path} to the list of inputs for the map-reduce job. + * + * @param conf The configuration of the job + * @param path {@link Path} to be added to the list of inputs for the + * map-reduce job. + */ + public static void addInputPath(BSPJob conf, Path path) { + path = new Path(conf.getWorkingDirectory(), path); + String dirStr = StringUtils.escapeString(path.toString()); + String dirs = conf.get("bsp.input.dir"); + conf.set("bsp.input.dir", dirs == null ? dirStr : dirs + + StringUtils.COMMA_STR + dirStr); + } + + // This method escapes commas in the glob pattern of the given paths. + private static String[] getPathStrings(String commaSeparatedPaths) { + int length = commaSeparatedPaths.length(); + int curlyOpen = 0; + int pathStart = 0; + boolean globPattern = false; + List pathStrings = new ArrayList(); + + for (int i = 0; i < length; i++) { + char ch = commaSeparatedPaths.charAt(i); + switch (ch) { + case '{': { + curlyOpen++; + if (!globPattern) { + globPattern = true; + } + break; + } + case '}': { + curlyOpen--; + if (curlyOpen == 0 && globPattern) { + globPattern = false; + } + break; + } + case ',': { + if (!globPattern) { + pathStrings.add(commaSeparatedPaths.substring(pathStart, i)); + pathStart = i + 1; + } + break; + } + } + } + pathStrings.add(commaSeparatedPaths.substring(pathStart, length)); + + return pathStrings.toArray(new String[0]); + } + + /** + * Get the list of input {@link Path}s for the map-reduce job. 
+ * + * @param conf The configuration of the job + * @return the list of input {@link Path}s for the map-reduce job. + */ + public static Path[] getInputPaths(BSPJob conf) { + String dirs = conf.getConf().get("bsp.input.dir", ""); + String[] list = StringUtils.split(dirs); + Path[] result = new Path[list.length]; + for (int i = 0; i < list.length; i++) { + result[i] = new Path(StringUtils.unEscapeString(list[i])); + } + return result; + } + + private void sortInDescendingOrder(List mylist) { + Collections.sort(mylist, new Comparator() { + public int compare(NodeInfo obj1, NodeInfo obj2) { + + if (obj1 == null || obj2 == null) + return -1; + + if (obj1.getValue() == obj2.getValue()) { + return 0; + } else { + return ((obj1.getValue() < obj2.getValue()) ? 1 : -1); + } + } + }); + } + + /** + * This function identifies and returns the hosts that contribute most for a + * given split. For calculating the contribution, rack locality is treated on + * par with host locality, so hosts from racks that contribute the most are + * preferred over hosts on racks that contribute less + * + * @param blkLocations The list of block locations + * @param offset + * @param splitSize + * @return array of hosts that contribute most to this split + * @throws IOException + */ + protected String[] getSplitHosts(BlockLocation[] blkLocations, long offset, + long splitSize, NetworkTopology clusterMap) throws IOException { + + int startIndex = getBlockIndex(blkLocations, offset); + + long bytesInThisBlock = blkLocations[startIndex].getOffset() + + blkLocations[startIndex].getLength() - offset; + + // If this is the only block, just return + if (bytesInThisBlock >= splitSize) { + return blkLocations[startIndex].getHosts(); + } + + long bytesInFirstBlock = bytesInThisBlock; + int index = startIndex + 1; + splitSize -= bytesInThisBlock; + + while (splitSize > 0) { + bytesInThisBlock = Math.min(splitSize, blkLocations[index++].getLength()); + splitSize -= bytesInThisBlock; + } + + long 
bytesInLastBlock = bytesInThisBlock; + int endIndex = index - 1; + + Map hostsMap = new IdentityHashMap(); + Map racksMap = new IdentityHashMap(); + String[] allTopos = new String[0]; + + // Build the hierarchy and aggregate the contribution of + // bytes at each level. See TestGetSplitHosts.java + + for (index = startIndex; index <= endIndex; index++) { + + // Establish the bytes in this block + if (index == startIndex) { + bytesInThisBlock = bytesInFirstBlock; + } else if (index == endIndex) { + bytesInThisBlock = bytesInLastBlock; + } else { + bytesInThisBlock = blkLocations[index].getLength(); + } + + allTopos = blkLocations[index].getTopologyPaths(); + + // If no topology information is available, just + // prefix a fakeRack + if (allTopos.length == 0) { + allTopos = fakeRacks(blkLocations, index); + } + + // NOTE: This code currently works only for one level of + // hierarchy (rack/host). However, it is relatively easy + // to extend this to support aggregation at different + // levels + + for (String topo : allTopos) { + + Node node, parentNode; + NodeInfo nodeInfo, parentNodeInfo; + + node = clusterMap.getNode(topo); + + if (node == null) { + node = new NodeBase(topo); + clusterMap.add(node); + } + + nodeInfo = hostsMap.get(node); + + if (nodeInfo == null) { + nodeInfo = new NodeInfo(node); + hostsMap.put(node, nodeInfo); + parentNode = node.getParent(); + parentNodeInfo = racksMap.get(parentNode); + if (parentNodeInfo == null) { + parentNodeInfo = new NodeInfo(parentNode); + racksMap.put(parentNode, parentNodeInfo); + } + parentNodeInfo.addLeaf(nodeInfo); + } else { + nodeInfo = hostsMap.get(node); + parentNode = node.getParent(); + parentNodeInfo = racksMap.get(parentNode); + } + + nodeInfo.addValue(index, bytesInThisBlock); + parentNodeInfo.addValue(index, bytesInThisBlock); + + } // for all topos + + } // for all indices + + return identifyHosts(allTopos.length, racksMap); + } + + private String[] identifyHosts(int replicationFactor, + Map racksMap) { + 
+ String[] retVal = new String[replicationFactor]; + + List rackList = new LinkedList(); + + rackList.addAll(racksMap.values()); + + // Sort the racks based on their contribution to this split + sortInDescendingOrder(rackList); + + boolean done = false; + int index = 0; + + // Get the host list for all our aggregated items, sort + // them and return the top entries + for (NodeInfo ni : rackList) { + + Set hostSet = ni.getLeaves(); + + List hostList = new LinkedList(); + hostList.addAll(hostSet); + + // Sort the hosts in this rack based on their contribution + sortInDescendingOrder(hostList); + + for (NodeInfo host : hostList) { + // Strip out the port number from the host name + retVal[index++] = host.node.getName().split(":")[0]; + if (index == replicationFactor) { + done = true; + break; + } + } + + if (done == true) { + break; + } + } + return retVal; + } + + private String[] fakeRacks(BlockLocation[] blkLocations, int index) + throws IOException { + String[] allHosts = blkLocations[index].getHosts(); + String[] allTopos = new String[allHosts.length]; + for (int i = 0; i < allHosts.length; i++) { + allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i]; + } + return allTopos; + } + + private static class NodeInfo { + final Node node; + final Set blockIds; + final Set leaves; + + private long value; + + NodeInfo(Node node) { + this.node = node; + blockIds = new HashSet(); + leaves = new HashSet(); + } + + long getValue() { + return value; + } + + void addValue(int blockIndex, long value) { + if (blockIds.add(blockIndex) == true) { + this.value += value; + } + } + + Set getLeaves() { + return leaves; + } + + void addLeaf(NodeInfo nodeInfo) { + leaves.add(nodeInfo); + } + } +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1195959&view=auto 
============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,198 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.text.NumberFormat; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.InvalidJobConfException; + +public abstract class FileOutputFormat implements OutputFormat { + + /** + * Set whether the output of the job is compressed. + * + * @param conf the {@link JobConf} to modify + * @param compress should the output of the job be compressed? + */ + public static void setCompressOutput(BSPJob conf, boolean compress) { + conf.getConf().setBoolean("bsp.output.compress", compress); + } + + /** + * Is the job output compressed? + * + * @param conf the {@link JobConf} to look in + * @return true if the job output should be compressed, + * false otherwise + */ + public static boolean getCompressOutput(BSPJob conf) { + return conf.getConf().getBoolean("bsp.output.compress", false); + } + + /** + * Set the {@link CompressionCodec} to be used to compress job outputs. + * + * @param conf the {@link JobConf} to modify + * @param codecClass the {@link CompressionCodec} to be used to compress the + * job outputs + */ + public static void setOutputCompressorClass(BSPJob conf, + Class codecClass) { + setCompressOutput(conf, true); + conf.getConf().setClass("bsp.output.compression.codec", codecClass, + CompressionCodec.class); + } + + /** + * Get the {@link CompressionCodec} for compressing the job outputs. 
+ * + * @param conf the {@link JobConf} to look in + * @param defaultValue the {@link CompressionCodec} to return if not set + * @return the {@link CompressionCodec} to be used to compress the job outputs + * @throws IllegalArgumentException if the class was specified, but not found + */ + public static Class getOutputCompressorClass( + BSPJob conf, Class defaultValue) { + Class codecClass = defaultValue; + + String name = conf.get("bsp.output.compression.codec"); + if (name != null) { + try { + codecClass = conf.getConf().getClassByName(name).asSubclass( + CompressionCodec.class); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Compression codec " + name + + " was not found.", e); + } + } + return codecClass; + } + + public abstract RecordWriter getRecordWriter(FileSystem ignored, + BSPJob job, String name) throws IOException; + + public void checkOutputSpecs(FileSystem ignored, BSPJob job) + throws FileAlreadyExistsException, InvalidJobConfException, IOException { + // Ensure that the output directory is set and not already there + Path outDir = getOutputPath(job); + if (outDir == null && job.getNumBspTask() != 0) { + throw new InvalidJobConfException("Output directory not set in JobConf."); + } + if (outDir != null) { + FileSystem fs = outDir.getFileSystem(job.getConf()); + // normalize the output directory + outDir = fs.makeQualified(outDir); + setOutputPath(job, outDir); + // check its existence + if (fs.exists(outDir)) { + throw new FileAlreadyExistsException("Output directory " + outDir + + " already exists"); + } + } + } + + /** + * Set the {@link Path} of the output directory for the map-reduce job. + * + * @param conf The configuration of the job. + * @param outputDir the {@link Path} of the output directory for the + * map-reduce job. 
+ */ + public static void setOutputPath(BSPJob conf, Path outputDir) { + outputDir = new Path(conf.getWorkingDirectory(), outputDir); + conf.set("bsp.output.dir", outputDir.toString()); + } + + /** + * Set the {@link Path} of the task's temporary output directory for the + * map-reduce job. + * + *

<p> + * Note: Task output path is set by the framework. + * </p>

+ * + * @param conf The configuration of the job. + * @param outputDir the {@link Path} of the output directory for the + * map-reduce job. + */ + + static void setWorkOutputPath(BSPJob conf, Path outputDir) { + outputDir = new Path(conf.getWorkingDirectory(), outputDir); + conf.set("bsp.work.output.dir", outputDir.toString()); + } + + /** + * Get the {@link Path} to the output directory for the map-reduce job. + * + * @return the {@link Path} to the output directory for the map-reduce job. + * @see FileOutputFormat#getWorkOutputPath(JobConf) + */ + public static Path getOutputPath(BSPJob conf) { + String name = conf.get("bsp.output.dir"); + return name == null ? null : new Path(name); + } + + public static Path getWorkOutputPath(BSPJob conf) { + String name = conf.get("bsp.work.output.dir"); + return name == null ? null : new Path(name); + } + + /** + * Helper function to create the task's temporary output directory and return + * the path to the task's output file. + * + * @param conf job-configuration + * @param name temporary task-output filename + * @return path to the task's temporary output file + * @throws IOException + */ + public static Path getTaskOutputPath(BSPJob conf, String name) + throws IOException { + // ${bsp.out.dir} + Path outputPath = getOutputPath(conf); + if (outputPath == null) { + throw new IOException("Undefined job output-path"); + } + + Path workPath = outputPath; + + // ${bsp.out.dir}/_temporary/_${taskid}/${name} + return new Path(workPath, name); + } + + /** + * Helper function to generate a name that is unique for the task. + * + * @param conf the configuration for the job. + * @param name the name to make unique. + * @return a unique name accross all tasks of the job. 
+ */ + public static String getUniqueName(BSPJob conf, String name) { + int partition = conf.getInt("bsp.task.partition", -1); + if (partition == -1) { + throw new IllegalArgumentException( + "This method can only be called from within a Job"); + } + + NumberFormat numberFormat = NumberFormat.getInstance(); + numberFormat.setMinimumIntegerDigits(5); + numberFormat.setGroupingUsed(false); + + return name + "-" + numberFormat.format(partition); + } + + /** + * Helper function to generate a {@link Path} for a file that is unique for + * the task within the job output directory. + * + * @param conf the configuration for the job. + * @param name the name for the file. + * @return a unique path accross all tasks of the job. + */ + public static Path getPathForCustomFile(BSPJob conf, String name) { + return new Path(getWorkOutputPath(conf), getUniqueName(conf, name)); + } +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileSplit.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,89 @@ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +public class FileSplit implements InputSplit { + private Path file; + private long start; + private long length; + private String[] hosts; + + FileSplit() { + } + + /** + * Constructs a split. 
+ * + * @deprecated + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + */ + public FileSplit(Path file, long start, long length, BSPJob conf) { + this(file, start, length, (String[]) null); + } + + /** + * Constructs a split with host information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block, possibly null + */ + public FileSplit(Path file, long start, long length, String[] hosts) { + this.file = file; + this.start = start; + this.length = length; + this.hosts = hosts; + } + + /** The file containing this split's data. */ + public Path getPath() { + return file; + } + + /** The position of the first byte in the file to process. */ + public long getStart() { + return start; + } + + /** The number of bytes in the file to process. 
*/ + public long getLength() { + return length; + } + + public String toString() { + return file + ":" + start + "+" + length; + } + + // ////////////////////////////////////////// + // Writable methods + // ////////////////////////////////////////// + + public void write(DataOutput out) throws IOException { + Text.writeString(out, file.toString()); + out.writeLong(start); + out.writeLong(length); + } + + public void readFields(DataInput in) throws IOException { + file = new Path(Text.readString(in)); + start = in.readLong(); + length = in.readLong(); + hosts = null; + } + + public String[] getLocations() throws IOException { + if (this.hosts == null) { + return new String[] {}; + } else { + return this.hosts; + } + } +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,11 @@ +package org.apache.hama.bsp; + +import java.io.IOException; + +public interface InputFormat { + + InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException; + + RecordReader getRecordReader(InputSplit split, BSPJob job) throws IOException; + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/InputSplit.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,25 @@ 
+package org.apache.hama.bsp; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public interface InputSplit extends Writable { + + /** + * Get the total number of bytes in the data of the InputSplit. + * + * @return the number of bytes in the input split. + * @throws IOException + */ + long getLength() throws IOException; + + /** + * Get the list of hostnames where the input split is located. + * + * @return list of hostnames where data of the InputSplit is + * located as an array of Strings. + * @throws IOException + */ + String[] getLocations() throws IOException; +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Tue Nov 1 12:34:14 2011 @@ -17,6 +17,7 @@ */ package org.apache.hama.bsp; +import java.io.DataInputStream; import java.io.IOException; import org.apache.commons.logging.Log; @@ -72,6 +73,7 @@ class JobInProgress { int numBSPTasks = 0; int clusterSize; + String jobSplit; public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master, Configuration conf) throws IOException { @@ -80,9 +82,9 @@ class JobInProgress { this.localFs = FileSystem.getLocal(conf); this.jobFile = jobFile; this.master = master; - - this.status = new JobStatus(jobId, null, 0L, 0L, - JobStatus.State.PREP.value()); + + this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP + .value()); this.startTime = System.currentTimeMillis(); this.superstepCounter = 0; this.restartCount = 0; @@ -96,10 +98,10 @@ class JobInProgress { FileSystem fs = jobDir.getFileSystem(conf); fs.copyToLocalFile(jobFile, 
localJobFile); BSPJob job = new BSPJob(jobId, localJobFile.toString()); - this.numBSPTasks = job.getNumBspTask(); + this.jobSplit = job.getConf().get("bsp.job.split.file"); - this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), - job.getJobName()); + this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job + .getJobName()); this.setJobName(job.getJobName()); @@ -134,9 +136,9 @@ class JobInProgress { } public int getNumOfTasks() { - return tasks.length; + return tasks.length; } - + /** * @return the number of desired tasks. */ @@ -181,25 +183,34 @@ class JobInProgress { return; } - if (LOG.isDebugEnabled()) { - LOG.debug("numBSPTasks: " + numBSPTasks); + Path sysDir = new Path(this.master.getSystemDir()); + FileSystem fs = sysDir.getFileSystem(conf); + DataInputStream splitFile = fs.open(new Path(this.jobSplit)); + + BSPJobClient.RawSplit[] splits; + try { + splits = BSPJobClient.readSplitFile(splitFile); + } finally { + splitFile.close(); } + numBSPTasks = splits.length; + LOG.info("num BSPTasks: " + numBSPTasks); // adjust number of BSP tasks to actual number of splits this.tasks = new TaskInProgress[numBSPTasks]; for (int i = 0; i < numBSPTasks; i++) { tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(), - this.master, this.conf, this, i); + splits[i], this.master, this.conf, this, i); } // Update job status this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(), 0L, 0L, JobStatus.RUNNING); - // delete all nodes before start + // delete all nodes before start master.clearZKNodes(); master.createJobRoot(this.getJobID().toString()); - + tasksInited = true; LOG.info("Job is initialized."); } @@ -247,17 +258,17 @@ class JobInProgress { } if (allDone) { - this.status = new JobStatus(this.status.getJobID(), - this.profile.getUser(), superstepCounter, superstepCounter, - superstepCounter, JobStatus.SUCCEEDED, superstepCounter); + this.status = new JobStatus(this.status.getJobID(), this.profile + 
.getUser(), superstepCounter, superstepCounter, superstepCounter, + JobStatus.SUCCEEDED, superstepCounter); this.finishTime = System.currentTimeMillis(); this.status.setFinishTime(this.finishTime); LOG.info("Job successfully done."); - + // delete job root master.deleteJobRoot(this.getJobID().toString()); - + garbageCollect(); } } @@ -281,9 +292,9 @@ class JobInProgress { } if (allDone) { - this.status = new JobStatus(this.status.getJobID(), - this.profile.getUser(), superstepCounter, superstepCounter, - superstepCounter, JobStatus.FAILED, superstepCounter); + this.status = new JobStatus(this.status.getJobID(), this.profile + .getUser(), superstepCounter, superstepCounter, superstepCounter, + JobStatus.FAILED, superstepCounter); this.finishTime = System.currentTimeMillis(); this.status.setFinishTime(this.finishTime); Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LineRecordReader.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,150 @@ +package org.apache.hama.bsp; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; + +public class LineRecordReader implements RecordReader { + private static final Log LOG = 
LogFactory.getLog(LineRecordReader.class + .getName()); + + private CompressionCodecFactory compressionCodecs = null; + private long start; + private long pos; + private long end; + private LineReader in; + int maxLineLength; + + /** + * A class that provides a line reader from an input stream. + */ + public static class LineReader extends org.apache.hadoop.util.LineReader { + LineReader(InputStream in) { + super(in); + } + + LineReader(InputStream in, int bufferSize) { + super(in, bufferSize); + } + + public LineReader(InputStream in, Configuration conf) throws IOException { + super(in, conf); + } + } + + public LineRecordReader(Configuration job, FileSplit split) + throws IOException { + this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength", + Integer.MAX_VALUE); + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + compressionCodecs = new CompressionCodecFactory(job); + final CompressionCodec codec = compressionCodecs.getCodec(file); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(job); + FSDataInputStream fileIn = fs.open(split.getPath()); + boolean skipFirstLine = false; + if (codec != null) { + in = new LineReader(codec.createInputStream(fileIn), job); + end = Long.MAX_VALUE; + } else { + if (start != 0) { + skipFirstLine = true; + --start; + fileIn.seek(start); + } + in = new LineReader(fileIn, job); + } + if (skipFirstLine) { // skip first line and re-establish "start". 
+ start += in.readLine(new Text(), 0, (int) Math.min( + (long) Integer.MAX_VALUE, end - start)); + } + this.pos = start; + } + + public LineRecordReader(InputStream in, long offset, long endOffset, + int maxLineLength) { + this.maxLineLength = maxLineLength; + this.in = new LineReader(in); + this.start = offset; + this.pos = offset; + this.end = endOffset; + } + + public LineRecordReader(InputStream in, long offset, long endOffset, + Configuration job) throws IOException { + this.maxLineLength = job.getInt("bsp.linerecordreader.maxlength", + Integer.MAX_VALUE); + this.in = new LineReader(in, job); + this.start = offset; + this.pos = offset; + this.end = endOffset; + } + + public LongWritable createKey() { + return new LongWritable(); + } + + public Text createValue() { + return new Text(); + } + + /** Read a line. */ + public synchronized boolean next(LongWritable key, Text value) + throws IOException { + + while (pos < end) { + key.set(pos); + + int newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min( + Integer.MAX_VALUE, end - pos), maxLineLength)); + if (newSize == 0) { + return false; + } + pos += newSize; + if (newSize < maxLineLength) { + return true; + } + + // line too long. 
try again + LOG + .info("Skipped line of size " + newSize + " at pos " + + (pos - newSize)); + } + + return false; + } + + /** + * Get the progress within the split + */ + public float getProgress() { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (pos - start) / (float) (end - start)); + } + } + + public synchronized long getPos() throws IOException { + return pos; + } + + public synchronized void close() throws IOException { + if (in != null) { + in.close(); + } + } +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Nov 1 12:34:14 2011 @@ -209,7 +209,8 @@ public class LocalBSPRunner implements J bsp.setConf(conf); try { bsp.setup(groom); - bsp.bsp(groom); + // TODO + bsp.bsp(groom, null, null); } catch (Exception e) { LOG.error("Exception during BSP execution!", e); } Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullInputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,85 @@ +package org.apache.hama.bsp; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.NullWritable; + +public class NullInputFormat implements 
InputFormat<NullWritable, NullWritable> { + + @Override + public RecordReader<NullWritable, NullWritable> getRecordReader( + InputSplit split, BSPJob job) throws IOException { + return new NullRecordReader(); + } + + @Override + public InputSplit[] getSplits(BSPJob job, int numBspTask) throws IOException { + InputSplit[] splits = new InputSplit[numBspTask]; + for (int i = 0; i < numBspTask; i++) { + splits[i] = new NullInputSplit(); + } + + return splits; + } + + public static class NullRecordReader implements + RecordReader<NullWritable, NullWritable> { + private boolean returnRecord = true; + + @Override + public void close() throws IOException { + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public NullWritable createValue() { + return NullWritable.get(); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public float getProgress() throws IOException { + return 0; + } + + @Override + public boolean next(NullWritable key, NullWritable value) + throws IOException { + if (returnRecord == true) { + returnRecord = false; + return true; + } + + return returnRecord; + } + + } + + public static class NullInputSplit implements InputSplit { + public long getLength() { + return 0; + } + + public String[] getLocations() { + String[] locs = {}; + return locs; + } + + public void readFields(DataInput in) throws IOException { + } + + public void write(DataOutput out) throws IOException { + } + } + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/NullOutputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,20 @@ +package org.apache.hama.bsp; + +import
org.apache.hadoop.fs.FileSystem; + +public class NullOutputFormat<K, V> implements OutputFormat<K, V> { + + public RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job, + String name) { + return new RecordWriter<K, V>() { + public void write(K key, V value) { + } + + public void close() { + } + }; + } + + public void checkOutputSpecs(FileSystem ignored, BSPJob job) { + } +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputCollector.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,15 @@ +package org.apache.hama.bsp; + +import java.io.IOException; + +public interface OutputCollector<K, V> { + + /** + * Adds a key/value pair to the output. + * + * @param key the key to collect. + * @param value the value to collect. + * @throws IOException + */ + void collect(K key, V value) throws IOException; +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/OutputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,35 @@ +package org.apache.hama.bsp; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; + +public interface OutputFormat<K, V> { + + /** + * Get the {@link RecordWriter} for the given job.
+ * + * @param ignored + * @param job configuration for the job whose output is being written. + * @param name the unique name for this part of the output. + * @return a {@link RecordWriter} to write the output for the job. + * @throws IOException + */ + RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job, String name) + throws IOException; + + /** + * Check for validity of the output-specification for the job. + * + *

+ * This is to validate the output specification for the job when a job + * is submitted. Typically checks that the output does not already exist, throwing an + * exception when it already exists, so that output is not overwritten. + *

+ * + * @param ignored + * @param job job configuration. + * @throws IOException when output should not be attempted + */ + void checkOutputSpecs(FileSystem ignored, BSPJob job) throws IOException; +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordReader.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,54 @@ +package org.apache.hama.bsp; + +import java.io.IOException; + +public interface RecordReader<K, V> { + + /** + * Reads the next key/value pair from the input for processing. + * + * @param key the key to read data into + * @param value the value to read data into + * @return true iff a key/value was read, false if at EOF + */ + boolean next(K key, V value) throws IOException; + + /** + * Create an object of the appropriate type to be used as a key. + * + * @return a new key object. + */ + K createKey(); + + /** + * Create an object of the appropriate type to be used as a value. + * + * @return a new value object. + */ + V createValue(); + + /** + * Returns the current position in the input. + * + * @return the current position in the input. + * @throws IOException + */ + long getPos() throws IOException; + + /** + * Close this {@link RecordReader} to future operations. + * + * @throws IOException + */ + public void close() throws IOException; + + /** + * How much of the input has the {@link RecordReader} consumed i.e. has been + * processed by? + * + * @return progress from 0.0 to 1.0.
+ * @throws IOException + */ + float getProgress() throws IOException; + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecordWriter.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,22 @@ +package org.apache.hama.bsp; + +import java.io.IOException; + +public interface RecordWriter<K, V> { + /** + * Writes a key/value pair. + * + * @param key the key to write. + * @param value the value to write. + * @throws IOException + */ + void write(K key, V value) throws IOException; + + /** + * Close this RecordWriter to future operations. + * + * @param reporter facility to report progress. + * @throws IOException + */ + void close() throws IOException; +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Tue Nov 1 12:34:14 2011 @@ -20,6 +20,7 @@ package org.apache.hama.bsp; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.text.NumberFormat; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -93,6 +94,18 @@ public abstract class Task implements Wr public int getPartition() { return partition; } + + /** Construct output file names so that, when an output directory listing is + * sorted
lexicographically, positions correspond to output partitions.*/ + private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); + static { + NUMBER_FORMAT.setMinimumIntegerDigits(5); + NUMBER_FORMAT.setGroupingUsed(false); + } + + static synchronized String getOutputName(int partition) { + return "part-" + NUMBER_FORMAT.format(partition); + } @Override public String toString() { Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue Nov 1 12:34:14 2011 @@ -24,7 +24,8 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hama.bsp.BSPJobClient.RawSplit; /** *TaskInProgress maintains all the info needed for a Task in the lifetime of @@ -75,6 +76,8 @@ class TaskInProgress { private BSPJobID jobId; + private RawSplit rawSplit; + /** * Constructor for new nexus between BSPMaster and GroomServer. 
* @@ -90,10 +93,11 @@ class TaskInProgress { init(jobId); } - public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master, + public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit, BSPMaster master, Configuration conf, JobInProgress job, int partition) { this.jobId = jobId; this.jobFile = jobFile; + this.rawSplit = rawSplit; this.setBspMaster(master); this.job = job; this.setConf(conf); @@ -102,7 +106,7 @@ class TaskInProgress { init(jobId); } - private void init(BSPJobID jobId2) { + private void init(BSPJobID jobId) { this.id = new TaskID(jobId, partition); this.startTime = System.currentTimeMillis(); } @@ -125,7 +129,10 @@ class TaskInProgress { return null; } - t = new BSPTask(jobId, jobFile, taskid, partition); + String splitClass = rawSplit.getClassName(); + BytesWritable split = rawSplit.getBytes(); + + t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split); activeTasks.put(taskid, status.getGroomName()); return t; Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue Nov 1 12:34:14 2011 @@ -102,7 +102,7 @@ public class TaskLog { public static synchronized void cleanup(int logsRetainHours) throws IOException { // Purge logs of tasks on this tasktracker if their - // mtime has exceeded "mapred.task.log.retain" hours + // mtime has exceeded "bsp.task.log.retain" hours long purgeTimeStamp = System.currentTimeMillis() - (logsRetainHours * 60L * 60 * 1000); File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter( @@ -199,7 +199,7 @@ public class TaskLog { * @return the number of bytes to cap the 
log files at */ public static long getTaskLogLength(HamaConfiguration conf) { - return conf.getLong("mapred.userlog.limit.kb", 100) * 1024; + return conf.getLong("bsp.userlog.limit.kb", 100) * 1024; } /** Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,15 @@ +package org.apache.hama.bsp; + +import java.io.IOException; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +public class TextInputFormat extends FileInputFormat<LongWritable, Text> { + + public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, BSPJob job) + throws IOException { + return new LineRecordReader(job.getConf(), (FileSplit) split); + } + +} Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java?rev=1195959&view=auto ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java (added) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java Tue Nov 1 12:34:14 2011 @@ -0,0 +1,109 @@ +package org.apache.hama.bsp; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import
org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.util.ReflectionUtils; + +public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { + + protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> { + private static final String utf8 = "UTF-8"; + private static final byte[] newline; + static { + try { + newline = "\n".getBytes(utf8); + } catch (UnsupportedEncodingException uee) { + throw new IllegalArgumentException("can't find " + utf8 + " encoding"); + } + } + + protected DataOutputStream out; + private final byte[] keyValueSeparator; + + public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { + this.out = out; + try { + this.keyValueSeparator = keyValueSeparator.getBytes(utf8); + } catch (UnsupportedEncodingException uee) { + throw new IllegalArgumentException("can't find " + utf8 + " encoding"); + } + } + + public LineRecordWriter(DataOutputStream out) { + this(out, "\t"); + } + + /** + * Write the object to the byte stream, handling Text as a special case.
+ * + * @param o the object to print + * @throws IOException if the write throws, we pass it on + */ + private void writeObject(Object o) throws IOException { + if (o instanceof Text) { + Text to = (Text) o; + out.write(to.getBytes(), 0, to.getLength()); + } else { + out.write(o.toString().getBytes(utf8)); + } + } + + public synchronized void write(K key, V value) throws IOException { + + boolean nullKey = key == null || key instanceof NullWritable; + boolean nullValue = value == null || value instanceof NullWritable; + if (nullKey && nullValue) { + return; + } + if (!nullKey) { + writeObject(key); + } + if (!(nullKey || nullValue)) { + out.write(keyValueSeparator); + } + if (!nullValue) { + writeObject(value); + } + out.write(newline); + } + + public synchronized void close() throws IOException { + out.close(); + } + } + + public RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job, + String name) throws IOException { + boolean isCompressed = getCompressOutput(job); + String keyValueSeparator = job.getConf().get( + "bsp.textoutputformat.separator", "\t"); + if (!isCompressed) { + Path file = FileOutputFormat.getTaskOutputPath(job, name); + FileSystem fs = file.getFileSystem(job.getConf()); + FSDataOutputStream fileOut = fs.create(file); + return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); + } else { + Class<? extends CompressionCodec> codecClass = getOutputCompressorClass( + job, GzipCodec.class); + // create the named codec + CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job + .getConf()); + // build the filename including the extension + Path file = FileOutputFormat.getTaskOutputPath(job, name + + codec.getDefaultExtension()); + FileSystem fs = file.getFileSystem(job.getConf()); + FSDataOutputStream fileOut = fs.create(file); + return new LineRecordWriter<K, V>(new DataOutputStream(codec + .createOutputStream(fileOut)), keyValueSeparator); + } + } +} Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java Tue Nov 1 12:34:14 2011 @@ -1010,7 +1010,6 @@ public class Bytes { /** * Split passed range. Expensive operation relatively. Uses BigInteger math. - * Useful splitting ranges for MapReduce jobs. * * @param a Beginning of range * @param b End of range Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1195959&r1=1195958&r2=1195959&view=diff ============================================================================== --- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original) +++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Tue Nov 1 12:34:14 2011 @@ -66,7 +66,8 @@ public class TestBSPMasterGroomServer ex ClusterStatus cluster = jobClient.getClusterStatus(false); assertEquals(this.numOfGroom, cluster.getGroomServers()); bsp.setNumBspTask(2); - + bsp.setInputFormat(NullInputFormat.class); + FileSystem fileSys = FileSystem.get(conf); if (bsp.waitForCompletion(true)) {