incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [2/33] git commit: CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:45 GMT
CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/83acb813
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/83acb813
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/83acb813

Branch: refs/heads/master
Commit: 83acb813d5293be67ff693bc3e63a19abedffea5
Parents: 91b4ac2
Author: jwills <jwills@apache.org>
Authored: Tue Jul 10 21:06:00 2012 -0700
Committer: jwills <jwills@apache.org>
Committed: Tue Jul 10 21:06:00 2012 -0700

----------------------------------------------------------------------
 crunch/pom.xml                                     |  363 +
 .../src/main/java/org/apache/crunch/CombineFn.java |  807 +
 crunch/src/main/java/org/apache/crunch/DoFn.java   |  184 +
 .../src/main/java/org/apache/crunch/Emitter.java   |   37 +
 .../src/main/java/org/apache/crunch/FilterFn.java  |  134 +
 .../java/org/apache/crunch/GroupingOptions.java    |  130 +
 crunch/src/main/java/org/apache/crunch/MapFn.java  |   41 +
 .../main/java/org/apache/crunch/PCollection.java   |  183 +
 .../main/java/org/apache/crunch/PGroupedTable.java |   39 +
 crunch/src/main/java/org/apache/crunch/PTable.java |  133 +
 crunch/src/main/java/org/apache/crunch/Pair.java   |  105 +
 .../src/main/java/org/apache/crunch/Pipeline.java  |  106 +
 .../java/org/apache/crunch/PipelineResult.java     |   75 +
 crunch/src/main/java/org/apache/crunch/Source.java |   51 +
 .../main/java/org/apache/crunch/SourceTarget.java  |   27 +
 .../main/java/org/apache/crunch/TableSource.java   |   28 +
 crunch/src/main/java/org/apache/crunch/Target.java |   31 +
 crunch/src/main/java/org/apache/crunch/Tuple.java  |   36 +
 crunch/src/main/java/org/apache/crunch/Tuple3.java |   96 +
 crunch/src/main/java/org/apache/crunch/Tuple4.java |  106 +
 crunch/src/main/java/org/apache/crunch/TupleN.java |   69 +
 .../java/org/apache/crunch/fn/CompositeMapFn.java  |   71 +
 .../java/org/apache/crunch/fn/ExtractKeyFn.java    |   44 +
 .../main/java/org/apache/crunch/fn/IdentityFn.java |   39 +
 .../main/java/org/apache/crunch/fn/MapKeysFn.java  |   32 +
 .../java/org/apache/crunch/fn/MapValuesFn.java     |   32 +
 .../main/java/org/apache/crunch/fn/PairMapFn.java  |   64 +
 .../org/apache/crunch/impl/mem/MemPipeline.java    |  209 +
 .../crunch/impl/mem/collect/MemCollection.java     |  205 +
 .../crunch/impl/mem/collect/MemGroupedTable.java   |  126 +
 .../apache/crunch/impl/mem/collect/MemTable.java   |  146 +
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |  322 +
 .../crunch/impl/mr/collect/DoCollectionImpl.java   |   65 +
 .../apache/crunch/impl/mr/collect/DoTableImpl.java |   79 +
 .../crunch/impl/mr/collect/InputCollection.java    |   85 +
 .../apache/crunch/impl/mr/collect/InputTable.java  |   81 +
 .../crunch/impl/mr/collect/PCollectionImpl.java    |  241 +
 .../crunch/impl/mr/collect/PGroupedTableImpl.java  |  120 +
 .../apache/crunch/impl/mr/collect/PTableBase.java  |  123 +
 .../crunch/impl/mr/collect/UnionCollection.java    |   81 +
 .../apache/crunch/impl/mr/collect/UnionTable.java  |   94 +
 .../crunch/impl/mr/emit/IntermediateEmitter.java   |   49 +
 .../crunch/impl/mr/emit/MultipleOutputEmitter.java |   58 +
 .../apache/crunch/impl/mr/emit/OutputEmitter.java  |   54 +
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |  120 +
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |   76 +
 .../org/apache/crunch/impl/mr/plan/DoNode.java     |  164 +
 .../apache/crunch/impl/mr/plan/JobNameBuilder.java |   80 +
 .../apache/crunch/impl/mr/plan/JobPrototype.java   |  224 +
 .../crunch/impl/mr/plan/MSCROutputHandler.java     |   75 +
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |  376 +
 .../org/apache/crunch/impl/mr/plan/NodePath.java   |  105 +
 .../crunch/impl/mr/plan/PlanningParameters.java    |   27 +
 .../apache/crunch/impl/mr/run/CrunchCombiner.java  |   27 +
 .../crunch/impl/mr/run/CrunchInputFormat.java      |   77 +
 .../crunch/impl/mr/run/CrunchInputSplit.java       |  131 +
 .../apache/crunch/impl/mr/run/CrunchInputs.java    |   71 +
 .../apache/crunch/impl/mr/run/CrunchMapper.java    |   73 +
 .../crunch/impl/mr/run/CrunchRecordReader.java     |   76 +
 .../apache/crunch/impl/mr/run/CrunchReducer.java   |   71 +
 .../crunch/impl/mr/run/CrunchRuntimeException.java |   43 +
 .../crunch/impl/mr/run/CrunchTaskContext.java      |   88 +
 .../org/apache/crunch/impl/mr/run/NodeContext.java |   33 +
 .../java/org/apache/crunch/impl/mr/run/RTNode.java |  122 +
 .../crunch/impl/mr/run/RuntimeParameters.java      |   34 +
 .../impl/mr/run/TaskAttemptContextFactory.java     |   68 +
 crunch/src/main/java/org/apache/crunch/io/At.java  |   89 +
 .../apache/crunch/io/CompositePathIterable.java    |  102 +
 .../org/apache/crunch/io/FileReaderFactory.java    |   27 +
 .../src/main/java/org/apache/crunch/io/From.java   |  107 +
 .../java/org/apache/crunch/io/MapReduceTarget.java |   28 +
 .../java/org/apache/crunch/io/OutputHandler.java   |   25 +
 .../main/java/org/apache/crunch/io/PathTarget.java |   24 +
 .../java/org/apache/crunch/io/PathTargetImpl.java  |   70 +
 .../java/org/apache/crunch/io/ReadableSource.java  |   28 +
 .../org/apache/crunch/io/ReadableSourceTarget.java |   29 +
 .../org/apache/crunch/io/SourceTargetHelper.java   |   68 +
 crunch/src/main/java/org/apache/crunch/io/To.java  |   72 +
 .../crunch/io/avro/AvroFileReaderFactory.java      |   90 +
 .../org/apache/crunch/io/avro/AvroFileSource.java  |   55 +
 .../crunch/io/avro/AvroFileSourceTarget.java       |   34 +
 .../org/apache/crunch/io/avro/AvroFileTarget.java  |   86 +
 .../apache/crunch/io/hbase/HBaseSourceTarget.java  |  109 +
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |   94 +
 .../org/apache/crunch/io/impl/FileSourceImpl.java  |  104 +
 .../apache/crunch/io/impl/FileTableSourceImpl.java |   39 +
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |  104 +
 .../org/apache/crunch/io/impl/InputBundle.java     |  114 +
 .../io/impl/ReadableSourcePathTargetImpl.java      |   40 +
 .../crunch/io/impl/ReadableSourceTargetImpl.java   |   39 +
 .../crunch/io/impl/SourcePathTargetImpl.java       |   44 +
 .../apache/crunch/io/impl/SourceTargetImpl.java    |   85 +
 .../crunch/io/impl/TableSourcePathTargetImpl.java  |   36 +
 .../crunch/io/impl/TableSourceTargetImpl.java      |   36 +
 .../org/apache/crunch/io/seq/SeqFileHelper.java    |   37 +
 .../apache/crunch/io/seq/SeqFileReaderFactory.java |   94 +
 .../org/apache/crunch/io/seq/SeqFileSource.java    |   48 +
 .../apache/crunch/io/seq/SeqFileSourceTarget.java  |   38 +
 .../crunch/io/seq/SeqFileTableReaderFactory.java   |  100 +
 .../apache/crunch/io/seq/SeqFileTableSource.java   |   58 +
 .../crunch/io/seq/SeqFileTableSourceTarget.java    |   48 +
 .../org/apache/crunch/io/seq/SeqFileTarget.java    |   50 +
 .../crunch/io/text/BZip2TextInputFormat.java       |  242 +
 .../apache/crunch/io/text/CBZip2InputStream.java   | 1025 +
 .../crunch/io/text/TextFileReaderFactory.java      |   96 +
 .../org/apache/crunch/io/text/TextFileSource.java  |   76 +
 .../crunch/io/text/TextFileSourceTarget.java       |   38 +
 .../org/apache/crunch/io/text/TextFileTarget.java  |   54 +
 .../main/java/org/apache/crunch/lib/Aggregate.java |  241 +
 .../main/java/org/apache/crunch/lib/Cartesian.java |  225 +
 .../main/java/org/apache/crunch/lib/Cogroup.java   |   91 +
 .../src/main/java/org/apache/crunch/lib/Join.java  |  151 +
 .../main/java/org/apache/crunch/lib/PTables.java   |   95 +
 .../main/java/org/apache/crunch/lib/Sample.java    |   62 +
 .../src/main/java/org/apache/crunch/lib/Set.java   |  125 +
 .../src/main/java/org/apache/crunch/lib/Sort.java  |  547 +
 .../apache/crunch/lib/join/FullOuterJoinFn.java    |   99 +
 .../org/apache/crunch/lib/join/InnerJoinFn.java    |   75 +
 .../java/org/apache/crunch/lib/join/JoinFn.java    |   73 +
 .../java/org/apache/crunch/lib/join/JoinUtils.java |  127 +
 .../apache/crunch/lib/join/LeftOuterJoinFn.java    |   95 +
 .../org/apache/crunch/lib/join/MapsideJoin.java    |  160 +
 .../apache/crunch/lib/join/RightOuterJoinFn.java   |   80 +
 .../crunch/materialize/MaterializableIterable.java |   65 +
 .../crunch/materialize/MaterializableMap.java      |   50 +
 .../java/org/apache/crunch/test/FileHelper.java    |   42 +
 .../org/apache/crunch/test/InMemoryEmitter.java    |   55 +
 .../java/org/apache/crunch/test/TestCounters.java  |   41 +
 .../java/org/apache/crunch/tool/CrunchTool.java    |  104 +
 .../java/org/apache/crunch/types/Converter.java    |   41 +
 .../org/apache/crunch/types/PGroupedTableType.java |  124 +
 .../java/org/apache/crunch/types/PTableType.java   |   44 +
 .../main/java/org/apache/crunch/types/PType.java   |   82 +
 .../java/org/apache/crunch/types/PTypeFamily.java  |   79 +
 .../java/org/apache/crunch/types/PTypeUtils.java   |   66 +
 .../java/org/apache/crunch/types/TupleFactory.java |   98 +
 .../apache/crunch/types/avro/AvroDeepCopier.java   |  151 +
 .../crunch/types/avro/AvroGroupedTableType.java    |  110 +
 .../apache/crunch/types/avro/AvroInputFormat.java  |   58 +
 .../apache/crunch/types/avro/AvroKeyConverter.java |   67 +
 .../apache/crunch/types/avro/AvroOutputFormat.java |   71 +
 .../crunch/types/avro/AvroPairConverter.java       |  109 +
 .../apache/crunch/types/avro/AvroRecordReader.java |  115 +
 .../apache/crunch/types/avro/AvroTableType.java    |  167 +
 .../org/apache/crunch/types/avro/AvroType.java     |  172 +
 .../apache/crunch/types/avro/AvroTypeFamily.java   |  169 +
 .../crunch/types/avro/AvroUtf8InputFormat.java     |  102 +
 .../java/org/apache/crunch/types/avro/Avros.java   |  636 +
 .../crunch/types/avro/ReflectDataFactory.java      |   39 +
 .../crunch/types/avro/SafeAvroSerialization.java   |  151 +
 .../types/writable/GenericArrayWritable.java       |  128 +
 .../crunch/types/writable/TextMapWritable.java     |   88 +
 .../crunch/types/writable/TupleWritable.java       |  225 +
 .../types/writable/WritableGroupedTableType.java   |   81 +
 .../types/writable/WritablePairConverter.java      |   62 +
 .../crunch/types/writable/WritableTableType.java   |  126 +
 .../apache/crunch/types/writable/WritableType.java |  122 +
 .../crunch/types/writable/WritableTypeFamily.java  |  151 +
 .../types/writable/WritableValueConverter.java     |   61 +
 .../apache/crunch/types/writable/Writables.java    |  634 +
 .../main/java/org/apache/crunch/util/Collects.java |   48 +
 .../java/org/apache/crunch/util/DistCache.java     |  198 +
 .../main/java/org/apache/crunch/util/PTypes.java   |  259 +
 .../main/java/org/apache/crunch/util/Protos.java   |  150 +
 .../main/java/org/apache/crunch/util/Tuples.java   |  151 +
 .../lib/jobcontrol/CrunchControlledJob.java        |  338 +
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |  297 +
 .../lib/output/CrunchMultipleOutputs.java          |  473 +
 crunch/src/main/resources/log4j.properties         |    8 +
 .../java/org/apache/crunch/CollectionsTest.java    |  112 +
 .../test/java/org/apache/crunch/CombineFnTest.java |  206 +
 .../test/java/org/apache/crunch/FilterFnTest.java  |   63 +
 .../src/test/java/org/apache/crunch/MapsTest.java  |   93 +
 .../java/org/apache/crunch/MaterializeTest.java    |  107 +
 .../org/apache/crunch/MaterializeToMapTest.java    |   78 +
 .../java/org/apache/crunch/MultipleOutputTest.java |  110 +
 .../org/apache/crunch/PCollectionGetSizeTest.java  |  154 +
 .../java/org/apache/crunch/PTableKeyValueTest.java |  107 +
 .../test/java/org/apache/crunch/PageRankTest.java  |  166 +
 .../src/test/java/org/apache/crunch/PairTest.java  |   66 +
 .../src/test/java/org/apache/crunch/TFIDFTest.java |  233 +
 .../java/org/apache/crunch/TermFrequencyTest.java  |  134 +
 .../test/java/org/apache/crunch/TextPairTest.java  |   69 +
 .../org/apache/crunch/TupleNClassCastBugTest.java  |   95 +
 .../src/test/java/org/apache/crunch/TupleTest.java |  142 +
 .../java/org/apache/crunch/WordCountHBaseTest.java |  207 +
 .../test/java/org/apache/crunch/WordCountTest.java |  172 +
 .../org/apache/crunch/fn/ExtractKeyFnTest.java     |   46 +
 .../java/org/apache/crunch/fn/MapKeysTest.java     |   53 +
 .../java/org/apache/crunch/fn/MapValuesTest.java   |   51 +
 .../java/org/apache/crunch/fn/PairMapTest.java     |   53 +
 .../org/apache/crunch/fn/StoreLastEmitter.java     |   41 +
 .../impl/mem/MemPipelineFileWritingTest.java       |   53 +
 .../org/apache/crunch/impl/mr/MRPipelineTest.java  |   77 +
 .../impl/mr/collect/DoCollectionImplTest.java      |  118 +
 .../crunch/impl/mr/collect/DoTableImplTest.java    |   88 +
 .../impl/mr/collect/UnionCollectionTest.java       |  161 +
 .../crunch/impl/mr/plan/JobNameBuilderTest.java    |   41 +
 .../crunch/io/CompositePathIterableTest.java       |   81 +
 .../apache/crunch/io/SourceTargetHelperTest.java   |   59 +
 .../crunch/io/avro/AvroFileReaderFactoryTest.java  |  158 +
 .../crunch/io/avro/AvroFileSourceTargetTest.java   |  153 +
 .../apache/crunch/io/avro/AvroFileSourceTest.java  |   82 +
 .../org/apache/crunch/io/avro/AvroReflectTest.java |  115 +
 .../java/org/apache/crunch/lib/AggregateTest.java  |  232 +
 .../lib/AvroIndexedRecordPartitionerTest.java      |   99 +
 .../org/apache/crunch/lib/AvroTypeSortTest.java    |  148 +
 .../java/org/apache/crunch/lib/CartesianTest.java  |   65 +
 .../java/org/apache/crunch/lib/CogroupTest.java    |  126 +
 .../java/org/apache/crunch/lib/SampleTest.java     |   37 +
 .../test/java/org/apache/crunch/lib/SetTest.java   |  115 +
 .../test/java/org/apache/crunch/lib/SortTest.java  |  282 +
 .../apache/crunch/lib/SpecificAvroGroupByTest.java |  139 +
 .../crunch/lib/TupleWritablePartitionerTest.java   |   70 +
 .../apache/crunch/lib/join/FullOuterJoinTest.java  |   51 +
 .../org/apache/crunch/lib/join/InnerJoinTest.java  |   51 +
 .../org/apache/crunch/lib/join/JoinTester.java     |  107 +
 .../apache/crunch/lib/join/LeftOuterJoinTest.java  |   51 +
 .../apache/crunch/lib/join/MapsideJoinTest.java    |  119 +
 .../crunch/lib/join/MultiAvroSchemaJoinTest.java   |  116 +
 .../apache/crunch/lib/join/RightOuterJoinTest.java |   51 +
 .../java/org/apache/crunch/test/CountersTest.java  |   66 +
 .../test/java/org/apache/crunch/test/Employee.java |  230 +
 .../test/java/org/apache/crunch/test/Person.java   |  230 +
 .../org/apache/crunch/types/PTypeUtilsTest.java    |   91 +
 .../crunch/types/avro/AvroDeepCopierTest.java      |   75 +
 .../types/avro/AvroGroupedTableTypeTest.java       |   58 +
 .../crunch/types/avro/AvroTableTypeTest.java       |   52 +
 .../org/apache/crunch/types/avro/AvroTypeTest.java |  135 +
 .../org/apache/crunch/types/avro/AvrosTest.java    |  224 +
 .../writable/WritableGroupedTableTypeTest.java     |   54 +
 .../types/writable/WritableTableTypeTest.java      |   47 +
 .../crunch/types/writable/WritableTypeTest.java    |   46 +
 .../crunch/types/writable/WritablesTest.java       |  279 +
 .../java/org/apache/crunch/util/DistCacheTest.java |  144 +
 crunch/src/test/resources/customers.txt            |    4 +
 crunch/src/test/resources/docs.txt                 |    6 +
 crunch/src/test/resources/employee.avro            |    9 +
 crunch/src/test/resources/letters.txt              |    2 +
 crunch/src/test/resources/log4j.properties         |   11 +
 crunch/src/test/resources/maugham.txt              |29112 +++++++++++++++
 crunch/src/test/resources/orders.txt               |    4 +
 crunch/src/test/resources/person.avro              |    9 +
 crunch/src/test/resources/set1.txt                 |    4 +
 crunch/src/test/resources/set2.txt                 |    3 +
 crunch/src/test/resources/shakes.txt               | 3667 ++
 crunch/src/test/resources/urls.txt                 |   11 +
 examples/pom.xml                                   |   12 +-
 pom.xml                                            |   14 +-
 scrunch/pom.xml                                    |   11 +-
 src/main/java/org/apache/crunch/CombineFn.java     |  807 -
 src/main/java/org/apache/crunch/DoFn.java          |  184 -
 src/main/java/org/apache/crunch/Emitter.java       |   37 -
 src/main/java/org/apache/crunch/FilterFn.java      |  134 -
 .../java/org/apache/crunch/GroupingOptions.java    |  130 -
 src/main/java/org/apache/crunch/MapFn.java         |   41 -
 src/main/java/org/apache/crunch/PCollection.java   |  183 -
 src/main/java/org/apache/crunch/PGroupedTable.java |   39 -
 src/main/java/org/apache/crunch/PTable.java        |  133 -
 src/main/java/org/apache/crunch/Pair.java          |  105 -
 src/main/java/org/apache/crunch/Pipeline.java      |  106 -
 .../java/org/apache/crunch/PipelineResult.java     |   75 -
 src/main/java/org/apache/crunch/Source.java        |   51 -
 src/main/java/org/apache/crunch/SourceTarget.java  |   27 -
 src/main/java/org/apache/crunch/TableSource.java   |   28 -
 src/main/java/org/apache/crunch/Target.java        |   31 -
 src/main/java/org/apache/crunch/Tuple.java         |   36 -
 src/main/java/org/apache/crunch/Tuple3.java        |   96 -
 src/main/java/org/apache/crunch/Tuple4.java        |  106 -
 src/main/java/org/apache/crunch/TupleN.java        |   69 -
 .../java/org/apache/crunch/fn/CompositeMapFn.java  |   71 -
 .../java/org/apache/crunch/fn/ExtractKeyFn.java    |   44 -
 src/main/java/org/apache/crunch/fn/IdentityFn.java |   39 -
 src/main/java/org/apache/crunch/fn/MapKeysFn.java  |   32 -
 .../java/org/apache/crunch/fn/MapValuesFn.java     |   32 -
 src/main/java/org/apache/crunch/fn/PairMapFn.java  |   64 -
 .../org/apache/crunch/impl/mem/MemPipeline.java    |  209 -
 .../crunch/impl/mem/collect/MemCollection.java     |  205 -
 .../crunch/impl/mem/collect/MemGroupedTable.java   |  126 -
 .../apache/crunch/impl/mem/collect/MemTable.java   |  146 -
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |  322 -
 .../crunch/impl/mr/collect/DoCollectionImpl.java   |   65 -
 .../apache/crunch/impl/mr/collect/DoTableImpl.java |   79 -
 .../crunch/impl/mr/collect/InputCollection.java    |   85 -
 .../apache/crunch/impl/mr/collect/InputTable.java  |   81 -
 .../crunch/impl/mr/collect/PCollectionImpl.java    |  241 -
 .../crunch/impl/mr/collect/PGroupedTableImpl.java  |  120 -
 .../apache/crunch/impl/mr/collect/PTableBase.java  |  123 -
 .../crunch/impl/mr/collect/UnionCollection.java    |   81 -
 .../apache/crunch/impl/mr/collect/UnionTable.java  |   94 -
 .../crunch/impl/mr/emit/IntermediateEmitter.java   |   49 -
 .../crunch/impl/mr/emit/MultipleOutputEmitter.java |   58 -
 .../apache/crunch/impl/mr/emit/OutputEmitter.java  |   54 -
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |  120 -
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |   76 -
 .../org/apache/crunch/impl/mr/plan/DoNode.java     |  164 -
 .../apache/crunch/impl/mr/plan/JobNameBuilder.java |   80 -
 .../apache/crunch/impl/mr/plan/JobPrototype.java   |  224 -
 .../crunch/impl/mr/plan/MSCROutputHandler.java     |   75 -
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |  376 -
 .../org/apache/crunch/impl/mr/plan/NodePath.java   |  105 -
 .../crunch/impl/mr/plan/PlanningParameters.java    |   27 -
 .../apache/crunch/impl/mr/run/CrunchCombiner.java  |   27 -
 .../crunch/impl/mr/run/CrunchInputFormat.java      |   77 -
 .../crunch/impl/mr/run/CrunchInputSplit.java       |  131 -
 .../apache/crunch/impl/mr/run/CrunchInputs.java    |   71 -
 .../apache/crunch/impl/mr/run/CrunchMapper.java    |   73 -
 .../crunch/impl/mr/run/CrunchRecordReader.java     |   76 -
 .../apache/crunch/impl/mr/run/CrunchReducer.java   |   71 -
 .../crunch/impl/mr/run/CrunchRuntimeException.java |   43 -
 .../crunch/impl/mr/run/CrunchTaskContext.java      |   88 -
 .../org/apache/crunch/impl/mr/run/NodeContext.java |   33 -
 .../java/org/apache/crunch/impl/mr/run/RTNode.java |  122 -
 .../crunch/impl/mr/run/RuntimeParameters.java      |   34 -
 .../impl/mr/run/TaskAttemptContextFactory.java     |   68 -
 src/main/java/org/apache/crunch/io/At.java         |   89 -
 .../apache/crunch/io/CompositePathIterable.java    |  102 -
 .../org/apache/crunch/io/FileReaderFactory.java    |   27 -
 src/main/java/org/apache/crunch/io/From.java       |  107 -
 .../java/org/apache/crunch/io/MapReduceTarget.java |   28 -
 .../java/org/apache/crunch/io/OutputHandler.java   |   25 -
 src/main/java/org/apache/crunch/io/PathTarget.java |   24 -
 .../java/org/apache/crunch/io/PathTargetImpl.java  |   70 -
 .../java/org/apache/crunch/io/ReadableSource.java  |   28 -
 .../org/apache/crunch/io/ReadableSourceTarget.java |   29 -
 .../org/apache/crunch/io/SourceTargetHelper.java   |   68 -
 src/main/java/org/apache/crunch/io/To.java         |   72 -
 .../crunch/io/avro/AvroFileReaderFactory.java      |   90 -
 .../org/apache/crunch/io/avro/AvroFileSource.java  |   55 -
 .../crunch/io/avro/AvroFileSourceTarget.java       |   34 -
 .../org/apache/crunch/io/avro/AvroFileTarget.java  |   86 -
 .../apache/crunch/io/hbase/HBaseSourceTarget.java  |  109 -
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |   94 -
 .../org/apache/crunch/io/impl/FileSourceImpl.java  |  104 -
 .../apache/crunch/io/impl/FileTableSourceImpl.java |   39 -
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |  104 -
 .../org/apache/crunch/io/impl/InputBundle.java     |  114 -
 .../io/impl/ReadableSourcePathTargetImpl.java      |   40 -
 .../crunch/io/impl/ReadableSourceTargetImpl.java   |   39 -
 .../crunch/io/impl/SourcePathTargetImpl.java       |   44 -
 .../apache/crunch/io/impl/SourceTargetImpl.java    |   85 -
 .../crunch/io/impl/TableSourcePathTargetImpl.java  |   36 -
 .../crunch/io/impl/TableSourceTargetImpl.java      |   36 -
 .../org/apache/crunch/io/seq/SeqFileHelper.java    |   37 -
 .../apache/crunch/io/seq/SeqFileReaderFactory.java |   94 -
 .../org/apache/crunch/io/seq/SeqFileSource.java    |   48 -
 .../apache/crunch/io/seq/SeqFileSourceTarget.java  |   38 -
 .../crunch/io/seq/SeqFileTableReaderFactory.java   |  100 -
 .../apache/crunch/io/seq/SeqFileTableSource.java   |   58 -
 .../crunch/io/seq/SeqFileTableSourceTarget.java    |   48 -
 .../org/apache/crunch/io/seq/SeqFileTarget.java    |   50 -
 .../crunch/io/text/BZip2TextInputFormat.java       |  242 -
 .../apache/crunch/io/text/CBZip2InputStream.java   | 1025 -
 .../crunch/io/text/TextFileReaderFactory.java      |   96 -
 .../org/apache/crunch/io/text/TextFileSource.java  |   76 -
 .../crunch/io/text/TextFileSourceTarget.java       |   38 -
 .../org/apache/crunch/io/text/TextFileTarget.java  |   54 -
 src/main/java/org/apache/crunch/lib/Aggregate.java |  241 -
 src/main/java/org/apache/crunch/lib/Cartesian.java |  225 -
 src/main/java/org/apache/crunch/lib/Cogroup.java   |   91 -
 src/main/java/org/apache/crunch/lib/Join.java      |  151 -
 src/main/java/org/apache/crunch/lib/PTables.java   |   95 -
 src/main/java/org/apache/crunch/lib/Sample.java    |   62 -
 src/main/java/org/apache/crunch/lib/Set.java       |  125 -
 src/main/java/org/apache/crunch/lib/Sort.java      |  547 -
 .../apache/crunch/lib/join/FullOuterJoinFn.java    |   99 -
 .../org/apache/crunch/lib/join/InnerJoinFn.java    |   75 -
 .../java/org/apache/crunch/lib/join/JoinFn.java    |   73 -
 .../java/org/apache/crunch/lib/join/JoinUtils.java |  127 -
 .../apache/crunch/lib/join/LeftOuterJoinFn.java    |   95 -
 .../org/apache/crunch/lib/join/MapsideJoin.java    |  160 -
 .../apache/crunch/lib/join/RightOuterJoinFn.java   |   80 -
 .../crunch/materialize/MaterializableIterable.java |   65 -
 .../crunch/materialize/MaterializableMap.java      |   50 -
 .../java/org/apache/crunch/test/FileHelper.java    |   42 -
 .../org/apache/crunch/test/InMemoryEmitter.java    |   55 -
 .../java/org/apache/crunch/test/TestCounters.java  |   41 -
 .../java/org/apache/crunch/tool/CrunchTool.java    |  104 -
 .../java/org/apache/crunch/types/Converter.java    |   41 -
 .../org/apache/crunch/types/PGroupedTableType.java |  124 -
 .../java/org/apache/crunch/types/PTableType.java   |   44 -
 src/main/java/org/apache/crunch/types/PType.java   |   82 -
 .../java/org/apache/crunch/types/PTypeFamily.java  |   79 -
 .../java/org/apache/crunch/types/PTypeUtils.java   |   66 -
 .../java/org/apache/crunch/types/TupleFactory.java |   98 -
 .../apache/crunch/types/avro/AvroDeepCopier.java   |  151 -
 .../crunch/types/avro/AvroGroupedTableType.java    |  110 -
 .../apache/crunch/types/avro/AvroInputFormat.java  |   58 -
 .../apache/crunch/types/avro/AvroKeyConverter.java |   67 -
 .../apache/crunch/types/avro/AvroOutputFormat.java |   71 -
 .../crunch/types/avro/AvroPairConverter.java       |  109 -
 .../apache/crunch/types/avro/AvroRecordReader.java |  115 -
 .../apache/crunch/types/avro/AvroTableType.java    |  167 -
 .../org/apache/crunch/types/avro/AvroType.java     |  172 -
 .../apache/crunch/types/avro/AvroTypeFamily.java   |  169 -
 .../crunch/types/avro/AvroUtf8InputFormat.java     |  102 -
 .../java/org/apache/crunch/types/avro/Avros.java   |  636 -
 .../crunch/types/avro/ReflectDataFactory.java      |   39 -
 .../crunch/types/avro/SafeAvroSerialization.java   |  151 -
 .../types/writable/GenericArrayWritable.java       |  128 -
 .../crunch/types/writable/TextMapWritable.java     |   88 -
 .../crunch/types/writable/TupleWritable.java       |  225 -
 .../types/writable/WritableGroupedTableType.java   |   81 -
 .../types/writable/WritablePairConverter.java      |   62 -
 .../crunch/types/writable/WritableTableType.java   |  126 -
 .../apache/crunch/types/writable/WritableType.java |  122 -
 .../crunch/types/writable/WritableTypeFamily.java  |  151 -
 .../types/writable/WritableValueConverter.java     |   61 -
 .../apache/crunch/types/writable/Writables.java    |  634 -
 src/main/java/org/apache/crunch/util/Collects.java |   48 -
 .../java/org/apache/crunch/util/DistCache.java     |  198 -
 src/main/java/org/apache/crunch/util/PTypes.java   |  259 -
 src/main/java/org/apache/crunch/util/Protos.java   |  150 -
 src/main/java/org/apache/crunch/util/Tuples.java   |  151 -
 .../lib/jobcontrol/CrunchControlledJob.java        |  338 -
 .../mapreduce/lib/jobcontrol/CrunchJobControl.java |  297 -
 .../lib/output/CrunchMultipleOutputs.java          |  473 -
 src/main/resources/log4j.properties                |    8 -
 .../java/org/apache/crunch/CollectionsTest.java    |  112 -
 src/test/java/org/apache/crunch/CombineFnTest.java |  206 -
 src/test/java/org/apache/crunch/FilterFnTest.java  |   63 -
 src/test/java/org/apache/crunch/MapsTest.java      |   93 -
 .../java/org/apache/crunch/MaterializeTest.java    |  107 -
 .../org/apache/crunch/MaterializeToMapTest.java    |   78 -
 .../java/org/apache/crunch/MultipleOutputTest.java |  110 -
 .../org/apache/crunch/PCollectionGetSizeTest.java  |  154 -
 .../java/org/apache/crunch/PTableKeyValueTest.java |  107 -
 src/test/java/org/apache/crunch/PageRankTest.java  |  166 -
 src/test/java/org/apache/crunch/PairTest.java      |   66 -
 src/test/java/org/apache/crunch/TFIDFTest.java     |  233 -
 .../java/org/apache/crunch/TermFrequencyTest.java  |  134 -
 src/test/java/org/apache/crunch/TextPairTest.java  |   69 -
 .../org/apache/crunch/TupleNClassCastBugTest.java  |   95 -
 src/test/java/org/apache/crunch/TupleTest.java     |  142 -
 .../java/org/apache/crunch/WordCountHBaseTest.java |  207 -
 src/test/java/org/apache/crunch/WordCountTest.java |  172 -
 .../org/apache/crunch/fn/ExtractKeyFnTest.java     |   46 -
 .../java/org/apache/crunch/fn/MapKeysTest.java     |   53 -
 .../java/org/apache/crunch/fn/MapValuesTest.java   |   51 -
 .../java/org/apache/crunch/fn/PairMapTest.java     |   53 -
 .../org/apache/crunch/fn/StoreLastEmitter.java     |   41 -
 .../impl/mem/MemPipelineFileWritingTest.java       |   53 -
 .../org/apache/crunch/impl/mr/MRPipelineTest.java  |   77 -
 .../impl/mr/collect/DoCollectionImplTest.java      |  118 -
 .../crunch/impl/mr/collect/DoTableImplTest.java    |   88 -
 .../impl/mr/collect/UnionCollectionTest.java       |  161 -
 .../crunch/impl/mr/plan/JobNameBuilderTest.java    |   41 -
 .../crunch/io/CompositePathIterableTest.java       |   81 -
 .../apache/crunch/io/SourceTargetHelperTest.java   |   59 -
 .../crunch/io/avro/AvroFileReaderFactoryTest.java  |  158 -
 .../crunch/io/avro/AvroFileSourceTargetTest.java   |  153 -
 .../apache/crunch/io/avro/AvroFileSourceTest.java  |   82 -
 .../org/apache/crunch/io/avro/AvroReflectTest.java |  115 -
 .../java/org/apache/crunch/lib/AggregateTest.java  |  232 -
 .../lib/AvroIndexedRecordPartitionerTest.java      |   99 -
 .../org/apache/crunch/lib/AvroTypeSortTest.java    |  148 -
 .../java/org/apache/crunch/lib/CartesianTest.java  |   65 -
 .../java/org/apache/crunch/lib/CogroupTest.java    |  126 -
 .../java/org/apache/crunch/lib/SampleTest.java     |   37 -
 src/test/java/org/apache/crunch/lib/SetTest.java   |  115 -
 src/test/java/org/apache/crunch/lib/SortTest.java  |  282 -
 .../apache/crunch/lib/SpecificAvroGroupByTest.java |  139 -
 .../crunch/lib/TupleWritablePartitionerTest.java   |   70 -
 .../apache/crunch/lib/join/FullOuterJoinTest.java  |   51 -
 .../org/apache/crunch/lib/join/InnerJoinTest.java  |   51 -
 .../org/apache/crunch/lib/join/JoinTester.java     |  107 -
 .../apache/crunch/lib/join/LeftOuterJoinTest.java  |   51 -
 .../apache/crunch/lib/join/MapsideJoinTest.java    |  119 -
 .../crunch/lib/join/MultiAvroSchemaJoinTest.java   |  116 -
 .../apache/crunch/lib/join/RightOuterJoinTest.java |   51 -
 .../java/org/apache/crunch/test/CountersTest.java  |   66 -
 src/test/java/org/apache/crunch/test/Employee.java |  230 -
 src/test/java/org/apache/crunch/test/Person.java   |  230 -
 .../org/apache/crunch/types/PTypeUtilsTest.java    |   91 -
 .../crunch/types/avro/AvroDeepCopierTest.java      |   75 -
 .../types/avro/AvroGroupedTableTypeTest.java       |   58 -
 .../crunch/types/avro/AvroTableTypeTest.java       |   52 -
 .../org/apache/crunch/types/avro/AvroTypeTest.java |  135 -
 .../org/apache/crunch/types/avro/AvrosTest.java    |  224 -
 .../writable/WritableGroupedTableTypeTest.java     |   54 -
 .../types/writable/WritableTableTypeTest.java      |   47 -
 .../crunch/types/writable/WritableTypeTest.java    |   46 -
 .../crunch/types/writable/WritablesTest.java       |  279 -
 .../java/org/apache/crunch/util/DistCacheTest.java |  144 -
 src/test/resources/customers.txt                   |    4 -
 src/test/resources/docs.txt                        |    6 -
 src/test/resources/employee.avro                   |    9 -
 src/test/resources/letters.txt                     |    2 -
 src/test/resources/log4j.properties                |   12 -
 src/test/resources/maugham.txt                     |29112 ---------------
 src/test/resources/orders.txt                      |    4 -
 src/test/resources/person.avro                     |    9 -
 src/test/resources/set1.txt                        |    4 -
 src/test/resources/set2.txt                        |    3 -
 src/test/resources/shakes.txt                      | 3667 --
 src/test/resources/urls.txt                        |   11 -
 496 files changed, 60201 insertions(+), 59822 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch/pom.xml b/crunch/pom.xml
