From: tjungblut@apache.org
To: hama-commits@incubator.apache.org
Subject: svn commit: r1199686 - in /incubator/hama/trunk/core/src: main/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/
Date: Wed, 09 Nov 2011 10:28:07 -0000

Author: tjungblut
Date: Wed Nov  9 10:28:06 2011
New Revision: 1199686

URL: http://svn.apache.org/viewvc?rev=1199686&view=rev
Log:
Added partitioning

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java   (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java   (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java   (with props)
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
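For orientation before the diffs: a minimal sketch of how a job opts in to the new pre-partitioning, based on the API added below (the BSP class and the input path are hypothetical placeholders):

    BSPJob job = new BSPJob(new HamaConfiguration());
    job.setBspClass(MyBSP.class);                // hypothetical BSP implementation
    job.setInputFormat(TextInputFormat.class);
    job.setInputPath(new Path("/tmp/input"));    // hypothetical input
    job.setPartitioner(HashPartitioner.class);   // enables partitioning at submit time
    job.setNumBspTask(4);
    job.waitForCompletion(true);

With a partitioner set, BSPJobClient rewrites the input into one sequence file per task before the splits are computed, as the changes below show.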
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1199686&r1=1199685&r2=1199686&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Nov  9 10:28:06 2011
@@ -290,21 +290,38 @@ public class BSPJob extends BSPJobContex
   public void setOutputPath(Path path) {
     conf.set("bsp.output.dir", path.toString());
   }
-
+
   /**
    * Sets the input path for the job.
    *
-   * @param path where the output gets written.
    */
   public void setInputPath(Path path) {
     conf.set("bsp.input.dir", path.toString());
   }
 
+  /**
+   * Sets the output format for the job.
+   */
   @SuppressWarnings("rawtypes")
   public void setOutputFormat(Class theClass) {
     conf.setClass("bsp.output.format.class", theClass, OutputFormat.class);
   }
 
+  /**
+   * Sets the partitioner for the input of the job.
+   */
+  @SuppressWarnings("rawtypes")
+  public void setPartitioner(Class theClass) {
+    conf.setClass("bsp.input.partitioner.class", theClass, Partitioner.class);
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Partitioner getPartitioner() {
+    return (Partitioner) ReflectionUtils.newInstance(conf
+        .getClass("bsp.input.partitioner.class", HashPartitioner.class,
+            Partitioner.class), conf);
+  }
+
   @SuppressWarnings("rawtypes")
   public OutputFormat getOutputFormat() {
     return (OutputFormat) ReflectionUtils.newInstance(conf.getClass(
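The partitioning step introduced in the next file rewrites the job input at submit time. With, say, three tasks and an input directory of /tmp/input (hypothetical), the resulting layout would look roughly like:

    /tmp/input/hama-partitions/part-0
    /tmp/input/hama-partitions/part-1
    /tmp/input/hama-partitions/part-2

Each part-i file is a SequenceFile holding every record the partitioner assigned to task i; the job is then repointed at that directory. The "bsp.partitioning.dir" key can redirect this output to an alternate location keyed by job ID.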
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1199686&r1=1199685&r2=1199686&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Nov  9 10:28:06 2011
@@ -22,8 +22,9 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -39,12 +40,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.HamaConfiguration;
@@ -290,6 +296,8 @@ public class BSPJobClient extends Config
 
   public RunningJob submitJobInternal(BSPJob job) throws IOException {
     BSPJobID jobId = jobSubmitClient.getNewJobId();
+
+    job.setJobID(jobId);
     Path submitJobDir = new Path(getSystemDir(), "submit_"
         + Integer.toString(Math.abs(r.nextInt()), 36));
@@ -327,6 +335,7 @@ public class BSPJobClient extends Config
     if (job.get("bsp.input.dir") != null) {
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
+      job = partition(job);
       job.setNumBspTask(writeSplits(job, submitSplitFile));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
@@ -379,31 +388,135 @@
     }
   }
 
-  private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(job,
-        job.getNumBspTask());
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(splits, new Comparator() {
-      public int compare(InputSplit a, InputSplit b) {
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private BSPJob partition(BSPJob job) throws IOException {
+    if (job.getConf().get("bsp.input.partitioner.class") != null) {
+      InputSplit[] splits = job.getInputFormat().getSplits(job,
+          job.getNumBspTask());
+      int numOfTasks = job.getNumBspTask();
+      String input = job.getConf().get("bsp.input.dir");
+
+      if (input != null) {
+        InputFormat inputFormat = job.getInputFormat();
+
+        Path partitionedPath = new Path(input, "hama-partitions");
+        Path inputPath = new Path(input);
+        if (fs.isFile(inputPath)) {
+          partitionedPath = new Path(inputPath.getParent(), "hama-partitions");
+        }
+
+        String alternatePart = job.get("bsp.partitioning.dir");
+        if (alternatePart != null) {
+          partitionedPath = new Path(alternatePart, job.getJobID().toString());
+        }
+
+        if (fs.exists(partitionedPath)) {
+          fs.delete(partitionedPath, true);
+        } else {
+          fs.mkdirs(partitionedPath);
+        }
+        // FIXME this is so unsafe
+        RecordReader sampleReader = inputFormat.getRecordReader(splits[0], job);
+
+        List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
+            numOfTasks);
+
+        CompressionType compressionType = getOutputCompressionType(job);
+        Class<? extends CompressionCodec> outputCompressorClass = getOutputCompressorClass(
+            job, null);
+        CompressionCodec codec = null;
+        if (outputCompressorClass != null) {
+          codec = ReflectionUtils.newInstance(outputCompressorClass,
+              job.getConf());
+        }
+
         try {
-          long left = a.getLength();
-          long right = b.getLength();
-          if (left == right) {
-            return 0;
-          } else if (left < right) {
-            return 1;
-          } else {
-            return -1;
+          for (int i = 0; i < numOfTasks; i++) {
+            Path p = new Path(partitionedPath, "part-" + i);
+            if (codec == null) {
+              writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+                  sampleReader.createKey().getClass(), sampleReader
+                      .createValue().getClass(), CompressionType.NONE));
+            } else {
+              writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+                  sampleReader.createKey().getClass(), sampleReader
+                      .createValue().getClass(), compressionType, codec));
+            }
+          }
+
+          Partitioner partitioner = job.getPartitioner();
+          for (int i = 0; i < splits.length; i++) {
+            InputSplit split = splits[i];
+            RecordReader recordReader = inputFormat.getRecordReader(split, job);
+            Object key = recordReader.createKey();
+            Object value = recordReader.createValue();
+            while (recordReader.next(key, value)) {
+              int index = partitioner.getPartition(key, value, numOfTasks);
+              writers.get(index).append(key, value);
+            }
+            LOG.debug("Done with split " + i);
+          }
+        } finally {
+          for (SequenceFile.Writer wr : writers) {
+            wr.close();
           }
-      } catch (IOException ie) {
-        throw new RuntimeException("Problem getting input split size", ie);
         }
+
+        job.setInputFormat(SequenceFileInputFormat.class);
+        job.setInputPath(partitionedPath);
       }
-    });
-    DataOutputStream out = writeSplitsFileHeader(job.getConf(),
-        submitSplitFile, splits.length);
+    }
+    return job;
+  }
+
+  /**
+   * Get the {@link CompressionType} for the output {@link SequenceFile}.
+   *
+   * @param job the {@link Job}
+   * @return the {@link CompressionType} for the output {@link SequenceFile},
+   *         defaulting to {@link CompressionType#NONE}
+   */
+  static CompressionType getOutputCompressionType(BSPJob job) {
+    String val = job.get("bsp.partitioning.compression.type");
+    if (val != null) {
+      return CompressionType.valueOf(val);
+    } else {
+      return CompressionType.NONE;
+    }
+  }
+
+  /**
+   * Get the {@link CompressionCodec} for compressing the job outputs.
+   *
+   * @param job the {@link Job} to look in
+   * @param defaultValue the {@link CompressionCodec} to return if not set
+   * @return the {@link CompressionCodec} to be used to compress the job outputs
+   * @throws IllegalArgumentException if the class was specified, but not found
+   */
+  static Class<? extends CompressionCodec> getOutputCompressorClass(BSPJob job,
+      Class<? extends CompressionCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    Configuration conf = job.getConf();
+    String name = conf.get("bsp.partitioning.compression.codec");
+    if (name != null) {
+      try {
+        codecClass = conf.getClassByName(name).asSubclass(
+            CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name
+            + " was not found.", e);
+      }
+    }
+    return codecClass;
+  }
+
+  private int writeSplits(BSPJob job, Path submitSplitFile) throws IOException {
+    InputSplit[] splits = job.getInputFormat().getSplits(job,
+        job.getNumBspTask());
+
+    final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
+        submitSplitFile, splits.length);
     try {
       DataOutputBuffer buffer = new DataOutputBuffer();
       RawSplit rawSplit = new RawSplit();
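The two helper methods above read new configuration keys; a hedged sketch of enabling compressed partitions via those keys (the codec shown is simply one that ships with Hadoop, not a requirement of this commit):

    Configuration conf = job.getConf();
    // one of NONE, RECORD, BLOCK (SequenceFile.CompressionType)
    conf.set("bsp.partitioning.compression.type", "BLOCK");
    conf.set("bsp.partitioning.compression.codec",
        "org.apache.hadoop.io.compress.DefaultCodec");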
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1199686&r1=1199685&r2=1199686&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Wed Nov  9 10:28:06 2011
@@ -31,21 +31,21 @@ import org.apache.hama.HamaConfiguration
  */
 public class BSPJobContext {
   // Put all of the attribute names in here so that BSPJob and JobContext are
-  // consistent.
+  // consistent.
   protected static final String WORK_CLASS_ATTR = "bsp.work.class";
   protected static final String COMBINER_CLASS_ATTR = "bsp.combiner.class";
   protected static final String INPUT_FORMAT_CLASS_ATTR = "bsp.inputformat.class";
   protected static final String OUTPUT_FORMAT_CLASS_ATTR = "bsp.outputformat.class";
   protected static final String WORKING_DIR = "bsp.working.dir";
-
+
   protected final Configuration conf;
-  private final BSPJobID jobId;
+  private BSPJobID jobId;
 
   public BSPJobContext(Configuration conf, BSPJobID jobId) {
     this.conf = conf;
     this.jobId = jobId;
   }
-
+
   public BSPJobContext(Path config, BSPJobID jobId) throws IOException {
     this.conf = new HamaConfiguration();
     this.jobId = jobId;
@@ -56,6 +56,10 @@ public class BSPJobContext {
     return jobId;
   }
 
+  void setJobID(BSPJobID id) {
+    this.jobId = id;
+  }
+
   public Path getWorkingDirectory() {
     String name = conf.get(WORKING_DIR);
 
@@ -79,32 +83,32 @@ public class BSPJobContext {
   public String getJar() {
     return conf.get("bsp.jar");
   }
-
-  /**
-   * Constructs a local file name. Files are distributed among configured
-   * local directories.
+
+  /**
+   * Constructs a local file name. Files are distributed among configured local
+   * directories.
    */
   public Path getLocalPath(String pathString) throws IOException {
     return conf.getLocalPath("bsp.local.dir", pathString);
   }
-
+
   public String getUser() {
     return conf.get("user.name");
   }
-
+
   public void writeXml(OutputStream out) throws IOException {
     conf.writeXml(out);
   }
-
+
   public Configuration getConf() {
     return this.conf;
   }
-
+
   public String get(String name) {
     return conf.get(name);
   }
-
+
   public int getInt(String name, int defaultValue) {
-    return conf.getInt(name, defaultValue);
+    return conf.getInt(name, defaultValue);
   }
 }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+
+/**
+ * An abstract {@link org.apache.hadoop.mapred.InputFormat}. Splits are
+ * constructed from the files under the input paths. A split cannot have files
+ * from different pools. Each split returned may contain blocks from different
+ * files. If a maxSplitSize is specified, then blocks on the same node are
+ * combined to form a single split. Blocks that are left over are then combined
+ * with other blocks in the same rack. If maxSplitSize is not specified, then
+ * blocks from the same rack are combined in a single split; no attempt is made
+ * to create node-local splits. If the maxSplitSize is equal to the block size,
+ * then this class is similar to the default splitting behaviour in Hadoop:
+ * each block is a locally processed split. Subclasses implement
+ * {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
+ * to construct RecordReaders for CombineFileSplits.
+ *
+ * @see CombineFileSplit
+ */
+public abstract class CombineFileInputFormat<K, V> extends
+    FileInputFormat<K, V> {
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // A pool of input paths filters. A split cannot have blocks from files
+  // across multiple pools.
+  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
+
+  // mapping from a rack name to the set of Nodes in the rack
+  private static HashMap<String, Set<String>> rackToNodes = new HashMap<String, Set<String>>();
+
+  /**
+   * This has to be overridden from concrete formats, we provide a SequenceFile
+   * version of it for partitioning.
+   *
+   * @param split
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException;
+
+  /**
+   * Specify the maximum size (in bytes) of each split. Each split is
+   * approximately equal to the specified size.
+   */
+  protected void setMaxSplitSize(long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per node. This applies to
+   * data that is left over after combining data on a single node into splits
+   * that are of maximum size specified by maxSplitSize. This leftover data will
+   * be combined into its own split if its size exceeds minSplitSizeNode.
+   */
+  protected void setMinSplitSizeNode(long minSplitSizeNode) {
+    this.minSplitSizeNode = minSplitSizeNode;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per rack. This applies to
+   * data that is left over after combining data on a single rack into splits
+   * that are of maximum size specified by maxSplitSize. This leftover data will
+   * be combined into its own split if its size exceeds minSplitSizeRack.
+   */
+  protected void setMinSplitSizeRack(long minSplitSizeRack) {
+    this.minSplitSizeRack = minSplitSizeRack;
+  }
+
+  /**
+   * Create a new pool and add the filters to it. A split cannot have files from
+   * different pools.
+   */
+  protected void createPool(Configuration conf, List<PathFilter> filters) {
+    pools.add(new MultiPathFilter(filters));
+  }
+
+  /**
+   * Create a new pool and add the filters to it. A pathname can satisfy any one
+   * of the specified filters. A split cannot have files from different pools.
+   */
+  protected void createPool(Configuration conf, PathFilter... filters) {
+    MultiPathFilter multi = new MultiPathFilter();
+    for (PathFilter f : filters) {
+      multi.add(f);
+    }
+    pools.add(multi);
+  }
+
+  /**
+   * default constructor
+   */
+  public CombineFileInputFormat() {
+  }
+
+  @Override
+  public InputSplit[] getSplits(BSPJob bspJob, int numSplits)
+      throws IOException {
+
+    Configuration job = bspJob.getConf();
+
+    long minSizeNode = 0;
+    long minSizeRack = 0;
+    long maxSize = 0;
+
+    // the values specified by setxxxSplitSize() takes precedence over the
+    // values that might have been specified in the config
+    if (minSplitSizeNode != 0) {
+      minSizeNode = minSplitSizeNode;
+    } else {
+      minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
+    }
+    if (minSplitSizeRack != 0) {
+      minSizeRack = minSplitSizeRack;
+    } else {
+      minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
+    }
+    if (maxSplitSize != 0) {
+      maxSize = maxSplitSize;
+    } else {
+      maxSize = job.getLong("mapred.max.split.size", 0);
+    }
+    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+      throw new IOException("Minimum split size per node " + minSizeNode
+          + " cannot be larger than maximum split size " + maxSize);
+    }
+    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+      throw new IOException("Minimum split size per rack " + minSizeRack
+          + " cannot be larger than maximum split size " + maxSize);
+    }
+    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+      throw new IOException("Minimum split size per node " + minSizeNode
+          + " cannot be smaller than minimum split size per rack "
+          + minSizeRack);
+    }
+
+    // all the files in input set
+    Path[] paths = FileUtil.stat2Paths(listStatus(bspJob));
+    List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
+    if (paths.length == 0) {
+      return splits.toArray(new CombineFileSplit[splits.size()]);
+    }
+
+    // In one single iteration, process all the paths in a single pool.
+    // Processing one pool at a time ensures that a split contains paths
+    // from a single pool only.
+    for (MultiPathFilter onepool : pools) {
+      ArrayList<Path> myPaths = new ArrayList<Path>();
+
+      // pick one input path. If it matches all the filters in a pool,
+      // add it to the output set
+      for (int i = 0; i < paths.length; i++) {
+        if (paths[i] == null) { // already processed
+          continue;
+        }
+        Path p = new Path(paths[i].toUri().getPath());
+        if (onepool.accept(p)) {
+          myPaths.add(paths[i]); // add it to my output set
+          paths[i] = null; // already processed
+        }
+      }
+      // create splits for all files in this pool.
+      getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
+          minSizeNode, minSizeRack, splits);
+    }
+
+    // Finally, process all paths that do not belong to any pool.
+    ArrayList<Path> myPaths = new ArrayList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      if (paths[i] == null) { // already processed
+        continue;
+      }
+      myPaths.add(paths[i]);
+    }
+    // create splits for all files that are not in any pool.
+    getMoreSplits(bspJob, myPaths.toArray(new Path[myPaths.size()]), maxSize,
+        minSizeNode, minSizeRack, splits);
+
+    // free up rackToNodes map
+    rackToNodes.clear();
+    return splits.toArray(new CombineFileSplit[splits.size()]);
+  }
+
+  /**
+   * Return all the splits in the specified set of paths
+   */
+  private void getMoreSplits(BSPJob job, Path[] paths, long maxSize,
+      long minSizeNode, long minSizeRack, List<CombineFileSplit> splits)
+      throws IOException {
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>();
+
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return;
+    }
+
+    // populate all the blocks for all files
+    long totLength = 0;
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], job, rackToBlocks, blockToNodes,
+          nodeToBlocks);
+      totLength += files[i].getLength();
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> nodes = new ArrayList<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node.
+    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+        .entrySet().iterator(); iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from
+      // blockToNodes so that the same block does not appear in
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(job, splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSplitNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely
+      // that they will be combined with other blocks from the same rack later
+      // on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these
+    // overflow blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> racks = new ArrayList<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack.
+      // Come back to this rack after creating a single split for each of the
+      // remaining racks.
+      // Process one rack location at a time, combine all possible blocks that
+      // reside on this rack as one split. (constrained by minimum and maximum
+      // split size).
+
+      // iterate over all racks
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = rackToBlocks
+          .entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split
+            // otherwise, store these blocks into overflow data structure
+            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that remained to be
+            // processed. Keep them in 'overflow' block list. These will be
+            // combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks, if any.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
+    }
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks.
+   * Add this new split into splitList.
+   */
+  private void addCreatedSplit(BSPJob job, List<CombineFileSplit> splitList,
+      List<String> locations, ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] fl = new Path[validBlocks.size()];
+    long[] offset = new long[validBlocks.size()];
+    long[] length = new long[validBlocks.size()];
+    for (int i = 0; i < validBlocks.size(); i++) {
+      fl[i] = validBlocks.get(i).onepath;
+      offset[i] = validBlocks.get(i).offset;
+      length[i] = validBlocks.get(i).length;
+    }
+
+    // add this split to the list that is returned
+    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, length,
+        locations.toArray(new String[0]));
+    splitList.add(thissplit);
+  }
+
+  /**
+   * information about one file from the File System
+   */
+  private static class OneFileInfo {
+    private long fileSize; // size of the file
+    private OneBlockInfo[] blocks; // all blocks in this file
+
+    OneFileInfo(Path path, BSPJob job,
+        HashMap<String, List<OneBlockInfo>> rackToBlocks,
+        HashMap<OneBlockInfo, String[]> blockToNodes,
+        HashMap<String, List<OneBlockInfo>> nodeToBlocks) throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(job.getConf());
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
+          stat.getLen());
+      // create a list of all block and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        blocks = new OneBlockInfo[locations.length];
+        for (int i = 0; i < locations.length; i++) {
+
+          fileSize += locations[i].getLength();
+          OneBlockInfo oneblock = new OneBlockInfo(path,
+              locations[i].getOffset(), locations[i].getLength(),
+              locations[i].getHosts(), locations[i].getTopologyPaths());
+          blocks[i] = oneblock;
+
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < oneblock.racks.length; j++) {
+            String rack = oneblock.racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            // Add this host to rackToNodes map
+            addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+    long getLength() {
+      return fileSize;
+    }
+  }
+
+  /**
+   * information about one block from the File System
+   */
+  private static class OneBlockInfo {
+    Path onepath; // name of this file
+    long offset; // offset in file
+    long length; // length of this block
+    String[] hosts; // nodes on which this block resides
+    String[] racks; // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len, String[] hosts,
+        String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length || topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i],
+              NetworkTopology.DEFAULT_RACK)).toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+  private static void addHostToRack(String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+
+  private static List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
+    for (String rack : racks) {
+      hosts.addAll(rackToNodes.get(rack));
+    }
+    return hosts;
+  }
+
+  /**
+   * Accept a path only if any one of filters given in the constructor do.
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter() {
+      this.filters = new ArrayList<PathFilter>();
+    }
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public void add(PathFilter one) {
+      filters.add(one);
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (filter.accept(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("[");
+      for (PathFilter f : filters) {
+        buf.append(f);
+        buf.append(",");
+      }
+      buf.append("]");
+      return buf.toString();
+    }
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+
+/**
+ * A sub-collection of input files. Unlike
+ * {@link org.apache.hadoop.mapred.FileSplit}, CombineFileSplit does not
+ * represent a split of a file, but a split of input files into smaller sets. A
+ * split may contain blocks from different files, but all the blocks in the
+ * same split are probably local to some rack.
+ * CombineFileSplit can be used to implement
+ * {@link org.apache.hadoop.mapred.RecordReader}s, reading one record per file.
+ *
+ * @see org.apache.hadoop.mapred.FileSplit
+ * @see CombineFileInputFormat
+ */
+public class CombineFileSplit implements InputSplit {
+
+  private Path[] paths;
+  private long[] startoffset;
+  private long[] lengths;
+  private String[] locations;
+  private long totLength;
+  private BSPJob job;
+
+  /**
+   * default constructor
+   */
+  public CombineFileSplit() {
+  }
+
+  public CombineFileSplit(BSPJob job, Path[] files, long[] start,
+      long[] lengths, String[] locations) {
+    initSplit(job, files, start, lengths, locations);
+  }
+
+  public CombineFileSplit(BSPJob job, Path[] files, long[] lengths) {
+    long[] startoffset = new long[files.length];
+    for (int i = 0; i < startoffset.length; i++) {
+      startoffset[i] = 0;
+    }
+    String[] locations = new String[files.length];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = "";
+    }
+    initSplit(job, files, startoffset, lengths, locations);
+  }
+
+  private void initSplit(BSPJob job, Path[] files, long[] start,
+      long[] lengths, String[] locations) {
+    this.job = job;
+    this.startoffset = start;
+    this.lengths = lengths;
+    this.paths = files;
+    this.totLength = 0;
+    this.locations = locations;
+    for (long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  public Configuration getJob() {
+    return job.getConf();
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+
+  /** Returns an array containing the start offsets of the files in the split */
+  public long[] getStartOffsets() {
+    return startoffset;
+  }
+
+  /** Returns an array containing the lengths of the files in the split */
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  /** Returns the start offset of the ith Path */
+  public long getOffset(int i) {
+    return startoffset[i];
+  }
+
+  /** Returns the length of the ith Path */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+
+  /** Returns the number of Paths in the split */
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the ith Path */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+
+  /** Returns all the Paths in the split */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  /** Returns all the locations where this input-split resides */
+  public String[] getLocations() throws IOException {
+    return locations;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    totLength = in.readLong();
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    for (int i = 0; i < arrLength; i++) {
+      lengths[i] = in.readLong();
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for (int i = 0; i < filesLength; i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+    arrLength = in.readInt();
+    startoffset = new long[arrLength];
+    for (int i = 0; i < arrLength; i++) {
+      startoffset[i] = in.readLong();
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(totLength);
+    out.writeInt(lengths.length);
+    for (long length : lengths) {
+      out.writeLong(length);
+    }
+    out.writeInt(paths.length);
+    for (Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+    out.writeInt(startoffset.length);
+    for (long length : startoffset) {
+      out.writeLong(length);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < paths.length; i++) {
+      if (i == 0) {
+        sb.append("Paths:");
+      }
+      sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] + "+"
+          + lengths[i]);
+      if (i < paths.length - 1) {
+        sb.append(",");
+      }
+    }
+    if (locations != null) {
+      String locs = "";
+      StringBuffer locsb = new StringBuffer();
+      for (int i = 0; i < locations.length; i++) {
+        locsb.append(locations[i] + ":");
+      }
+      locs = locsb.toString();
+      sb.append(" Locations:" + locs + "; ");
+    }
+    return sb.toString();
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
------------------------------------------------------------------------------
    svn:eol-style = native
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * HashPartitioner partitions by the hash code of the key.
+ */
+public class HashPartitioner<K, V> implements Partitioner<K, V> {
+
+  @Override
+  public int getPartition(K key, V value, int numTasks) {
+    return key.hashCode() % numTasks;
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/HashPartitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native
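Note that key.hashCode() % numTasks can be negative for keys whose hash code is negative, which would make the writers.get(index) lookup in BSPJobClient.partition() fail. A common sign-safe variant (not part of this commit; Hadoop's own HashPartitioner does the same masking) would be:

    public int getPartition(K key, V value, int numTasks) {
      // mask the sign bit so the result is always in [0, numTasks)
      return (key.hashCode() & Integer.MAX_VALUE) % numTasks;
    }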
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Partitioning interface which is used to spread key value pairs to a specific
+ * partition.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface Partitioner<K, V> {
+
+  /**
+   * Partitions a specific key value mapping to a bucket.
+   *
+   * @param key the key of the record
+   * @param value the value of the record
+   * @param numTasks the number of configured tasks
+   * @return a number between 0 and numTasks (exclusive) that tells which
+   *         partition it belongs to.
+   */
+  public int getPartition(K key, V value, int numTasks);
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Partitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native
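As a usage sketch, a custom implementation (purely illustrative, not part of this commit) that buckets Text keys by their first byte, so that lexicographic neighbours land in the same partition:

    public class FirstCharPartitioner implements Partitioner<Text, Text> {

      @Override
      public int getPartition(Text key, Text value, int numTasks) {
        if (key.getLength() == 0) {
          return 0;
        }
        // bucket by the first byte; & 0xff keeps the value non-negative
        return (key.getBytes()[0] & 0xff) % numTasks;
      }
    }

It would be wired up with job.setPartitioner(FirstCharPartitioner.class), exactly like HashPartitioner above.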
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.SequenceFile;
+
+public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {
+
+  public SequenceFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+  }
+
+  @Override
+  public RecordReader<K, V> getRecordReader(InputSplit split, BSPJob job)
+      throws IOException {
+    return new SequenceFileRecordReader<K, V>(job.getConf(), (FileSplit) split);
+  }
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
+
+  private SequenceFile.Reader in;
+  private long start;
+  private long end;
+  private boolean more = true;
+  protected Configuration conf;
+
+  public SequenceFileRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new SequenceFile.Reader(fs, path, conf);
+    this.end = split.getStart() + split.getLength();
+    this.conf = conf;
+
+    if (split.getStart() > in.getPosition()) {
+      in.sync(split.getStart()); // sync to start
+    }
+
+    this.start = in.getPosition();
+    more = start < end;
+  }
+
+  /**
+   * The class of key that must be passed to {@link #next(Object, Object)}.
+   */
+  @SuppressWarnings("rawtypes")
+  public Class getKeyClass() {
+    return in.getKeyClass();
+  }
+
+  /**
+   * The class of value that must be passed to {@link #next(Object, Object)}.
+   */
+  @SuppressWarnings("rawtypes")
+  public Class getValueClass() {
+    return in.getValueClass();
+  }
+
+  @SuppressWarnings("unchecked")
+  public K createKey() {
+    return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
+  }
+
+  @SuppressWarnings("unchecked")
+  public V createValue() {
+    return (V) ReflectionUtils.newInstance(getValueClass(), conf);
+  }
+
+  public synchronized boolean next(K key, V value) throws IOException {
+    if (!more)
+      return false;
+    long pos = in.getPosition();
+    boolean remaining = (in.next(key) != null);
+    if (remaining) {
+      getCurrentValue(value);
+    }
+    if (pos >= end && in.syncSeen()) {
+      more = false;
+    } else {
+      more = remaining;
+    }
+    return more;
+  }
+
+  protected synchronized boolean next(K key) throws IOException {
+    if (!more)
+      return false;
+    long pos = in.getPosition();
+    boolean remaining = (in.next(key) != null);
+    if (pos >= end && in.syncSeen()) {
+      more = false;
+    } else {
+      more = remaining;
+    }
+    return more;
+  }
+
+  protected synchronized void getCurrentValue(V value) throws IOException {
+    in.getCurrentValue(value);
+  }
+
+  /**
+   * Return the progress within the input split
+   *
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+    }
+  }
+
+  public synchronized long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  protected synchronized void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
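For reference, a hedged sketch of driving the reader by hand over one of the partitioned files (the path is hypothetical, and it assumes Hama's FileSplit offers the (path, start, length, hosts) constructor familiar from Hadoop):

    Configuration conf = new HamaConfiguration();
    FileSystem fs = FileSystem.get(conf);
    Path part = new Path("/tmp/input/hama-partitions/part-0");  // hypothetical
    FileSplit split = new FileSplit(part, 0,
        fs.getFileStatus(part).getLen(), null);
    SequenceFileRecordReader<LongWritable, Text> reader =
        new SequenceFileRecordReader<LongWritable, Text>(conf, split);
    LongWritable key = reader.createKey();
    Text value = reader.createValue();
    while (reader.next(key, value)) {
      // process one record
    }
    reader.close();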
Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1199686&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Wed Nov  9 10:28:06 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.util.KeyValuePair;
+import org.apache.zookeeper.KeeperException;
+
+public class TestPartitioning extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
+
+  public void testPartitioner() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
+    conf.set("bsp.partitioning.dir", "/tmp/hama-test/partitioning/localtest");
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setJobName("Test partitioning with input");
+    bsp.setBspClass(PartitionedBSP.class);
+    bsp.setNumBspTask(2);
+    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    bsp.setInputFormat(TextInputFormat.class);
+    bsp.setOutputFormat(NullOutputFormat.class);
+    bsp.setInputPath(new Path("../CHANGES.txt"));
+    bsp.setPartitioner(HashPartitioner.class);
+    assertTrue(bsp.waitForCompletion(true));
+  }
+
+  public static class PartitionedBSP extends
+      BSP<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void bsp(BSPPeer<LongWritable, Text, NullWritable, NullWritable> peer)
+        throws IOException, KeeperException, InterruptedException {
+      long numOfPairs = 0;
+      KeyValuePair<LongWritable, Text> readNext = null;
+      while ((readNext = peer.readNext()) != null) {
+        LOG.debug(readNext.getKey().get() + " / "
+            + readNext.getValue().toString());
+        numOfPairs++;
+      }
+
+      assertTrue(numOfPairs > 2);
+    }
+
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
------------------------------------------------------------------------------
    svn:eol-style = native
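To round this off, a hedged sketch of what a concrete CombineFileInputFormat could look like (not part of this commit): it wires the new CombineFileSplit accessors to the new SequenceFileRecordReader, but reads only the first chunk of each combined split; a production reader would chain through all getNumPaths() chunks. It again assumes Hama's FileSplit mirrors Hadoop's (path, start, length, hosts) constructor.

    public class CombinedSequenceFileInputFormat extends
        CombineFileInputFormat<Text, Text> {

      @Override
      public RecordReader<Text, Text> createRecordReader(InputSplit split,
          TaskAttemptContext context) throws IOException {
        CombineFileSplit combined = (CombineFileSplit) split;
        // wrap the first chunk of the combined split as a plain FileSplit
        FileSplit first = new FileSplit(combined.getPath(0),
            combined.getOffset(0), combined.getLength(0),
            combined.getLocations());
        return new SequenceFileRecordReader<Text, Text>(combined.getJob(),
            first);
      }
    }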