crunch-commits mailing list archives

From jwi...@apache.org
Subject git commit: CRUNCH-231: Support legacy Mappers and Reducers in Crunch.
Date Mon, 22 Jul 2013 22:36:30 GMT
Updated Branches:
  refs/heads/master 36bde4162 -> 3eb5f0a8a


CRUNCH-231: Support legacy Mappers and Reducers in Crunch.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3eb5f0a8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3eb5f0a8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3eb5f0a8

Branch: refs/heads/master
Commit: 3eb5f0a8a7b0918e79232a301ad3db8af0756e9b
Parents: 36bde41
Author: Josh Wills <jwills@apache.org>
Authored: Fri Jun 28 20:30:56 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Jul 22 15:22:12 2013 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/lib/MapredIT.java | 134 ++++++++
 .../java/org/apache/crunch/lib/MapreduceIT.java | 120 +++++++
 .../main/java/org/apache/crunch/lib/Mapred.java | 301 +++++++++++++++++
 .../java/org/apache/crunch/lib/Mapreduce.java   | 335 +++++++++++++++++++
 .../java/org/apache/crunch/lib/MapredTest.java  | 134 ++++++++
 .../org/apache/crunch/lib/MapreduceTest.java    | 117 +++++++
 6 files changed, 1141 insertions(+)
----------------------------------------------------------------------
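
A usage sketch of the new API, distilled from the integration tests in this
commit. LegacyMapperDemo, WordLengthMapper, and the command-line paths are
illustrative stand-ins, not part of the commit itself:

    import java.io.IOException;

    import org.apache.crunch.MapFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.io.To;
    import org.apache.crunch.lib.Mapred;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class LegacyMapperDemo {
      // A made-up legacy (org.apache.hadoop.mapred) Mapper: emits (line, length).
      public static class WordLengthMapper
          implements Mapper<IntWritable, Text, Text, LongWritable> {
        @Override
        public void configure(JobConf conf) {
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public void map(IntWritable k, Text v, OutputCollector<Text, LongWritable> out,
            Reporter reporter) throws IOException {
          out.collect(v, new LongWritable(v.getLength()));
        }
      }

      public static void main(String[] args) {
        Pipeline p = new MRPipeline(LegacyMapperDemo.class);
        PCollection<String> lines = p.read(From.textFile(new Path(args[0])));
        // Legacy Mappers are keyed, so lift the text into a Writable PTable first.
        PTable<IntWritable, Text> table = lines.parallelDo(
            new MapFn<String, Pair<IntWritable, Text>>() {
              @Override
              public Pair<IntWritable, Text> map(String s) {
                return Pair.of(new IntWritable(s.length()), new Text(s));
              }
            },
            Writables.tableOf(Writables.writables(IntWritable.class),
                Writables.writables(Text.class)));
        // Mapred.map runs the old-API Mapper as an ordinary Crunch DoFn;
        // Mapred.reduce, Mapreduce.map, and Mapreduce.reduce follow the same shape.
        PTable<Text, LongWritable> lengths =
            Mapred.map(table, WordLengthMapper.class, Text.class, LongWritable.class);
        lengths.write(To.sequenceFile(new Path(args[1])));
        p.done();
      }
    }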


http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java
new file mode 100644
index 0000000..7c09790
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/MapredIT.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.PipelineResult.StageResult;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+
+public class MapredIT extends CrunchTestSupport implements Serializable {
+  private static class TestMapper implements Mapper<IntWritable, Text, Text, LongWritable> {
+    @Override
+    public void configure(JobConf arg0) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void map(IntWritable k, Text v, OutputCollector<Text, LongWritable> out,
+        Reporter reporter) throws IOException {
+      reporter.getCounter("written", "out").increment(1L);
+      out.collect(v, new LongWritable(v.getLength()));
+    }
+  }
+  
+  private static class TestReducer implements Reducer<IntWritable, Text, Text, LongWritable> {
+
+    @Override
+    public void configure(JobConf arg0) { 
+    }
+
+    @Override
+    public void close() throws IOException { 
+    }
+
+    @Override
+    public void reduce(IntWritable key, Iterator<Text> iter,
+        OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+      boolean hasThou = false;
+      String notThou = "";
+      while (iter.hasNext()) {
+        String next = iter.next().toString();
+        if (next != null && next.contains("thou")) {
+          reporter.getCounter("thou", "count").increment(1);
+          hasThou = true;
+        } else {
+          notThou = next;
+        }
+      }
+      out.collect(new Text(notThou), hasThou ? new LongWritable(1L) : new LongWritable(0L));
+    }
+  }
+  
+  @Test
+  public void testMapper() throws Exception {
+    Pipeline p = new MRPipeline(MapredIT.class, tempDir.getDefaultConfiguration());
+    Path shakesPath = tempDir.copyResourcePath("shakes.txt");
+    PCollection<String> in = p.read(From.textFile(shakesPath));
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(String input) {
+        return Pair.of(new IntWritable(input.length()), new Text(input));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    
+    PTable<Text, LongWritable> out = Mapred.map(two, TestMapper.class, Text.class, LongWritable.class);
+    out.write(To.sequenceFile(tempDir.getPath("temp")));
+    PipelineResult res = p.done();
+    assertEquals(1, res.getStageResults().size());
+    StageResult sr = res.getStageResults().get(0);
+    assertEquals(3667, sr.getCounters().findCounter("written", "out").getValue());
+  }
+  
+  @Test
+  public void testReducer() throws Exception {
+    Pipeline p = new MRPipeline(MapredIT.class, tempDir.getDefaultConfiguration());
+    Path shakesPath = tempDir.copyResourcePath("shakes.txt");
+    PCollection<String> in = p.read(From.textFile(shakesPath));
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(String input) {
+        return Pair.of(new IntWritable(input.length()), new Text(input));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    
+    PTable<Text, LongWritable> out = Mapred.reduce(two.groupByKey(), TestReducer.class, Text.class, LongWritable.class);
+    out.write(To.sequenceFile(tempDir.getPath("temp")));
+    PipelineResult res = p.done();
+    assertEquals(1, res.getStageResults().size());
+    StageResult sr = res.getStageResults().get(0);
+    assertEquals(108, sr.getCounters().findCounter("thou", "count").getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java
new file mode 100644
index 0000000..ab453e0
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/MapreduceIT.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineResult;
+import org.apache.crunch.PipelineResult.StageResult;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MapreduceIT extends CrunchTestSupport implements Serializable {
+  private static class TestMapper extends Mapper<IntWritable, Text, IntWritable, Text> {
+    @Override
+    protected void map(IntWritable k, Text v, Mapper<IntWritable, Text, IntWritable, Text>.Context ctxt) {
+      try {
+        ctxt.getCounter("written", "out").increment(1L);
+        ctxt.write(new IntWritable(v.getLength()), v);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+  
+  private static class TestReducer extends Reducer<IntWritable, Text, Text, LongWritable> {
+    protected void reduce(IntWritable key, Iterable<Text> values,
+        org.apache.hadoop.mapreduce.Reducer<IntWritable, Text, Text, LongWritable>.Context ctxt) {
+      boolean hasWhere = false;
+      String notWhere = "";
+      for (Text t : values) {
+        String next = t.toString();
+        if (next.contains("where")) {
+          hasWhere = true;
+          ctxt.getCounter("words", "where").increment(1);
+        } else {
+          notWhere = next;
+        }
+      }
+      try {
+        ctxt.write(new Text(notWhere), hasWhere ? new LongWritable(1L) : new LongWritable(0L));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Test
+  public void testMapper() throws Exception {
+    Pipeline p = new MRPipeline(MapreduceIT.class, tempDir.getDefaultConfiguration());
+    Path shakesPath = tempDir.copyResourcePath("shakes.txt");
+    PCollection<String> in = p.read(From.textFile(shakesPath));
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(String input) {
+        return Pair.of(new IntWritable(input.length()), new Text(input));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    
+    PTable<IntWritable, Text> out = Mapreduce.map(two, TestMapper.class, IntWritable.class, Text.class);
+    out.write(To.sequenceFile(tempDir.getPath("temp")));
+    PipelineResult res = p.done();
+    assertEquals(1, res.getStageResults().size());
+    StageResult sr = res.getStageResults().get(0);
+    assertEquals(3667, sr.getCounters().findCounter("written", "out").getValue());
+  }
+  
+  @Test
+  public void testReducer() throws Exception {
+    Pipeline p = new MRPipeline(MapreduceIT.class, tempDir.getDefaultConfiguration());
+    Path shakesPath = tempDir.copyResourcePath("shakes.txt");
+    PCollection<String> in = p.read(From.textFile(shakesPath));
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<String, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(String input) {
+        return Pair.of(new IntWritable(input.length()), new Text(input));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    
+    PTable<Text, LongWritable> out = Mapreduce.reduce(two.groupByKey(), TestReducer.class, Text.class, LongWritable.class);
+    out.write(To.sequenceFile(tempDir.getPath("temp")));
+    PipelineResult res = p.done();
+    assertEquals(1, res.getStageResults().size());
+    StageResult sr = res.getStageResults().get(0);
+    assertEquals(19, sr.getCounters().findCounter("words", "where").getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java b/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java
new file mode 100644
index 0000000..be10ff6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Mapred.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Set;
+
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Static functions for working with legacy Mappers and Reducers that live under the org.apache.hadoop.mapred.*
+ * package as part of Crunch pipelines.
+ */
+public class Mapred {
+
+  public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
+      PTable<K1, V1> input,
+      Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
+      Class<K2> keyClass, Class<V2> valueClass) {
+    return input.parallelDo(new MapperFn<K1, V1, K2, V2>(mapperClass), tableOf(keyClass, valueClass));
+  }
+  
+  public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
+      PGroupedTable<K1, V1> input,
+      Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
+          Class<K2> keyClass, Class<V2> valueClass) {
+    return input.parallelDo(new ReducerFn<K1, V1, K2, V2>(reducerClass), tableOf(keyClass, valueClass));
+  }
+  
+  private static <K extends Writable, V extends Writable> PTableType<K, V> tableOf(
+      Class<K> keyClass, Class<V> valueClass) {
+    return Writables.tableOf(Writables.writables(keyClass), Writables.writables(valueClass));
+  }
+  
+  private static class MapperFn<K1, V1, K2 extends Writable, V2 extends Writable> extends
+      DoFn<Pair<K1, V1>, Pair<K2, V2>> implements Reporter {
+    private final Class<? extends Mapper<K1, V1, K2, V2>> mapperClass;
+    private transient Mapper<K1, V1, K2, V2> instance;
+    private transient OutputCollectorImpl<K2, V2> outputCollector;
+    
+    public MapperFn(Class<? extends Mapper<K1, V1, K2, V2>> mapperClass) {
+      this.mapperClass = Preconditions.checkNotNull(mapperClass);
+    }
+    
+    @Override
+    public void initialize() {
+      if (instance == null) {
+        this.instance = ReflectionUtils.newInstance(mapperClass, getConfiguration());
+      }
+      instance.configure(new JobConf(getConfiguration()));
+      outputCollector = new OutputCollectorImpl<K2, V2>();
+    }
+    
+    @Override
+    public void process(Pair<K1, V1> input, Emitter<Pair<K2, V2>> emitter) {
+      outputCollector.set(emitter);
+      try {
+        instance.map(input.first(), input.second(), outputCollector, this);
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+    
+    @Override
+    public void cleanup(Emitter<Pair<K2, V2>> emitter) {
+      try {
+        instance.close();
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error closing mapper = " + mapperClass, e);
+      }
+    }
+
+    @Override
+    public void progress() {
+      super.progress();
+    }
+    
+    @Override
+    public void setStatus(String status) {
+      super.setStatus(status);
+    }
+    
+    public Counters.Counter getCounter(Enum<?> counter) {
+      return proxyCounter(super.getCounter(counter));
+    }
+    
+    public Counters.Counter getCounter(String group, String name) {
+      return proxyCounter(super.getCounter(group, name));
+    }
+    
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+      return null;
+    }
+
+    @Override
+    public void incrCounter(Enum<?> counter, long by) {
+      super.increment(counter, by);
+    }
+
+    @Override
+    public void incrCounter(String group, String name, long by) {
+      super.increment(group, name, by);
+    }
+    
+    public float getProgress() {
+      return 0.5f;
+    }
+  }
+  
+
+  private static class ReducerFn<K1, V1, K2 extends Writable, V2 extends Writable> extends
+      DoFn<Pair<K1, Iterable<V1>>, Pair<K2, V2>> implements Reporter {
+    private final Class<? extends Reducer<K1, V1, K2, V2>> reducerClass;
+    private transient Reducer<K1, V1, K2, V2> instance;
+    private transient OutputCollectorImpl<K2, V2> outputCollector;
+
+    public ReducerFn(Class<? extends Reducer<K1, V1, K2, V2>> reducerClass) {
+      this.reducerClass = Preconditions.checkNotNull(reducerClass);
+    }
+
+    @Override
+    public void initialize() {
+      if (instance == null) {
+        this.instance = ReflectionUtils.newInstance(reducerClass, getConfiguration());
+      }
+      instance.configure(new JobConf(getConfiguration()));
+      outputCollector = new OutputCollectorImpl<K2, V2>();
+    }
+
+    @Override
+    public void process(Pair<K1, Iterable<V1>> input, Emitter<Pair<K2, V2>> emitter) {
+      outputCollector.set(emitter);
+      try {
+        instance.reduce(input.first(), input.second().iterator(), outputCollector, this);
+      } catch (IOException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+
+    @Override
+    public void cleanup(Emitter<Pair<K2, V2>> emitter) {
+      try {
+        instance.close();
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error closing mapper = " + reducerClass, e);
+      }
+    }
+
+    @Override
+    public void progress() {
+      super.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      super.setStatus(status);
+    }
+
+    public Counters.Counter getCounter(Enum<?> counter) {
+      return proxyCounter(super.getCounter(counter));
+    }
+
+    public Counters.Counter getCounter(String group, String name) {
+      return proxyCounter(super.getCounter(group, name));
+    }
+
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+      return null;
+    }
+
+    @Override
+    public void incrCounter(Enum<?> counter, long by) {
+      super.increment(counter, by);
+    }
+
+    @Override
+    public void incrCounter(String group, String name, long by) {
+      super.increment(group, name, by);
+    }
+    
+    public float getProgress() {
+      return 0.5f;
+    }
+  }
+
+  private static class OutputCollectorImpl<K, V> implements OutputCollector<K, V> {
+    private Emitter<Pair<K, V>> emitter;
+    
+    public OutputCollectorImpl() { }
+    
+    public void set(Emitter<Pair<K, V>> emitter) {
+      this.emitter = emitter;
+    }
+    
+    @Override
+    public void collect(K k, V v) throws IOException {
+      emitter.emit(Pair.of(k, v));
+    }
+  }
+  
+  private static Counters.Counter proxyCounter(Counter c) {
+    ProxyFactory proxyFactory = new ProxyFactory();
+    proxyFactory.setSuperclass(Counters.Counter.class);
+    proxyFactory.setFilter(CCMethodHandler.FILTER);
+    CCMethodHandler handler = new CCMethodHandler(c);
+    try {
+      return (Counters.Counter) proxyFactory.create(new Class[0], new Object[0], handler);
+    } catch (Exception e) {
+      throw new CrunchRuntimeException(e);
+    }
+  }
+  
+  private static class CCMethodHandler implements MethodHandler {
+    private static final Set<String> HANDLED = ImmutableSet.of("increment",
+        "getCounter", "getValue", "getName", "getDisplayName", "setValue",
+        "getUnderlyingCounter", "readFields", "write");
+    public static final MethodFilter FILTER = new MethodFilter() {
+      @Override
+      public boolean isHandled(Method m) {
+        return HANDLED.contains(m.getName());
+      }
+    };
+    
+    private final Counter c;
+    
+    public CCMethodHandler(Counter c) {
+      this.c = c;
+    }
+    
+    @Override
+    public Object invoke(Object obj, Method m, Method m2, Object[] args) throws Throwable {
+      String name = m.getName();
+      if ("increment".equals(name)) {
+        c.increment((Long) args[0]);
+        return null;
+      } else if ("getCounter".equals(name) || "getValue".equals(name)) {
+        return c.getValue();
+      } else if ("setValue".equals(name)) {
+        c.setValue((Long) args[0]);
+        return null;
+      } else if ("getDisplayName".equals(name)) {
+        return c.getDisplayName();
+      } else if ("getName".equals(name)) {
+        return c.getName();
+      } else if ("getUnderlyingCounter".equals(name)) {
+        return c;
+      } else if ("readFields".equals(name)) {
+        c.readFields((DataInput) args[0]);
+        return null;
+      } else if ("write".equals(name)) {
+        c.write((DataOutput) args[0]);
+        return null;
+      }
+      throw new IllegalStateException("Unhandled Counters.Counter method = " + name);
+    }
+  }
+}
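
A note on the mechanics above: Counters.Counter is awkward to construct
directly across Hadoop versions, so proxyCounter uses javassist's ProxyFactory
to subclass it at runtime and route the handled methods to the underlying
new-API Counter; the next file applies the same trick to Mapper.Context and
Reducer.Context. A minimal, self-contained sketch of the pattern, with Greeter
as a hypothetical stand-in superclass:

    import java.lang.reflect.Method;

    import javassist.util.proxy.MethodFilter;
    import javassist.util.proxy.MethodHandler;
    import javassist.util.proxy.ProxyFactory;

    public class ProxySketch {
      // Hypothetical class standing in for Counters.Counter / Mapper.Context.
      public static class Greeter {
        public String greet() {
          return "hello";
        }
      }

      public static void main(String[] args) throws Exception {
        ProxyFactory factory = new ProxyFactory();
        factory.setSuperclass(Greeter.class);
        // Intercept only the methods we care about, like CCMethodHandler.FILTER.
        factory.setFilter(new MethodFilter() {
          @Override
          public boolean isHandled(Method m) {
            return "greet".equals(m.getName());
          }
        });
        MethodHandler handler = new MethodHandler() {
          @Override
          public Object invoke(Object self, Method thisMethod, Method proceed,
              Object[] a) throws Throwable {
            // Forward wherever we like; here, back to the original method.
            return "proxied " + proceed.invoke(self, a);
          }
        };
        // Instantiate via the no-arg constructor, as proxyCounter does.
        Greeter g = (Greeter) factory.create(new Class[0], new Object[0], handler);
        System.out.println(g.greet()); // prints "proxied hello"
      }
    }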

http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java b/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java
new file mode 100644
index 0000000..a0a6b3e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Mapreduce.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
+
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Static functions for working with legacy Mappers and Reducers that live under the org.apache.hadoop.mapreduce.*
+ * package as part of Crunch pipelines.
+ */
+public class Mapreduce {
+  
+  public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> map(
+      PTable<K1, V1> input,
+      Class<? extends Mapper<K1, V1, K2, V2>> mapperClass,
+      Class<K2> keyClass, Class<V2> valueClass) {
+    return input.parallelDo(new MapperFn<K1, V1, K2, V2>(mapperClass), tableOf(keyClass, valueClass));
+  }
+  
+  public static <K1, V1, K2 extends Writable, V2 extends Writable> PTable<K2, V2> reduce(
+      PGroupedTable<K1, V1> input,
+      Class<? extends Reducer<K1, V1, K2, V2>> reducerClass,
+          Class<K2> keyClass, Class<V2> valueClass) {
+    return input.parallelDo(new ReducerFn<K1, V1, K2, V2>(reducerClass), tableOf(keyClass, valueClass));
+  }
+
+  private static <K extends Writable, V extends Writable> PTableType<K, V> tableOf(
+      Class<K> keyClass, Class<V> valueClass) {
+    return Writables.tableOf(Writables.writables(keyClass), Writables.writables(valueClass));
+  }
+  
+  private static class MapperFn<K1, V1, K2 extends Writable, V2 extends Writable> extends
+      DoFn<Pair<K1, V1>, Pair<K2, V2>> {
+    private final Class<? extends Mapper<K1, V1, K2, V2>> mapperClass;
+    private transient Mapper<K1, V1, K2, V2> instance;
+    private transient Mapper.Context context;
+    private transient CtxtMethodHandler handler;
+    private transient Method setupMethod;
+    private transient Method mapMethod;
+    private transient Method cleanupMethod;
+    
+    public MapperFn(Class<? extends Mapper<K1, V1, K2, V2>> mapperClass) {
+      this.mapperClass = Preconditions.checkNotNull(mapperClass);
+    }
+    
+    @Override
+    public void initialize() {
+      if (instance == null) {
+        this.instance = ReflectionUtils.newInstance(mapperClass, getConfiguration());
+        try {
+          for (Method m : mapperClass.getDeclaredMethods()) {
+            if ("setup".equals(m.getName())) {
+              this.setupMethod = m;
+              this.setupMethod.setAccessible(true);
+            } else if ("cleanup".equals(m.getName())) {
+              this.cleanupMethod = m;
+              this.cleanupMethod.setAccessible(true);
+            } else if ("map".equals(m.getName())) {
+              this.mapMethod = m;
+              this.mapMethod.setAccessible(true);
+            }
+          }
+          
+          if (mapMethod == null) {
+            throw new CrunchRuntimeException("No map method for class: " + mapperClass);
+          }
+          
+          ProxyFactory proxyFactory = new ProxyFactory();
+          proxyFactory.setSuperclass(Mapper.Context.class);
+          proxyFactory.setFilter(CtxtMethodHandler.FILTER);
+          Class[] paramTypes = new Class[] { Mapper.class };
+          Object[] args = new Object[] { instance };
+          if (!Modifier.isAbstract(Mapper.Context.class.getModifiers())) {
+            paramTypes = new Class[] { Mapper.class,
+                Configuration.class, TaskAttemptID.class,
+                RecordReader.class, RecordWriter.class,
+                OutputCommitter.class,
+                Class.forName("org.apache.hadoop.mapreduce.StatusReporter"),
+                InputSplit.class
+            };
+            args = new Object[] { instance, getConfiguration(), getTaskAttemptID(),
+                null, null, NO_OP_OUTPUT_COMMITTER, null, null
+            };
+          }
+          this.handler = new CtxtMethodHandler(this.getContext());
+          this.context = (Mapper.Context) proxyFactory.create(paramTypes, args, handler);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+      if (setupMethod != null) {
+        try {
+          setupMethod.invoke(instance, context);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+    }
+    
+    @Override
+    public void process(Pair<K1, V1> input, Emitter<Pair<K2, V2>> emitter) {
+      handler.set(emitter);
+      try {
+        mapMethod.invoke(instance, input.first(), input.second(), context);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+    
+    @Override
+    public void cleanup(Emitter<Pair<K2, V2>> emitter) {
+      if (cleanupMethod != null) {
+        handler.set(emitter);
+        try {
+          cleanupMethod.invoke(instance, context);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+    }
+  }
+  
+  private static class ReducerFn<K1, V1, K2 extends Writable, V2 extends Writable> extends
+      DoFn<Pair<K1, Iterable<V1>>, Pair<K2, V2>> {
+    private final Class<? extends Reducer<K1, V1, K2, V2>> reducerClass;
+    private transient Reducer<K1, V1, K2, V2> instance;
+    private transient CtxtMethodHandler handler;
+    private transient Reducer.Context context;
+    private transient Method setupMethod;
+    private transient Method reduceMethod;
+    private transient Method cleanupMethod;
+
+    public ReducerFn(Class<? extends Reducer<K1, V1, K2, V2>> reducerClass) {
+      this.reducerClass = Preconditions.checkNotNull(reducerClass);
+    }
+
+    @Override
+    public void initialize() {
+      if (instance == null) {
+        this.instance = ReflectionUtils.newInstance(reducerClass, getConfiguration());
+        try {
+          for (Method m : reducerClass.getDeclaredMethods()) {
+            if ("setup".equals(m.getName())) {
+              this.setupMethod = m;
+              this.setupMethod.setAccessible(true);
+            } else if ("cleanup".equals(m.getName())) {
+              this.cleanupMethod = m;
+              this.cleanupMethod.setAccessible(true);
+            } else if ("reduce".equals(m.getName())) {
+              this.reduceMethod = m;
+              this.reduceMethod.setAccessible(true);
+            }
+          }
+          
+          if (reduceMethod == null) {
+            throw new CrunchRuntimeException("No reduce method for class: " + reducerClass);
+          }
+
+          ProxyFactory proxyFactory = new ProxyFactory();
+          proxyFactory.setSuperclass(Reducer.Context.class);
+          proxyFactory.setFilter(CtxtMethodHandler.FILTER);
+          Class[] paramTypes = new Class[] { Reducer.class };
+          Object[] args = new Object[] { instance };
+          if (!Modifier.isAbstract(Reducer.Context.class.getModifiers())) {
+            Class rkvi = Class.forName("org.apache.hadoop.mapred.RawKeyValueIterator"); 
+            Object rawKeyValueIterator = Proxy.newProxyInstance(rkvi.getClassLoader(),
+                new Class[] { rkvi }, new InvocationHandler() {
+                  @Override
+                  public Object invoke(Object obj, Method m, Object[] args) throws Throwable {
+                    if ("next".equals(m.getName())) {
+                      return true;
+                    }
+                    return null;
+                  }
+            });
+            paramTypes = new Class[] { Reducer.class,
+                Configuration.class, TaskAttemptID.class,
+                rkvi,
+                Counter.class, Counter.class,
+                RecordWriter.class,
+                OutputCommitter.class,
+                Class.forName("org.apache.hadoop.mapreduce.StatusReporter"),
+                RawComparator.class,
+                Class.class, Class.class
+            };
+            args = new Object[] { instance, getConfiguration(), getTaskAttemptID(),
+                rawKeyValueIterator, null, null, null,
+                NO_OP_OUTPUT_COMMITTER, null, null,
+                NullWritable.class, NullWritable.class
+            };
+          }
+          this.handler = new CtxtMethodHandler(this.getContext());
+          this.context = (Reducer.Context) proxyFactory.create(paramTypes, args, handler);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+      
+      if (setupMethod != null) {
+        try {
+          setupMethod.invoke(instance, context);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+    }
+
+    @Override
+    public void process(Pair<K1, Iterable<V1>> input, Emitter<Pair<K2, V2>> emitter) {
+      handler.set(emitter);
+      try {
+        reduceMethod.invoke(instance, input.first(), input.second(), context);
+      } catch (Exception e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+
+    @Override
+    public void cleanup(Emitter<Pair<K2, V2>> emitter) {
+      if (cleanupMethod != null) {
+        handler.set(emitter);
+        try {
+          cleanupMethod.invoke(instance, context);
+        } catch (Exception e) {
+          throw new CrunchRuntimeException(e);
+        }
+      }
+    }
+  }
+  
+  private static class CtxtMethodHandler implements MethodHandler {
+    public static final MethodFilter FILTER = new MethodFilter() {
+      @Override
+      public boolean isHandled(Method m) {
+        return true;
+      }
+    };
+      
+    private final TaskInputOutputContext ctxt;
+    private Emitter emitter;
+    
+    public CtxtMethodHandler(TaskInputOutputContext ctxt) {
+      this.ctxt = ctxt;
+    }
+    
+    public void set(Emitter emitter) {
+      this.emitter = emitter;
+    }
+    
+    @Override
+    public Object invoke(Object instance, Method m, Method arg2, Object[] args) throws Throwable {
+      String name = m.getName();
+      if ("write".equals(name)) {
+        emitter.emit(Pair.of(args[0], args[1]));
+        return null;
+      } else {
+        return m.invoke(ctxt, args);
+      }
+    }
+  }
+  
+  private static final OutputCommitter NO_OP_OUTPUT_COMMITTER = new OutputCommitter() {
+    @Override
+    public void abortTask(TaskAttemptContext arg0) throws IOException {
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext arg0) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void setupJob(JobContext arg0) throws IOException {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext arg0) throws IOException {
+    }
+  };
+  
+}
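
One more implementation note: in the new API, setup(), map()/reduce(), and
cleanup() are protected on Mapper and Reducer, which is why MapperFn and
ReducerFn above find them with getDeclaredMethods() and call setAccessible(true)
before invoking them reflectively. A toy illustration of that lookup-and-invoke
pattern, with Worker as a hypothetical class:

    import java.lang.reflect.Method;

    public class ReflectSketch {
      static class Worker {
        protected String work(String in) {
          return in.toUpperCase();
        }
      }

      public static void main(String[] args) throws Exception {
        Method workMethod = null;
        for (Method m : Worker.class.getDeclaredMethods()) {
          if ("work".equals(m.getName())) {
            workMethod = m;
            // As in the shim: lets callers invoke a method they could not
            // otherwise access.
            workMethod.setAccessible(true);
          }
        }
        if (workMethod == null) {
          throw new IllegalStateException("No work method for class: " + Worker.class);
        }
        System.out.println(workMethod.invoke(new Worker(), "hi")); // prints HI
      }
    }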

http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java
new file mode 100644
index 0000000..f7fdf20
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/MapredTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MapredTest implements Serializable {
+
+  private static class TestMapper implements Mapper<IntWritable, Text, Text, LongWritable> {
+    @Override
+    public void configure(JobConf arg0) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void map(IntWritable k, Text v, OutputCollector<Text, LongWritable> out,
+        Reporter reporter) throws IOException {
+      reporter.getCounter("written", "out").increment(1L);
+      out.collect(new Text(v), new LongWritable(v.getLength()));
+    }
+  }
+  
+  private static class TestReducer implements Reducer<IntWritable, Text, Text, LongWritable> {
+
+    @Override
+    public void configure(JobConf arg0) { 
+    }
+
+    @Override
+    public void close() throws IOException { 
+    }
+
+    @Override
+    public void reduce(IntWritable key, Iterator<Text> iter,
+        OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
+      boolean hasBall = false;
+      String notBall = "";
+      while (iter.hasNext()) {
+        String next = iter.next().toString();
+        if ("ball".equals(next)) {
+          reporter.getCounter("foo", "bar").increment(1);
+          hasBall = true;
+        } else {
+          notBall = next;
+        }
+      }
+      out.collect(new Text(notBall), hasBall ? new LongWritable(1L) : new LongWritable(0L));
+    }
+  }
+  
+  private static Pair<Text, LongWritable> $(String one, int two) {
+    return Pair.of(new Text(one), new LongWritable(two));
+  }
+  
+  @Before
+  public void setUp() {
+    MemPipeline.clearCounters();
+  }
+  
+  @Test
+  public void testMapper() throws Exception {
+    PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()),
+        1, "foot", 2, "ball", 3, "bazzar");
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(Pair<Integer, String> input) {
+        return Pair.of(new IntWritable(input.first()), new Text(input.second()));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    
+    PTable<Text, LongWritable> out = Mapred.map(two, TestMapper.class, Text.class, LongWritable.class);
+    assertEquals(ImmutableList.of($("foot", 4), $("ball", 4), $("bazzar", 6)),
+        Lists.newArrayList(out.materialize()));
+  }
+  
+  @Test
+  public void testReducer() throws Exception {
+    PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()),
+        1, "foot", 1, "ball", 2, "base", 2, "ball", 3, "basket", 3, "ball", 4, "hockey");
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(Pair<Integer, String> input) {
+        return Pair.of(new IntWritable(input.first()), new Text(input.second()));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    PTable<Text, LongWritable> out = Mapred.reduce(two.groupByKey(), TestReducer.class, Text.class, LongWritable.class);
+    assertEquals(ImmutableList.of($("foot", 1), $("base", 1), $("basket", 1), $("hockey",
0)),
+        Lists.newArrayList(out.materialize()));
+    assertEquals(3, MemPipeline.getCounters().findCounter("foo", "bar").getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3eb5f0a8/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java
new file mode 100644
index 0000000..0606690
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/MapreduceTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class MapreduceTest {
+  private static class TestMapper extends Mapper<IntWritable, Text, IntWritable, Text> {
+    @Override
+    protected void map(IntWritable k, Text v, Mapper<IntWritable, Text, IntWritable, Text>.Context ctxt) {
+      try {
+        ctxt.write(new IntWritable(v.getLength()), v);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+  
+  private static class TestReducer extends Reducer<IntWritable, Text, Text, LongWritable> {
+    protected void reduce(IntWritable key, Iterable<Text> values,
+        org.apache.hadoop.mapreduce.Reducer<IntWritable, Text, Text, LongWritable>.Context ctxt) {
+      boolean hasBall = false;
+      String notBall = "";
+      for (Text t : values) {
+        String next = t.toString();
+        if ("ball".equals(next)) {
+          hasBall = true;
+          ctxt.getCounter("foo", "bar").increment(1);
+        } else {
+          notBall = next;
+        }
+      }
+      try {
+        ctxt.write(new Text(notBall), hasBall ? new LongWritable(1L) : new LongWritable(0L));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+  
+  private static Pair<Text, LongWritable> $1(String one, int two) {
+    return Pair.of(new Text(one), new LongWritable(two));
+  }
+  
+  private static Pair<IntWritable, Text> $2(int one, String two) {
+    return Pair.of(new IntWritable(one), new Text(two));
+  }
+  
+  @Before
+  public void setUp() {
+    MemPipeline.clearCounters();
+  }
+  
+  @Test
+  public void testMapper() throws Exception {
+    PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()),
+        1, "foot", 2, "ball", 3, "bazzar");
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(Pair<Integer, String> input) {
+        return Pair.of(new IntWritable(input.first()), new Text(input.second()));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    
+    PTable<IntWritable, Text> out = Mapreduce.map(two, TestMapper.class, IntWritable.class, Text.class);
+    assertEquals(ImmutableList.of($2(4, "foot"), $2(4, "ball"), $2(6, "bazzar")),
+        Lists.newArrayList(out.materialize()));
+  }
+  
+  @Test
+  public void testReducer() throws Exception {
+    PTable<Integer, String> in = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()),
+        1, "foot", 1, "ball", 2, "base", 2, "ball", 3, "basket", 3, "ball", 4, "hockey");
+    PTable<IntWritable, Text> two = in.parallelDo(new MapFn<Pair<Integer, String>, Pair<IntWritable, Text>>() {
+      @Override
+      public Pair<IntWritable, Text> map(Pair<Integer, String> input) {
+        return Pair.of(new IntWritable(input.first()), new Text(input.second()));
+      }
+    }, Writables.tableOf(Writables.writables(IntWritable.class), Writables.writables(Text.class)));
+    PTable<Text, LongWritable> out = Mapreduce.reduce(two.groupByKey(), TestReducer.class, Text.class, LongWritable.class);
+    assertEquals(ImmutableList.of($1("foot", 1), $1("base", 1), $1("basket", 1), $1("hockey", 0)),
+        Lists.newArrayList(out.materialize()));
+    assertEquals(3, MemPipeline.getCounters().findCounter("foo", "bar").getValue());
+  }
+}