new file mode 100644
index 0000000..f64e926
--- /dev/null
+++ b/crunch/pom.xml
@@ -0,0 +1,363 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.3.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>crunch</artifactId>
+  <name>Apache Incubator Crunch</name>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <avro.version>1.7.0</avro.version>
+    <hadoop.version>0.20.2-cdh3u4</hadoop.version>
+    <hbase.version>0.90.6-cdh3u4</hbase.version>
+  </properties>
+
+  <scm>
+    <url>https://git-wip-us.apache.org/repos/asf?p=incubator-crunch.git</url>
+    <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-crunch.git</connection>
+    <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-crunch.git</developerConnection>
+  </scm>
+
+  <issueManagement>
+    <system>JIRA</system>
+    <url>http://issues.apache.org/jira/browse/CRUNCH</url>
+  </issueManagement>
+
+  <licenses>
+    <license>
+      <name>Apache 2</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+
+  <organization>
+    <name>The Apache Software Foundation</name>
+    <url>http://www.apache.org</url>
+  </organization>
+
+  <mailingLists>
+    <mailingList>
+      <name>User List</name>
+      <subscribe>crunch-user-subscribe@incubator.apache.org</subscribe>
+      <unsubscribe>crunch-user-unsubscribe@incubator.apache.org</unsubscribe>
+      <post>crunch-user@incubator.apache.org</post>
+      <archive>http://mail-archives.apache.org/mod_mbox/incubator-crunch-user/</archive>
+    </mailingList>
+    <mailingList>
+      <name>Developer List</name>
+      <subscribe>crunch-dev-subscribe@incubator.apache.org</subscribe>
+      <unsubscribe>crunch-dev-unsubscribe@incubator.apache.org</unsubscribe>
+      <post>crunch-dev@incubator.apache.org</post>
+      <archive>http://mail-archives.apache.org/mod_mbox/incubator-crunch-dev/</archive>
+    </mailingList>
+    <mailingList>
+      <name>Commits List</name>
+      <subscribe>crunch-commits-subscribe@incubator.apache.org</subscribe>
+      <unsubscribe>crunch-commits-unsubscribe@incubator.apache.org</unsubscribe>
+      <post>crunch-commits@incubator.apache.org</post>
+      <archive>http://mail-archives.apache.org/mod_mbox/incubator-crunch-commits/</archive>
+    </mailingList>
+  </mailingLists>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>11.0.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+      <version>1.8.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.8.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-smile</artifactId>
+      <version>1.8.6</version>
+    </dependency>
+
+    <!-- Both Protobufs and Thrift are supported as
+         derived serialization types, and you can use
+         (almost) any version of them you like, Crunch
+         only relies on the stable public APIs, not the
+         structure of the files themselves.
+
+         Both dependencies are scoped as provided, in
+         order to not expand the size of the assembly jars
+         unnecessarily.
+    -->
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.4.1</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.8.0</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.8.2</version>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.9.0</version>
+      <scope>test</scope>
+    </dependency>
+           
+    <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-minicluster</artifactId>
+     <version>${hadoop.version}</version>
+     <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.1</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+   
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.15</version>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.mail</groupId>
+          <artifactId>mail</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.6.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <version>${hbase.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>maven-hadoop</id>
+      <name>Hadoop Releases</name>
+      <url>https://repository.cloudera.com/content/repositories/releases/</url>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.3.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.10</version>
+        <configuration>
+          <argLine>-Xmx512m</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.1.2</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>jar-no-fork</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.github</groupId>
+        <artifactId>site-maven-plugin</artifactId>
+        <version>0.5</version>
+        <configuration>
+          <message>Building site for ${project.version}</message>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>site</goal>
+            </goals>
+            <phase>site</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>cobertura-maven-plugin</artifactId>
+        <version>2.5.1</version>
+      </plugin>
+    </plugins>
+  </reporting>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/CombineFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java
new file mode 100644
index 0000000..e552286
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java
@@ -0,0 +1,807 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.crunch.util.Tuples;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * A special {@link DoFn} implementation that converts an {@link Iterable}
+ * of values into a single value. If a {@code CombineFn} instance is used
+ * on a {@link PGroupedTable}, the function will be applied to the output
+ * of the map stage before the data is passed to the reducer, which can
+ * improve the runtime of certain classes of jobs.
+ *
+ */
+public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, T>> {
+  
+  public static interface Aggregator<T> extends Serializable {
+    /**
+     * Clears the internal state of this Aggregator and prepares it for the values associated
+     * with the next key.
+     */
+    void reset();
+
+    /**
+     * Incorporate the given value into the aggregate state maintained by this instance.
+     */
+    void update(T value);
+    
+    /**
+     * Returns the current aggregated state of this instance.
+     */
+    Iterable<T> results();    
+  }
+  
+  /**
+   * Interface for constructing new aggregator instances.
+   */
+  public static interface AggregatorFactory<T> {
+    Aggregator<T> create();
+  }
+  
+  /**
+   * A {@code CombineFn} that delegates all of the actual work to an {@code Aggregator}
+   * instance.
+   */
+  public static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
+    
+    private final Aggregator<V> aggregator;
+    
+    public AggregatorCombineFn(Aggregator<V> aggregator) {
+      this.aggregator = aggregator;
+    }
+    
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
+      aggregator.reset();
+      for (V v : input.second()) {
+        aggregator.update(v);
+      }
+      for (V v : aggregator.results()) {
+        emitter.emit(Pair.of(input.first(), v));
+      }
+    }    
+  }
+  
+  private static abstract class TupleAggregator<T> implements Aggregator<T> {   
+    private final List<Aggregator<Object>> aggregators;
+    
+    public TupleAggregator(Aggregator<?>...aggregators) {
+      this.aggregators = Lists.newArrayList();
+      for (Aggregator<?> a : aggregators) {
+        this.aggregators.add((Aggregator<Object>) a);
+      }
+    }
+    
+    @Override
+    public void reset() {
+      for (Aggregator<?> a : aggregators) {
+        a.reset();
+      }
+    }
+    
+    protected void updateTuple(Tuple t) {
+      for (int i = 0; i < aggregators.size(); i++) {
+        aggregators.get(i).update(t.get(i));
+      }
+    }
+    
+    protected Iterable<Object> results(int index) {
+      return aggregators.get(index).results();
+    }
+  }
+  
+  public static class PairAggregator<V1, V2> extends TupleAggregator<Pair<V1, V2>> {
+    
+    public PairAggregator(Aggregator<V1> a1, Aggregator<V2> a2) {
+      super(a1, a2);
+    }
+    
+    @Override
+    public void update(Pair<V1, V2> value) {
+      updateTuple(value);
+    }
+    
+    @Override
+    public Iterable<Pair<V1, V2>> results() {
+      return new Tuples.PairIterable<V1, V2>((Iterable<V1>) results(0), (Iterable<V2>) results(1));
+    }
+  }
+  
+  public static class TripAggregator<A, B, C> extends TupleAggregator<Tuple3<A, B, C>> {
+    
+    public TripAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3) {
+      super(a1, a2, a3);
+    }
+    
+    @Override
+    public void update(Tuple3<A, B, C> value) {
+      updateTuple(value);
+    }
+    
+    @Override
+    public Iterable<Tuple3<A, B, C>> results() {
+      return new Tuples.TripIterable<A, B, C>((Iterable<A>) results(0),
+          (Iterable<B>) results(1), (Iterable<C>) results(2));
+    }
+  }
+
+  public static class QuadAggregator<A, B, C, D> extends TupleAggregator<Tuple4<A, B, C, D>> {
+    
+    public QuadAggregator(Aggregator<A> a1, Aggregator<B> a2, Aggregator<C> a3, Aggregator<D> a4) {
+      super(a1, a2, a3, a4);
+    }
+    
+    @Override
+    public void update(Tuple4<A, B, C, D> value) {
+      updateTuple(value);
+    }
+    
+    @Override
+    public Iterable<Tuple4<A, B, C, D>> results() {
+      return new Tuples.QuadIterable<A, B, C, D>((Iterable<A>) results(0),
+          (Iterable<B>) results(1), (Iterable<C>) results(2), (Iterable<D>) results(3));
+    }
+  }
+  
+  public static class TupleNAggregator extends TupleAggregator<TupleN> {
+    
+    private final int size;
+    
+    public TupleNAggregator(Aggregator<?>... aggregators) {
+      super(aggregators);
+      size = aggregators.length;
+    }
+    
+    @Override
+    public void update(TupleN value) {
+      updateTuple(value);
+    }
+
+    @Override
+    public Iterable<TupleN> results() {
+      Iterable<?>[] iterables = new Iterable[size];
+      for (int i = 0; i < size; i++) {
+        iterables[i] = results(i);
+      }
+      return new Tuples.TupleNIterable(iterables);
+    }
+    
+  }
+  
+  public static final <K, V> CombineFn<K, V> aggregator(Aggregator<V> aggregator) {
+    return new AggregatorCombineFn<K, V>(aggregator);
+  }
+  
+  public static final <K, V> CombineFn<K, V> aggregatorFactory(AggregatorFactory<V> aggregator) {
+    return new AggregatorCombineFn<K, V>(aggregator.create());
+  }
+  
+  public static final <K, V1, V2> CombineFn<K, Pair<V1, V2>> pairAggregator(
+      AggregatorFactory<V1> a1, AggregatorFactory<V2> a2) {
+    return aggregator(new PairAggregator<V1, V2>(a1.create(), a2.create()));
+  }
+  
+  public static final <K, A, B, C> CombineFn<K, Tuple3<A, B, C>> tripAggregator(
+      AggregatorFactory<A> a1, AggregatorFactory<B> a2, AggregatorFactory<C> a3) {
+    return aggregator(new TripAggregator<A, B, C>(a1.create(), a2.create(), a3.create()));
+  }
+
+  public static final <K, A, B, C, D> CombineFn<K, Tuple4<A, B, C, D>> quadAggregator(
+      AggregatorFactory<A> a1, AggregatorFactory<B> a2, AggregatorFactory<C> a3,
+      AggregatorFactory<D> a4) {
+    return aggregator(new QuadAggregator<A, B, C, D>(a1.create(), a2.create(), a3.create(),
+        a4.create()));
+  }
+
+  public static final <K> CombineFn<K, TupleN> tupleAggregator(AggregatorFactory<?>... factories) {
+    Aggregator<?>[] aggs = new Aggregator[factories.length];
+    for (int i = 0; i < aggs.length; i++) {
+      aggs[i] = factories[i].create();
+    }
+    return aggregator(new TupleNAggregator(aggs));
+  }
+  
+  public static final <K> CombineFn<K, Long> SUM_LONGS() {
+    return aggregatorFactory(SUM_LONGS);
+  }
+
+  public static final <K> CombineFn<K, Integer> SUM_INTS() {
+    return aggregatorFactory(SUM_INTS);
+  }
+
+  public static final <K> CombineFn<K, Float> SUM_FLOATS() {
+    return aggregatorFactory(SUM_FLOATS);
+  }
+
+  public static final <K> CombineFn<K, Double> SUM_DOUBLES() {
+    return aggregatorFactory(SUM_DOUBLES);
+  }
+ 
+  public static final <K> CombineFn<K, BigInteger> SUM_BIGINTS() {
+    return aggregatorFactory(SUM_BIGINTS);
+  }
+  
+  public static final <K> CombineFn<K, Long> MAX_LONGS() {
+    return aggregatorFactory(MAX_LONGS);
+  }
+
+  public static final <K> CombineFn<K, Long> MAX_LONGS(int n) {
+    return aggregator(new MaxNAggregator<Long>(n));
+  }
+  
+  public static final <K> CombineFn<K, Integer> MAX_INTS() {
+    return aggregatorFactory(MAX_INTS);
+  }
+
+  public static final <K> CombineFn<K, Integer> MAX_INTS(int n) {
+    return aggregator(new MaxNAggregator<Integer>(n));
+  }
+
+  public static final <K> CombineFn<K, Float> MAX_FLOATS() {
+    return aggregatorFactory(MAX_FLOATS);
+  }
+
+  public static final <K> CombineFn<K, Float> MAX_FLOATS(int n) {
+    return aggregator(new MaxNAggregator<Float>(n));
+  }
+
+  public static final <K> CombineFn<K, Double> MAX_DOUBLES() {
+    return aggregatorFactory(MAX_DOUBLES);
+  }
+
+  public static final <K> CombineFn<K, Double> MAX_DOUBLES(int n) {
+    return aggregator(new MaxNAggregator<Double>(n));
+  }
+  
+  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS() {
+    return aggregatorFactory(MAX_BIGINTS);
+  }
+  
+  public static final <K> CombineFn<K, BigInteger> MAX_BIGINTS(int n) {
+    return aggregator(new MaxNAggregator<BigInteger>(n));
+  }
+  
+  public static final <K> CombineFn<K, Long> MIN_LONGS() {
+    return aggregatorFactory(MIN_LONGS);
+  }
+
+  public static final <K> CombineFn<K, Long> MIN_LONGS(int n) {
+    return aggregator(new MinNAggregator<Long>(n));
+  }
+
+  public static final <K> CombineFn<K, Integer> MIN_INTS() {
+    return aggregatorFactory(MIN_INTS);
+  }
+
+  public static final <K> CombineFn<K, Integer> MIN_INTS(int n) {
+    return aggregator(new MinNAggregator<Integer>(n));
+  }
+  
+  public static final <K> CombineFn<K, Float> MIN_FLOATS() {
+    return aggregatorFactory(MIN_FLOATS);
+  }
+
+  public static final <K> CombineFn<K, Float> MIN_FLOATS(int n) {
+    return aggregator(new MinNAggregator<Float>(n));
+  }
+  
+  public static final <K> CombineFn<K, Double> MIN_DOUBLES() {
+    return aggregatorFactory(MIN_DOUBLES);
+  }
+
+  public static final <K> CombineFn<K, Double> MIN_DOUBLES(int n) {
+    return aggregator(new MinNAggregator<Double>(n));
+  }
+  
+  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS() {
+    return aggregatorFactory(MIN_BIGINTS);
+  }
+  
+  public static final <K> CombineFn<K, BigInteger> MIN_BIGINTS(int n) {
+    return aggregator(new MinNAggregator<BigInteger>(n));
+  }
+  
+  public static final <K, V> CombineFn<K, V> FIRST_N(int n) {
+    return aggregator(new FirstNAggregator<V>(n));
+  }
+
+  public static final <K, V> CombineFn<K, V> LAST_N(int n) {
+    return aggregator(new LastNAggregator<V>(n));
+  }
+  
+  public static class SumLongs implements Aggregator<Long> {    
+    private long sum = 0;
+    
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Long next) {
+      sum += next;
+    }
+    
+    @Override
+    public Iterable<Long> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+  public static final AggregatorFactory<Long> SUM_LONGS = new AggregatorFactory<Long>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Long> create() { return new SumLongs(); } // fresh aggregator per call
+  };
+  
+  public static class SumInts implements Aggregator<Integer> {
+    private int sum = 0;
+    
+    @Override
+    public void reset() {
+      sum = 0;
+    }
+
+    @Override
+    public void update(Integer next) {
+      sum += next;
+    }
+    
+    @Override
+    public Iterable<Integer> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+  public static final AggregatorFactory<Integer> SUM_INTS = new AggregatorFactory<Integer>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Integer> create() { return new SumInts(); } // fresh aggregator per call
+  };
+  
+  public static class SumFloats implements Aggregator<Float> {    
+    private float sum = 0;
+    
+    @Override
+    public void reset() {
+      sum = 0f;
+    }
+
+    @Override
+    public void update(Float next) {
+      sum += next;
+    }
+    
+    @Override
+    public Iterable<Float> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+  public static final AggregatorFactory<Float> SUM_FLOATS = new AggregatorFactory<Float>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Float> create() { return new SumFloats(); } // fresh aggregator per call
+  };
+  
+  public static class SumDoubles implements Aggregator<Double> { // Running double sum of the values seen since reset().
+    private double sum = 0.0;
+    
+    @Override
+    public void reset() {
+      sum = 0.0; // double literal (was the float literal 0f) to match the accumulator's type
+    }
+
+    @Override
+    public void update(Double next) {
+      sum += next;
+    }
+    
+    @Override
+    public Iterable<Double> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+  public static final AggregatorFactory<Double> SUM_DOUBLES = new AggregatorFactory<Double>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Double> create() { return new SumDoubles(); } // fresh aggregator per call
+  };
+  
+  public static class SumBigInts implements Aggregator<BigInteger> {    
+    private BigInteger sum = BigInteger.ZERO;
+    
+    @Override
+    public void reset() {
+      sum = BigInteger.ZERO;
+    }
+
+    @Override
+    public void update(BigInteger next) {
+      sum = sum.add(next);
+    }
+    
+    @Override
+    public Iterable<BigInteger> results() {
+      return ImmutableList.of(sum);
+    }
+  }
+  public static final AggregatorFactory<BigInteger> SUM_BIGINTS = new AggregatorFactory<BigInteger>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<BigInteger> create() { return new SumBigInts(); } // fresh aggregator per call
+  };
+  
+  public static class MaxLongs implements Aggregator<Long> { // Tracks the largest Long seen since the last reset().
+    private Long max = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      max = null;
+    }
+    
+    @Override
+    public void update(Long next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Long> results() {
+      return max == null ? ImmutableList.<Long>of() : ImmutableList.of(max); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Long> MAX_LONGS = new AggregatorFactory<Long>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Long> create() { return new MaxLongs(); } // fresh aggregator per call
+  };
+  
+  public static class MaxInts implements Aggregator<Integer> { // Tracks the largest Integer seen since the last reset().
+    private Integer max = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      max = null;
+    }
+    
+    @Override
+    public void update(Integer next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Integer> results() {
+      return max == null ? ImmutableList.<Integer>of() : ImmutableList.of(max); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Integer> MAX_INTS = new AggregatorFactory<Integer>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Integer> create() { return new MaxInts(); } // fresh aggregator per call
+  };
+  
+  public static class MaxFloats implements Aggregator<Float> { // Tracks the largest Float seen since the last reset().
+    private Float max = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      max = null;
+    }
+    
+    @Override
+    public void update(Float next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Float> results() {
+      return max == null ? ImmutableList.<Float>of() : ImmutableList.of(max); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Float> MAX_FLOATS = new AggregatorFactory<Float>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Float> create() { return new MaxFloats(); } // fresh aggregator per call
+  };
+  
+  public static class MaxDoubles implements Aggregator<Double> { // Tracks the largest Double seen since the last reset().
+    private Double max = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      max = null;
+    }
+    
+    @Override
+    public void update(Double next) {
+      if (max == null || max < next) {
+        max = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Double> results() {
+      return max == null ? ImmutableList.<Double>of() : ImmutableList.of(max); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Double> MAX_DOUBLES = new AggregatorFactory<Double>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Double> create() { return new MaxDoubles(); } // fresh aggregator per call
+  };
+  
+  public static class MaxBigInts implements Aggregator<BigInteger> { // Tracks the largest BigInteger seen since the last reset().
+    private BigInteger max = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      max = null;
+    }
+    
+    @Override
+    public void update(BigInteger next) {
+      if (max == null || max.compareTo(next) < 0) {
+        max = next;
+      }
+    }
+    
+    @Override
+    public Iterable<BigInteger> results() {
+      return max == null ? ImmutableList.<BigInteger>of() : ImmutableList.of(max); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<BigInteger> MAX_BIGINTS = new AggregatorFactory<BigInteger>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<BigInteger> create() { return new MaxBigInts(); } // fresh aggregator per call
+  };
+  
+  public static class MinLongs implements Aggregator<Long> { // Tracks the smallest Long seen since the last reset().
+    private Long min = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      min = null;
+    }
+    
+    @Override
+    public void update(Long next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Long> results() {
+      return min == null ? ImmutableList.<Long>of() : ImmutableList.of(min); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Long> MIN_LONGS = new AggregatorFactory<Long>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Long> create() { return new MinLongs(); } // fresh aggregator per call
+  };
+  
+  public static class MinInts implements Aggregator<Integer> { // Tracks the smallest Integer seen since the last reset().
+    private Integer min = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      min = null;
+    }
+    
+    @Override
+    public void update(Integer next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Integer> results() {
+      return min == null ? ImmutableList.<Integer>of() : ImmutableList.of(min); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Integer> MIN_INTS = new AggregatorFactory<Integer>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Integer> create() { return new MinInts(); } // fresh aggregator per call
+  };
+  
+  public static class MinFloats implements Aggregator<Float> { // Tracks the smallest Float seen since the last reset().
+    private Float min = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      min = null;
+    }
+    
+    @Override
+    public void update(Float next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Float> results() {
+      return min == null ? ImmutableList.<Float>of() : ImmutableList.of(min); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Float> MIN_FLOATS = new AggregatorFactory<Float>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Float> create() { return new MinFloats(); } // fresh aggregator per call
+  };
+  
+  public static class MinDoubles implements Aggregator<Double> { // Tracks the smallest Double seen since the last reset().
+    private Double min = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      min = null;
+    }
+    
+    @Override
+    public void update(Double next) {
+      if (min == null || min > next) {
+        min = next;
+      }
+    }
+    
+    @Override
+    public Iterable<Double> results() {
+      return min == null ? ImmutableList.<Double>of() : ImmutableList.of(min); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<Double> MIN_DOUBLES = new AggregatorFactory<Double>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<Double> create() { return new MinDoubles(); } // fresh aggregator per call
+  };
+
+  public static class MinBigInts implements Aggregator<BigInteger> { // Tracks the smallest BigInteger seen since the last reset().
+    private BigInteger min = null; // null means no value has been seen yet
+    
+    @Override
+    public void reset() {
+      min = null;
+    }
+    
+    @Override
+    public void update(BigInteger next) {
+      if (min == null || min.compareTo(next) > 0) {
+        min = next;
+      }
+    }
+    
+    @Override
+    public Iterable<BigInteger> results() {
+      return min == null ? ImmutableList.<BigInteger>of() : ImmutableList.of(min); // ImmutableList rejects null elements
+    }
+  }
+  public static final AggregatorFactory<BigInteger> MIN_BIGINTS = new AggregatorFactory<BigInteger>() { // final: shared constant, must not be reassigned
+    @Override public Aggregator<BigInteger> create() { return new MinBigInts(); } // fresh aggregator per call
+  };
+  
+  public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> { // Keeps up to arity largest distinct values.
+    private final int arity; // maximum number of values retained
+    private transient SortedSet<V> elements; // created lazily by reset(); a sorted set, so duplicates collapse
+
+    public MaxNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+    
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      } else if (arity > 0 && value.compareTo(elements.first()) > 0 && elements.add(value)) {
+        // add() returned true, so value is new: evict the old minimum. A duplicate no longer shrinks the set.
+        elements.remove(elements.first());
+      }
+    }
+    
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+  
+  public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> { // Keeps up to arity smallest distinct values.
+    private final int arity; // maximum number of values retained
+    private transient SortedSet<V> elements; // created lazily by reset(); a sorted set, so duplicates collapse
+    
+    public MinNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = Sets.newTreeSet();
+      } else {
+        elements.clear();
+      }
+    }
+    
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      } else if (arity > 0 && value.compareTo(elements.last()) < 0 && elements.add(value)) {
+        // add() returned true, so value is new: evict the old maximum. A duplicate no longer shrinks the set.
+        elements.remove(elements.last());
+      }
+    }
+    
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+  
+  public static class FirstNAggregator<V> implements Aggregator<V> {
+    private final int arity;
+    private final List<V> elements;
+    
+    public FirstNAggregator(int arity) {
+      this.arity = arity;
+      this.elements = Lists.newArrayList();
+    }
+
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+    
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      }
+    }
+    
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  public static class LastNAggregator<V> implements Aggregator<V> { // Keeps the most recent arity values, in arrival order.
+    private final int arity; // maximum number of values retained
+    private final LinkedList<V> elements;
+    
+    public LastNAggregator(int arity) {
+      this.arity = arity;
+      this.elements = Lists.newLinkedList();
+    }
+
+    @Override
+    public void reset() {
+      elements.clear();
+    }
+    
+    @Override
+    public void update(V value) {
+      elements.add(value);
+      if (elements.size() > arity) { // was "== arity + 1": overflowed at MAX_VALUE and never trimmed for negative arity
+        elements.removeFirst();
+      }
+    }
+    
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java
new file mode 100644
index 0000000..30fa70c
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/DoFn.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import org.apache.crunch.test.TestCounters;
+
+/**
+ * Base class for all data processing functions in Crunch.
+ *
+ * <p>Every {@code DoFn} is {@link Serializable}, so each of its non-transient
+ * member variables must be {@code Serializable} too. State that cannot be
+ * serialized should be declared {@code transient} and built in the
+ * {@link #initialize()} method instead.
+ *
+ * <p>When no task context is set, the protected accessors use local or test-oriented values.
+ */
+public abstract class DoFn<S, T> implements Serializable {
+  private transient TaskInputOutputContext<?, ?, ?, ?> context;
+  private transient Configuration testConf;
+  private transient String internalStatus;
+
+  /**
+   * Hook invoked during the job planning phase. Override it to adjust
+   * the configuration of the Job this DoFn instance belongs to; the
+   * default implementation does nothing.
+   *
+   * @param conf The Configuration instance for the Job.
+   */
+  public void configure(Configuration conf) {
+  }
+
+  /**
+   * Processes a single record from a {@link PCollection}.
+   *
+   * <p><b>Note:</b> Crunch may hand the same input object to successive
+   * {@link #process(Object, Emitter)} calls, changing only its content in
+   * between. This behavior is inherited from Hadoop's
+   * <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reducer.html">Reducer</a>
+   * implementation: <i>The framework will reuse the key and value objects
+   * that are passed into the reduce, therefore the application should clone
+   * the objects they want to keep a copy of.</i>
+   *
+   * @param input
+   *          The input record.
+   * @param emitter
+   *          The emitter to send the output to.
+   */
+  public abstract void process(S input, Emitter<T> emitter);
+
+  /**
+   * Hook invoked while the MapReduce job this {@code DoFn} belongs to is
+   * being set up; {@link #setContext} calls it once the context is stored.
+   * Override it to perform any initialization; the default does nothing.
+   */
+  public void initialize() {
+  }
+
+  /**
+   * Hook invoked while the MapReduce job this {@code DoFn} belongs to is
+   * being cleaned up. Override it to perform whatever cleanup is needed;
+   * the default implementation does nothing.
+   *
+   * @param emitter The emitter that was used for output
+   */
+  public void cleanup(Emitter<T> emitter) {
+  }
+
+  /**
+   * Stores the {@link TaskInputOutputContext} handed over during setup and
+   * then runs {@link #initialize()}.
+   */
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    this.context = context;
+    initialize();
+  }
+
+  /**
+   * Supplies the {@code Configuration} to use when no task context is set, as in unit tests.
+   * @param conf The Configuration instance.
+   */
+  public void setConfigurationForTest(Configuration conf) {
+    this.testConf = conf;
+  }
+
+  /**
+   * Returns an estimate of how applying this function to a {@link PCollection}
+   * will cause it to change in size. The optimizer uses these estimates to
+   * decide where to break up dependent MR jobs into separate Map and Reduce
+   * phases in order to minimize I/O.
+   *
+   * <p>The default estimate is {@code 1.2f}. Subclasses of {@code DoFn} that
+   * will substantially alter the size of the resulting {@code PCollection}
+   * should override this method.
+   */
+  public float scaleFactor() {
+    return 1.2f;
+  }
+
+  /** Returns the context given to {@link #setContext}, or {@code null} if there is none. */
+  protected TaskInputOutputContext<?, ?, ?, ?> getContext() {
+    return context;
+  }
+
+  /** Returns the task context's configuration, else the test configuration. */
+  protected Configuration getConfiguration() {
+    if (context == null) {
+      return testConf; // may itself be null when no test configuration was supplied
+    }
+    return context.getConfiguration();
+  }
+
+  /** Looks up a counter by enum, via {@link TestCounters} when no task context is set. */
+  protected Counter getCounter(Enum<?> counterName) {
+    return (context != null)
+        ? context.getCounter(counterName)
+        : TestCounters.getCounter(counterName);
+  }
+
+  /** Looks up a counter by group and name, via {@link TestCounters} when no context is set. */
+  protected Counter getCounter(String groupName, String counterName) {
+    return (context != null)
+        ? context.getCounter(groupName, counterName)
+        : TestCounters.getCounter(groupName, counterName);
+  }
+
+  /** Increments the given counter by one. */
+  protected void increment(Enum<?> counterName) {
+    increment(counterName, 1);
+  }
+
+  /** Increments the given counter by {@code value}. */
+  protected void increment(Enum<?> counterName, long value) {
+    getCounter(counterName).increment(value);
+  }
+
+  /** Reports progress to the task context; does nothing when no context is set. */
+  protected void progress() {
+    if (context == null) {
+      return;
+    }
+    context.progress();
+  }
+
+  /** Returns the context's task attempt ID, or a newly constructed one when no context is set. */
+  protected TaskAttemptID getTaskAttemptID() {
+    return (context != null) ? context.getTaskAttemptID() : new TaskAttemptID();
+  }
+
+  /** Forwards the status to the task context, if any, and also records it locally. */
+  protected void setStatus(String status) {
+    if (context != null) {
+      context.setStatus(status);
+    }
+    internalStatus = status;
+  }
+
+  /** Returns the task context's status, or the locally recorded one when no context is set. */
+  protected String getStatus() {
+    return (context != null) ? context.getStatus() : internalStatus;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/Emitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Emitter.java b/crunch/src/main/java/org/apache/crunch/Emitter.java
new file mode 100644
index 0000000..b9d4ff2
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/Emitter.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * Interface for writing outputs from a {@link DoFn}.
+ * @param <T> the type of the values this emitter accepts
+ */
+public interface Emitter<T> {
+  /**
+   * Writes the emitted value to the next stage of the pipeline.
+   *
+   * @param emitted The value to write
+   */
+  void emit(T emitted);
+
+  /**
+   * Flushes any values cached by this emitter. Called during the
+   * cleanup stage.
+   */
+  void flush();
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java
new file mode 100644
index 0000000..d471710
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link DoFn} for the common case of keeping only those members of a
+ * {@link PCollection} that satisfy a boolean condition.
+ */
+public abstract class FilterFn<T> extends DoFn<T, T> {
+
+  /**
+   * Returns {@code true} if the given record should be emitted.
+   */
+  public abstract boolean accept(T input);
+
+  @Override
+  public void process(T input, Emitter<T> emitter) {
+    if (accept(input)) {
+      emitter.emit(input);
+    }
+  }
+
+  @Override
+  public float scaleFactor() {
+    return 0.5f;
+  }
+
+  /** Creates a filter that accepts a record only when every one of {@code fns} accepts it. */
+  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
+    return new AndFn<S>(fns);
+  }
+
+  /** Conjunction of filters; accepts everything when no filters are given. */
+  public static class AndFn<S> extends FilterFn<S> {
+    private final List<FilterFn<S>> fns;
+
+    public AndFn(FilterFn<S>... fns) {
+      this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
+    }
+
+    @Override
+    public boolean accept(S input) {
+      boolean accepted = true;
+      for (int i = 0; accepted && i < fns.size(); i++) {
+        accepted = fns.get(i).accept(input); // stop at the first rejection
+      }
+      return accepted;
+    }
+
+    @Override
+    public float scaleFactor() {
+      float product = 1.0f;
+      for (FilterFn<S> fn : fns) {
+        product *= fn.scaleFactor();
+      }
+      return product;
+    }
+  }
+
+  /** Creates a filter that accepts a record when at least one of {@code fns} accepts it. */
+  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
+    return new OrFn<S>(fns);
+  }
+
+  /** Disjunction of filters; rejects everything when no filters are given. */
+  public static class OrFn<S> extends FilterFn<S> {
+    private final List<FilterFn<S>> fns;
+
+    public OrFn(FilterFn<S>... fns) {
+      this.fns = ImmutableList.<FilterFn<S>>copyOf(fns);
+    }
+
+    @Override
+    public boolean accept(S input) {
+      boolean accepted = false;
+      for (int i = 0; !accepted && i < fns.size(); i++) {
+        accepted = fns.get(i).accept(input); // stop at the first acceptance
+      }
+      return accepted;
+    }
+
+    @Override
+    public float scaleFactor() {
+      float sum = 0.0f;
+      for (FilterFn<S> fn : fns) {
+        sum += fn.scaleFactor();
+      }
+      return Math.min(1.0f, sum); // the combined estimate is capped at 1.0
+    }
+  }
+
+  /** Creates a filter that accepts exactly the records {@code fn} rejects. */
+  public static <S> FilterFn<S> not(FilterFn<S> fn) {
+    return new NotFn<S>(fn);
+  }
+
+  /** Negation of a single filter. */
+  public static class NotFn<S> extends FilterFn<S> {
+    private final FilterFn<S> base;
+
+    public NotFn(FilterFn<S> base) {
+      this.base = base;
+    }
+
+    @Override
+    public boolean accept(S input) {
+      return !base.accept(input);
+    }
+
+    @Override
+    public float scaleFactor() {
+      return 1.0f - base.scaleFactor();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
new file mode 100644
index 0000000..d6cd12e
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Options that can be passed to a {@code groupByKey} operation in order to
+ * exercise finer control over how the partitioning, grouping, and sorting
+ * of keys is performed. Instances are immutable and are created through
+ * {@link #builder()}.
+ */
+public class GroupingOptions {
+
+  private final Class<? extends Partitioner> partitionerClass;
+  private final Class<? extends RawComparator> groupingComparatorClass;
+  private final Class<? extends RawComparator> sortComparatorClass;
+  private final int numReducers;
+
+  private GroupingOptions(Builder settings) {
+    this.partitionerClass = settings.partitionerClass;
+    this.groupingComparatorClass = settings.groupingComparatorClass;
+    this.sortComparatorClass = settings.sortComparatorClass;
+    this.numReducers = settings.numReducers;
+  }
+
+  /** Returns the requested reducer count; zero when the builder was never given one. */
+  public int getNumReducers() {
+    return numReducers;
+  }
+
+  /** Returns the sort comparator class, or {@code null} if none was specified. */
+  public Class<? extends RawComparator> getSortComparatorClass() {
+    return sortComparatorClass;
+  }
+
+  /** Applies every option that was explicitly set to the given job, leaving the rest untouched. */
+  public void configure(Job job) {
+    if (partitionerClass != null) {
+      job.setPartitionerClass(partitionerClass);
+    }
+    if (groupingComparatorClass != null) {
+      job.setGroupingComparatorClass(groupingComparatorClass);
+    }
+    if (sortComparatorClass != null) {
+      job.setSortComparatorClass(sortComparatorClass);
+    }
+    if (numReducers > 0) {
+      job.setNumReduceTasks(numReducers);
+    }
+  }
+
+  /** Returns {@code true} when partitioner and both comparators match; reducer counts are ignored. */
+  public boolean isCompatibleWith(GroupingOptions other) {
+    return partitionerClass == other.partitionerClass
+        && groupingComparatorClass == other.groupingComparatorClass
+        && sortComparatorClass == other.sortComparatorClass;
+  }
+
+  /** Returns a fresh {@link Builder} with no options set. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder class for creating {@code GroupingOptions} instances. Each
+   * setter returns the builder itself so that calls can be chained.
+   */
+  public static class Builder {
+    private Class<? extends Partitioner> partitionerClass;
+    private Class<? extends RawComparator> groupingComparatorClass;
+    private Class<? extends RawComparator> sortComparatorClass;
+    private int numReducers;
+
+    public Builder() {
+    }
+
+    /** Sets the partitioner class. */
+    public Builder partitionerClass(
+        Class<? extends Partitioner> partitionerClass) {
+      this.partitionerClass = partitionerClass;
+      return this;
+    }
+
+    /** Sets the comparator class used for grouping. */
+    public Builder groupingComparatorClass(
+        Class<? extends RawComparator> groupingComparatorClass) {
+      this.groupingComparatorClass = groupingComparatorClass;
+      return this;
+    }
+
+    /** Sets the comparator class used for sorting. */
+    public Builder sortComparatorClass(
+        Class<? extends RawComparator> sortComparatorClass) {
+      this.sortComparatorClass = sortComparatorClass;
+      return this;
+    }
+
+    /** Sets the reducer count, which must be positive. */
+    public Builder numReducers(int numReducers) {
+      if (numReducers <= 0) {
+        throw new IllegalArgumentException("Invalid number of reducers: " + numReducers);
+      }
+      this.numReducers = numReducers;
+      return this;
+    }
+
+    /** Creates the {@code GroupingOptions} described by the current state of this builder. */
+    public GroupingOptions build() {
+      return new GroupingOptions(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/crunch/src/main/java/org/apache/crunch/MapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/MapFn.java b/crunch/src/main/java/org/apache/crunch/MapFn.java
new file mode 100644
index 0000000..b1c5520
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/MapFn.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+/**
+ * A {@link DoFn} for the common case of emitting exactly one value
+ * for each input record.
+ *
+ */
+public abstract class MapFn<S, T> extends DoFn<S, T> {
+  
+  /**
+   * Maps the given input into an instance of the output type.
+   */
+  public abstract T map(S input);
+
+  @Override
+  public void process(S input, Emitter<T> emitter) {
+    emitter.emit(map(input));
+  }
+
+  @Override
+  public float scaleFactor() {
+    return 1.0f;
+  }
+}


Mime
View raw message