incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [26/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:47 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
deleted file mode 100644
index 7fa61d3..0000000
--- a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapreduce.lib.jobcontrol;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Hashtable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
-
-/**
- * This class encapsulates a set of MapReduce jobs and their dependencies.
- * 
- * It tracks the states of the jobs by placing them into different tables
- * according to their states.
- * 
- * This class provides APIs for the client app to add a job to the group and to
- * get the jobs in the group in different states. When a job is added, an ID
- * unique to the group is assigned to the job.
- * 
- * This class has a thread that submits jobs when they become ready, monitors
- * the states of the running jobs, and updates the states of jobs based on the
- * state changes of the jobs they depend on. The class provides APIs for
- * suspending/resuming the thread, and for stopping the thread.
- * 
- * TODO This is mostly a copy of the JobControl class in Hadoop MapReduce core. 
- * Once the location and interface of the class are more stable in CDH, this class 
- * should be removed completely and be based on the hadoop-core class.
- */
-public class CrunchJobControl implements Runnable {
-
-  // The thread can be in one of the following state
-  public static enum ThreadState {
-    RUNNING, SUSPENDED, STOPPED, STOPPING, READY
-  };
-
-  private ThreadState runnerState; // the thread state
-
-  private Map<String, CrunchControlledJob> waitingJobs;
-  private Map<String, CrunchControlledJob> readyJobs;
-  private Map<String, CrunchControlledJob> runningJobs;
-  private Map<String, CrunchControlledJob> successfulJobs;
-  private Map<String, CrunchControlledJob> failedJobs;
-
-  private long nextJobID;
-  private String groupName;
-
-  /**
-   * Construct a job control for a group of jobs.
-   * 
-   * @param groupName
-   *          a name identifying this group
-   */
-  public CrunchJobControl(String groupName) {
-    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
-    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
-    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
-    this.successfulJobs = new Hashtable<String, CrunchControlledJob>();
-    this.failedJobs = new Hashtable<String, CrunchControlledJob>();
-    this.nextJobID = -1;
-    this.groupName = groupName;
-    this.runnerState = ThreadState.READY;
-  }
-
-  private static List<CrunchControlledJob> toList(Map<String, CrunchControlledJob> jobs) {
-    ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>();
-    synchronized (jobs) {
-      for (CrunchControlledJob job : jobs.values()) {
-        retv.add(job);
-      }
-    }
-    return retv;
-  }
-
-  /**
-   * @return the jobs in the waiting state
-   */
-  public List<CrunchControlledJob> getWaitingJobList() {
-    return toList(this.waitingJobs);
-  }
-
-  /**
-   * @return the jobs in the running state
-   */
-  public List<CrunchControlledJob> getRunningJobList() {
-    return toList(this.runningJobs);
-  }
-
-  /**
-   * @return the jobs in the ready state
-   */
-  public List<CrunchControlledJob> getReadyJobsList() {
-    return toList(this.readyJobs);
-  }
-
-  /**
-   * @return the jobs in the success state
-   */
-  public List<CrunchControlledJob> getSuccessfulJobList() {
-    return toList(this.successfulJobs);
-  }
-
-  public List<CrunchControlledJob> getFailedJobList() {
-    return toList(this.failedJobs);
-  }
-
-  private String getNextJobID() {
-    nextJobID += 1;
-    return this.groupName + this.nextJobID;
-  }
-
-  private static void addToQueue(CrunchControlledJob aJob,
-      Map<String, CrunchControlledJob> queue) {
-    synchronized (queue) {
-      queue.put(aJob.getJobID(), aJob);
-    }
-  }
-
-  private void addToQueue(CrunchControlledJob aJob) {
-    Map<String, CrunchControlledJob> queue = getQueue(aJob.getJobState());
-    addToQueue(aJob, queue);
-  }
-
-  private Map<String, CrunchControlledJob> getQueue(State state) {
-    Map<String, CrunchControlledJob> retv = null;
-    if (state == State.WAITING) {
-      retv = this.waitingJobs;
-    } else if (state == State.READY) {
-      retv = this.readyJobs;
-    } else if (state == State.RUNNING) {
-      retv = this.runningJobs;
-    } else if (state == State.SUCCESS) {
-      retv = this.successfulJobs;
-    } else if (state == State.FAILED || state == State.DEPENDENT_FAILED) {
-      retv = this.failedJobs;
-    }
-    return retv;
-  }
-
-  /**
-   * Add a new job.
-   * 
-   * @param aJob
-   *          the new job
-   */
-  synchronized public String addJob(CrunchControlledJob aJob) {
-    String id = this.getNextJobID();
-    aJob.setJobID(id);
-    aJob.setJobState(State.WAITING);
-    this.addToQueue(aJob);
-    return id;
-  }
-
-  /**
-   * Add a collection of jobs
-   * 
-   * @param jobs
-   */
-  public void addJobCollection(Collection<CrunchControlledJob> jobs) {
-    for (CrunchControlledJob job : jobs) {
-      addJob(job);
-    }
-  }
-
-  /**
-   * @return the thread state
-   */
-  public ThreadState getThreadState() {
-    return this.runnerState;
-  }
-
-  /**
-   * set the thread state to STOPPING so that the thread will stop when it wakes
-   * up.
-   */
-  public void stop() {
-    this.runnerState = ThreadState.STOPPING;
-  }
-
-  /**
-   * suspend the running thread
-   */
-  public void suspend() {
-    if (this.runnerState == ThreadState.RUNNING) {
-      this.runnerState = ThreadState.SUSPENDED;
-    }
-  }
-
-  /**
-   * resume the suspended thread
-   */
-  public void resume() {
-    if (this.runnerState == ThreadState.SUSPENDED) {
-      this.runnerState = ThreadState.RUNNING;
-    }
-  }
-
-  synchronized private void checkRunningJobs() throws IOException,
-      InterruptedException {
-
-    Map<String, CrunchControlledJob> oldJobs = null;
-    oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable<String, CrunchControlledJob>();
-
-    for (CrunchControlledJob nextJob : oldJobs.values()) {
-      nextJob.checkState();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized private void checkWaitingJobs() throws IOException,
-      InterruptedException {
-    Map<String, CrunchControlledJob> oldJobs = null;
-    oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable<String, CrunchControlledJob>();
-
-    for (CrunchControlledJob nextJob : oldJobs.values()) {
-      nextJob.checkState();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized private void startReadyJobs() {
-    Map<String, CrunchControlledJob> oldJobs = null;
-    oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable<String, CrunchControlledJob>();
-
-    for (CrunchControlledJob nextJob : oldJobs.values()) {
-      // Submitting Job to Hadoop
-      nextJob.submit();
-      this.addToQueue(nextJob);
-    }
-  }
-
-  synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 && this.readyJobs.size() == 0
-        && this.runningJobs.size() == 0;
-  }
-
-  /**
-   * The main loop for the thread. The loop does the following: Check the states
-   * of the running jobs Update the states of waiting jobs Submit the jobs in
-   * ready state
-   */
-  public void run() {
-    this.runnerState = ThreadState.RUNNING;
-    while (true) {
-      while (this.runnerState == ThreadState.SUSPENDED) {
-        try {
-          Thread.sleep(5000);
-        } catch (Exception e) {
-
-        }
-      }
-      try {
-        checkRunningJobs();
-        checkWaitingJobs();
-        startReadyJobs();
-      } catch (Exception e) {
-        this.runnerState = ThreadState.STOPPED;
-      }
-      if (this.runnerState != ThreadState.RUNNING
-          && this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
-      try {
-        Thread.sleep(5000);
-      } catch (Exception e) {
-
-      }
-      if (this.runnerState != ThreadState.RUNNING
-          && this.runnerState != ThreadState.SUSPENDED) {
-        break;
-      }
-    }
-    this.runnerState = ThreadState.STOPPED;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
deleted file mode 100644
index 10d033f..0000000
--- a/src/main/java/org/apache/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapreduce.lib.output;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
-
-/**
- * The MultipleOutputs class simplifies writing output data 
- * to multiple outputs
- * 
- * <p> 
- * Case one: writing to additional outputs other than the job default output.
- *
- * Each additional output, or named output, may be configured with its own
- * <code>OutputFormat</code>, with its own key class and with its own value
- * class.
- * 
- * <p>
- * Case two: to write data to different files provided by user
- * </p>
- * 
- * <p>
- * MultipleOutputs supports counters, by default they are disabled. The 
- * counters group is the {@link CrunchMultipleOutputs} class name. The names of the 
- * counters are the same as the output name. These count the number of records 
- * written to each output name.
- * </p>
- * 
- * Usage pattern for job submission:
- * <pre>
- *
- * Job job = new Job();
- *
- * FileInputFormat.setInputPath(job, inDir);
- * FileOutputFormat.setOutputPath(job, outDir);
- *
- * job.setMapperClass(MOMap.class);
- * job.setReducerClass(MOReduce.class);
- * ...
- *
- * // Defines additional single text based output 'text' for the job
- * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
- * LongWritable.class, Text.class);
- *
- * // Defines additional sequence-file based output 'sequence' for the job
- * MultipleOutputs.addNamedOutput(job, "seq",
- *   SequenceFileOutputFormat.class,
- *   LongWritable.class, Text.class);
- * ...
- *
- * job.waitForCompletion(true);
- * ...
- * </pre>
- * <p>
- * Usage in Reducer:
- * <pre>
- * <K, V> String generateFileName(K k, V v) {
- *   return k.toString() + "_" + v.toString();
- * }
- * 
- * public class MOReduce extends
- *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
- * private MultipleOutputs mos;
- * public void setup(Context context) {
- * ...
- * mos = new MultipleOutputs(context);
- * }
- *
- * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
- * Context context)
- * throws IOException {
- * ...
- * mos.write("text", , key, new Text("Hello"));
- * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
- * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
- * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
- * ...
- * }
- *
- * public void cleanup(Context) throws IOException {
- * mos.close();
- * ...
- * }
- *
- * }
- * </pre>
- */
-public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
-
-  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
-
-  private static final String MO_PREFIX = 
-    "mapreduce.multipleoutputs.namedOutput.";
-
-  private static final String FORMAT = ".format";
-  private static final String KEY = ".key";
-  private static final String VALUE = ".value";
-  private static final String COUNTERS_ENABLED = 
-    "mapreduce.multipleoutputs.counters";
-
-  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
-  
-  /**
-   * Counters group used by the counters of MultipleOutputs.
-   */
-  private static final String COUNTERS_GROUP = CrunchMultipleOutputs.class.getName();
-  
-  /**
-   * Cache for the taskContexts
-   */
-  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
-
-  /**
-   * Checks if a named output name is valid token.
-   *
-   * @param namedOutput named output Name
-   * @throws IllegalArgumentException if the output name is not valid.
-   */
-  private static void checkTokenName(String namedOutput) {
-    if (namedOutput == null || namedOutput.length() == 0) {
-      throw new IllegalArgumentException(
-        "Name cannot be NULL or emtpy");
-    }
-    for (char ch : namedOutput.toCharArray()) {
-      if ((ch >= 'A') && (ch <= 'Z')) {
-        continue;
-      }
-      if ((ch >= 'a') && (ch <= 'z')) {
-        continue;
-      }
-      if ((ch >= '0') && (ch <= '9')) {
-        continue;
-      }
-      throw new IllegalArgumentException(
-        "Name cannot be have a '" + ch + "' char");
-    }
-  }
-
-  /**
-   * Checks if output name is valid.
-   *
-   * name cannot be the name used for the default output
-   * @param outputPath base output Name
-   * @throws IllegalArgumentException if the output name is not valid.
-   */
-  private static void checkBaseOutputPath(String outputPath) {
-    if (outputPath.equals(FileOutputFormat.PART)) {
-      throw new IllegalArgumentException("output name cannot be 'part'");
-    }
-  }
-  
-  /**
-   * Checks if a named output name is valid.
-   *
-   * @param namedOutput named output Name
-   * @throws IllegalArgumentException if the output name is not valid.
-   */
-  private static void checkNamedOutputName(JobContext job,
-      String namedOutput, boolean alreadyDefined) {
-    checkTokenName(namedOutput);
-    checkBaseOutputPath(namedOutput);
-    List<String> definedChannels = getNamedOutputsList(job);
-    if (alreadyDefined && definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput +
-        "' already alreadyDefined");
-    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput +
-        "' not defined");
-    }
-  }
-
-  // Returns list of channel names.
-  private static List<String> getNamedOutputsList(JobContext job) {
-    List<String> names = new ArrayList<String>();
-    StringTokenizer st = new StringTokenizer(
-      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
-    while (st.hasMoreTokens()) {
-      names.add(st.nextToken());
-    }
-    return names;
-  }
-
-  // Returns the named output OutputFormat.
-  @SuppressWarnings("unchecked")
-  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
-    JobContext job, String namedOutput) {
-    return (Class<? extends OutputFormat<?, ?>>)
-      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
-      OutputFormat.class);
-  }
-
-  // Returns the key class for a named output.
-  private static Class<?> getNamedOutputKeyClass(JobContext job,
-                                                String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
-      Object.class);
-  }
-
-  // Returns the value class for a named output.
-  private static Class<?> getNamedOutputValueClass(
-      JobContext job, String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
-      null, Object.class);
-  }
-
-  /**
-   * Adds a named output for the job.
-   * <p/>
-   *
-   * @param job               job to add the named output
-   * @param namedOutput       named output name, it has to be a word, letters
-   *                          and numbers only, cannot be the word 'part' as
-   *                          that is reserved for the default output.
-   * @param outputFormatClass OutputFormat class.
-   * @param keyClass          key class
-   * @param valueClass        value class
-   */
-  public static void addNamedOutput(Job job, String namedOutput,
-      Class<? extends OutputFormat> outputFormatClass,
-      Class<?> keyClass, Class<?> valueClass) {
-    checkNamedOutputName(job, namedOutput, true);
-    Configuration conf = job.getConfiguration();
-    conf.set(MULTIPLE_OUTPUTS,
-      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
-    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
-      OutputFormat.class);
-    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
-    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
-  }
-
-  /**
-   * Enables or disables counters for the named outputs.
-   * 
-   * The counters group is the {@link CrunchMultipleOutputs} class name.
- * The names of the counters are the same as the named outputs. These
- * counters count the number of records written to each output name.
-   * By default these counters are disabled.
-   *
-   * @param job    job  to enable counters
-   * @param enabled indicates if the counters will be enabled or not.
-   */
-  public static void setCountersEnabled(Job job, boolean enabled) {
-    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
-  }
-
-  /**
-   * Returns if the counters for the named outputs are enabled or not.
-   * By default these counters are disabled.
-   *
-   * @param job    the job 
-   * @return TRUE if the counters are enabled, FALSE if they are disabled.
-   */
-  public static boolean getCountersEnabled(JobContext job) {
-    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
-  }
-
-  /**
-   * Wraps RecordWriter to increment counters. 
-   */
-  @SuppressWarnings("unchecked")
-  private static class RecordWriterWithCounter extends RecordWriter {
-    private RecordWriter writer;
-    private String counterName;
-    private TaskInputOutputContext context;
-
-    public RecordWriterWithCounter(RecordWriter writer, String counterName,
-                                   TaskInputOutputContext context) {
-      this.writer = writer;
-      this.counterName = counterName;
-      this.context = context;
-    }
-
-    @SuppressWarnings({"unchecked"})
-    public void write(Object key, Object value) 
-        throws IOException, InterruptedException {
-      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
-      writer.write(key, value);
-    }
-
-    public void close(TaskAttemptContext context) 
-        throws IOException, InterruptedException {
-      writer.close(context);
-    }
-  }
-
-  // instance code, to be used from Mapper/Reducer code
-
-  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
-  private Set<String> namedOutputs;
-  private Map<String, RecordWriter<?, ?>> recordWriters;
-  private boolean countersEnabled;
-  
-  /**
-   * Creates and initializes multiple outputs support,
-   * it should be instantiated in the Mapper/Reducer setup method.
-   *
-   * @param context the TaskInputOutputContext object
-   */
-  public CrunchMultipleOutputs(
-      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
-    this.context = context;
-    namedOutputs = Collections.unmodifiableSet(
-      new HashSet<String>(CrunchMultipleOutputs.getNamedOutputsList(context)));
-    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
-    countersEnabled = getCountersEnabled(context);
-  }
-
-  /**
-   * Write key and value to the namedOutput.
-   *
-   * Output path is a unique file generated for the namedOutput.
-   * For example, {namedOutput}-(m|r)-{part-number}
-   * 
-   * @param namedOutput the named output name
-   * @param key         the key
-   * @param value       the value
-   */
-  @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value)
-      throws IOException, InterruptedException {
-    write(namedOutput, key, value, namedOutput);
-  }
-
-  /**
-   * Write key and value to baseOutputPath using the namedOutput.
-   * 
-   * @param namedOutput    the named output name
-   * @param key            the key
-   * @param value          the value
-   * @param baseOutputPath base-output path to write the record to.
-   * Note: Framework will generate unique filename for the baseOutputPath
-   */
-  @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value,
-      String baseOutputPath) throws IOException, InterruptedException {
-    checkNamedOutputName(context, namedOutput, false);
-    checkBaseOutputPath(baseOutputPath);
-    if (!namedOutputs.contains(namedOutput)) {
-      throw new IllegalArgumentException("Undefined named output '" +
-        namedOutput + "'");
-    }
-    TaskAttemptContext taskContext = getContext(namedOutput);
-    getRecordWriter(taskContext, baseOutputPath).write(key, value);
-  }
-
-  /**
-   * Write key value to an output file name.
-   * 
-   * Gets the record writer from job's output format.  
-   * Job's output format should be a FileOutputFormat.
-   * 
-   * @param key       the key
-   * @param value     the value
-   * @param baseOutputPath base-output path to write the record to.
-   * Note: Framework will generate unique filename for the baseOutputPath
-   */
-  @SuppressWarnings("unchecked")
-  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
-      throws IOException, InterruptedException {
-    checkBaseOutputPath(baseOutputPath);
-    TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
-      context.getConfiguration(), context.getTaskAttemptID());
-    getRecordWriter(taskContext, baseOutputPath).write(key, value);
-  }
-
-  // by being synchronized MultipleOutputTask can be used with a
-  // MultithreadedMapper.
-  @SuppressWarnings("unchecked")
-  private synchronized RecordWriter getRecordWriter(
-      TaskAttemptContext taskContext, String baseFileName) 
-      throws IOException, InterruptedException {
-    
-    // look for record-writer in the cache
-    RecordWriter writer = recordWriters.get(baseFileName);
-    
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, baseFileName);
-      try {
-        writer = ((OutputFormat) ReflectionUtils.newInstance(
-          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
-          .getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
- 
-      // if counters are enabled, wrap the writer with context 
-      // to increment counters 
-      if (countersEnabled) {
-        writer = new RecordWriterWithCounter(writer, baseFileName, context);
-      }
-      
-      // add the record-writer to the cache
-      recordWriters.put(baseFileName, writer);
-    }
-    return writer;
-  }
-
-   // Create a taskAttemptContext for the named output with 
-   // output format and output key/value types put in the context
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-
-    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
-
-    if (taskContext != null) {
-      return taskContext;
-    }
-
-    // The following trick leverages the instantiation of a record writer via
-    // the job thus supporting arbitrary output formats.
-    Job job = new Job(context.getConfiguration());
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
-    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
-    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
-    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
-    taskContext = TaskAttemptContextFactory.create(
-      job.getConfiguration(), context.getTaskAttemptID());
-    
-    taskContexts.put(nameOutput, taskContext);
-    
-    return taskContext;
-  }
-  
-  /**
-   * Closes all the opened outputs.
-   * 
-   * This should be called from cleanup method of map/reduce task.
-   * If overridden subclasses must invoke <code>super.close()</code> at the
-   * end of their <code>close()</code>
-   * 
-   */
-  @SuppressWarnings("unchecked")
-  public void close() throws IOException, InterruptedException {
-    for (RecordWriter writer : recordWriters.values()) {
-      writer.close(context);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
deleted file mode 100644
index dc08a07..0000000
--- a/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-# ***** Set root logger level to INFO and its only appender to A.
-log4j.logger.org.apache.crunch=info, A
-
-# ***** A is set to be a ConsoleAppender.
-log4j.appender.A=org.apache.log4j.ConsoleAppender
-# ***** A uses PatternLayout.
-log4j.appender.A.layout=org.apache.log4j.PatternLayout
-log4j.appender.A.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/CollectionsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/CollectionsTest.java b/src/test/java/org/apache/crunch/CollectionsTest.java
deleted file mode 100644
index 896014a..0000000
--- a/src/test/java/org/apache/crunch/CollectionsTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.junit.Test;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("serial")
-public class CollectionsTest {
-  
-  public static class AggregateStringListFn implements CombineFn.Aggregator<Collection<String>> {
-    private final Collection<String> rtn = Lists.newArrayList();
-    
-    @Override
-    public void reset() {
-      rtn.clear();
-    }
-    
-    @Override
-    public void update(Collection<String> values) {
-      rtn.addAll(values);
-    }      
-    
-    @Override
-    public Iterable<Collection<String>> results() {
-      return ImmutableList.of(rtn);
-    }
-  }
-  
-  public static PTable<String, Collection<String>> listOfCharcters(PCollection<String> lines, PTypeFamily typeFamily) {
-     
-    return lines.parallelDo(new DoFn<String, Pair<String, Collection<String>>>() {
-      @Override
-      public void process(String line, Emitter<Pair<String, Collection<String>>> emitter) {
-        for (String word : line.split("\\s+")) {
-          Collection<String> characters = Lists.newArrayList();
-          for(char c : word.toCharArray()) {
-            characters.add(String.valueOf(c));
-          }
-          emitter.emit(Pair.of(word, characters));
-        }
-      }
-    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.collections(typeFamily.strings())))
-    .groupByKey()
-    .combineValues(CombineFn.<String, Collection<String>>aggregator(new AggregateStringListFn()));
-  }
-  
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(CollectionsTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(CollectionsTest.class), AvroTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryWritables() throws IOException {
-    run(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testInMemoryAvro() throws IOException {
-    run(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-  }
-  
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-	String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Iterable<Pair<String, Collection<String>>> lines = listOfCharcters(shakespeare, typeFamily).materialize();
-    
-    boolean passed = false;
-    for (Pair<String, Collection<String>> line : lines) {
-      if(line.first().startsWith("yellow")) {
-        passed = true;
-        break;
-      }
-    }
-    pipeline.done();
-    assertTrue(passed);
-  }  
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/CombineFnTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/CombineFnTest.java b/src/test/java/org/apache/crunch/CombineFnTest.java
deleted file mode 100644
index e015498..0000000
--- a/src/test/java/org/apache/crunch/CombineFnTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.apache.crunch.CombineFn.MAX_BIGINTS;
-import static org.apache.crunch.CombineFn.MAX_DOUBLES;
-import static org.apache.crunch.CombineFn.MAX_FLOATS;
-import static org.apache.crunch.CombineFn.MAX_INTS;
-import static org.apache.crunch.CombineFn.MAX_LONGS;
-import static org.apache.crunch.CombineFn.MIN_BIGINTS;
-import static org.apache.crunch.CombineFn.MIN_DOUBLES;
-import static org.apache.crunch.CombineFn.MIN_FLOATS;
-import static org.apache.crunch.CombineFn.MIN_INTS;
-import static org.apache.crunch.CombineFn.MIN_LONGS;
-import static org.apache.crunch.CombineFn.SUM_BIGINTS;
-import static org.apache.crunch.CombineFn.SUM_DOUBLES;
-import static org.apache.crunch.CombineFn.SUM_FLOATS;
-import static org.apache.crunch.CombineFn.SUM_INTS;
-import static org.apache.crunch.CombineFn.SUM_LONGS;
-import static org.junit.Assert.assertEquals;
-
-import java.math.BigInteger;
-import java.util.List;
-
-import org.junit.Test;
-
-import org.apache.crunch.CombineFn.Aggregator;
-import org.apache.crunch.CombineFn.AggregatorFactory;
-import org.apache.crunch.CombineFn.FirstNAggregator;
-import org.apache.crunch.CombineFn.LastNAggregator;
-import org.apache.crunch.CombineFn.MaxNAggregator;
-import org.apache.crunch.CombineFn.MinNAggregator;
-import org.apache.crunch.CombineFn.PairAggregator;
-import org.apache.crunch.CombineFn.QuadAggregator;
-import org.apache.crunch.CombineFn.TripAggregator;
-import org.apache.crunch.CombineFn.TupleNAggregator;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-
-public class CombineFnTest {
-
-  private <T> Iterable<T> applyAggregator(AggregatorFactory<T> a, Iterable<T> values) {
-    return applyAggregator(a.create(), values);
-  }
-  
-  private <T> Iterable<T> applyAggregator(Aggregator<T> a, Iterable<T> values) {
-    a.reset();
-    for (T value : values) {
-      a.update(value);
-    }
-    return a.results();
-  }
-  
-  @Test
-  public void testSums() {
-    assertEquals(ImmutableList.of(1775L),
-        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-
-    assertEquals(ImmutableList.of(1765L),
-        applyAggregator(SUM_LONGS, ImmutableList.of(29L, 7L, 1729L)));
-
-    assertEquals(ImmutableList.of(1775),
-        applyAggregator(SUM_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(1775.0f),
-        applyAggregator(SUM_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(1775.0),
-        applyAggregator(SUM_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(new BigInteger("1775")),
-        applyAggregator(SUM_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-  
-  @Test
-  public void testMax() {
-    assertEquals(ImmutableList.of(1729L),
-        applyAggregator(MAX_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-    
-    assertEquals(ImmutableList.of(1729),
-        applyAggregator(MAX_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(1729.0f),
-        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(1729.0),
-        applyAggregator(MAX_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(1745.0f),
-        applyAggregator(MAX_FLOATS, ImmutableList.of(29f, 1745f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(new BigInteger("1729")),
-        applyAggregator(MAX_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-  
-  @Test
-  public void testMin() {
-    assertEquals(ImmutableList.of(17L),
-        applyAggregator(MIN_LONGS, ImmutableList.of(29L, 17L, 1729L)));
-    
-    assertEquals(ImmutableList.of(17),
-        applyAggregator(MIN_INTS, ImmutableList.of(29, 17, 1729)));
-
-    assertEquals(ImmutableList.of(17.0f),
-        applyAggregator(MIN_FLOATS, ImmutableList.of(29f, 17f, 1729f)));
-
-    assertEquals(ImmutableList.of(17.0),
-        applyAggregator(MIN_DOUBLES, ImmutableList.of(29.0, 17.0, 1729.0)));
-    
-    assertEquals(ImmutableList.of(29),
-        applyAggregator(MIN_INTS, ImmutableList.of(29, 170, 1729)));
-    
-    assertEquals(ImmutableList.of(new BigInteger("17")),
-        applyAggregator(MIN_BIGINTS,
-            ImmutableList.of(new BigInteger("29"), new BigInteger("17"), new BigInteger("1729"))));
-  }
-
-  @Test
-  public void testMaxN() {
-    assertEquals(ImmutableList.of(98, 1009), applyAggregator(new MaxNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testMinN() {
-    assertEquals(ImmutableList.of(17, 29), applyAggregator(new MinNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testFirstN() {
-    assertEquals(ImmutableList.of(17, 34), applyAggregator(new FirstNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-
-  @Test
-  public void testLastN() {
-    assertEquals(ImmutableList.of(29, 1009), applyAggregator(new LastNAggregator<Integer>(2),
-        ImmutableList.of(17, 34, 98, 29, 1009)));
-  }
-  
-  @Test
-  public void testPairs() {
-    List<Pair<Long, Double>> input = ImmutableList.of(Pair.of(1720L, 17.29), Pair.of(9L, -3.14));
-    Aggregator<Pair<Long, Double>> a = new PairAggregator<Long, Double>(
-        SUM_LONGS.create(), MIN_DOUBLES.create());
-    assertEquals(Pair.of(1729L, -3.14), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-  
-  @Test
-  public void testPairsTwoLongs() {
-    List<Pair<Long, Long>> input = ImmutableList.of(Pair.of(1720L, 1L), Pair.of(9L, 19L));
-    Aggregator<Pair<Long, Long>> a = new PairAggregator<Long, Long>(
-        SUM_LONGS.create(), SUM_LONGS.create());
-    assertEquals(Pair.of(1729L, 20L), Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-  
-  @Test
-  public void testTrips() {
-    List<Tuple3<Float, Double, Double>> input = ImmutableList.of(
-        Tuple3.of(17.29f, 12.2, 0.1), Tuple3.of(3.0f, 1.2, 3.14), Tuple3.of(-1.0f, 14.5, -0.98));
-    Aggregator<Tuple3<Float, Double, Double>> a = new TripAggregator<Float, Double, Double>(
-        MAX_FLOATS.create(), MAX_DOUBLES.create(), MIN_DOUBLES.create());
-    assertEquals(Tuple3.of(17.29f, 14.5, -0.98),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-  
-  @Test
-  public void testQuads() {
-    List<Tuple4<Float, Double, Double, Integer>> input = ImmutableList.of(
-        Tuple4.of(17.29f, 12.2, 0.1, 1), Tuple4.of(3.0f, 1.2, 3.14, 2),
-        Tuple4.of(-1.0f, 14.5, -0.98, 3));
-    Aggregator<Tuple4<Float, Double, Double, Integer>> a =
-        new QuadAggregator<Float, Double, Double, Integer>(MAX_FLOATS.create(),
-            MAX_DOUBLES.create(), MIN_DOUBLES.create(), SUM_INTS.create());
-    assertEquals(Tuple4.of(17.29f, 14.5, -0.98, 6),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-
-  @Test
-  public void testTupleN() {
-    List<TupleN> input = ImmutableList.of(new TupleN(1, 3.0, 1, 2.0, 4L),
-        new TupleN(4, 17.0, 1, 9.7, 12L));
-    Aggregator<TupleN> a = new TupleNAggregator(MIN_INTS.create(), SUM_DOUBLES.create(),
-        MAX_INTS.create(), MIN_DOUBLES.create(), MAX_LONGS.create());
-    assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L),
-        Iterables.getOnlyElement(applyAggregator(a, input)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/FilterFnTest.java b/src/test/java/org/apache/crunch/FilterFnTest.java
deleted file mode 100644
index a173bc5..0000000
--- a/src/test/java/org/apache/crunch/FilterFnTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class FilterFnTest {
-
-  private static final FilterFn<String> TRUE = new FilterFn<String>() {
-    @Override
-    public boolean accept(String input) {
-      return true;
-    }
-  };
-
-  private static final FilterFn<String> FALSE = new FilterFn<String>() {
-    @Override
-    public boolean accept(String input) {
-      return false;
-    }
-  };
-  
-  @Test
-  public void testAnd() {
-    assertTrue(FilterFn.and(TRUE).accept("foo"));
-    assertTrue(FilterFn.and(TRUE, TRUE).accept("foo"));
-    assertFalse(FilterFn.and(TRUE, FALSE).accept("foo"));
-    assertFalse(FilterFn.and(FALSE, FALSE, FALSE).accept("foo"));
-  }
-  
-  @Test
-  public void testOr() {
-    assertFalse(FilterFn.or(FALSE).accept("foo"));
-    assertTrue(FilterFn.or(FALSE, TRUE).accept("foo"));
-    assertTrue(FilterFn.or(TRUE, FALSE, TRUE).accept("foo"));
-    assertFalse(FilterFn.or(FALSE, FALSE, FALSE).accept("foo"));
-  }
-
-  @Test
-  public void testNot() {
-    assertFalse(FilterFn.not(TRUE).accept("foo"));
-    assertTrue(FilterFn.not(FALSE).accept("foo"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/MapsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/MapsTest.java b/src/test/java/org/apache/crunch/MapsTest.java
deleted file mode 100644
index 5c9eb57..0000000
--- a/src/test/java/org/apache/crunch/MapsTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.util.Map;
-
-import org.junit.Test;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
-public class MapsTest {
-
-  @Test
-  public void testWritables() throws Exception {
-	run(WritableTypeFamily.getInstance());
-  }
-  
-  @Test
-  public void testAvros() throws Exception {
-	run(AvroTypeFamily.getInstance());
-  }
-  
-  public static void run(PTypeFamily typeFamily) throws Exception {
-	Pipeline pipeline = new MRPipeline(MapsTest.class);
-    String shakesInputPath = FileHelper.createTempCopyOf("shakes.txt");
-    PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
-    Iterable<Pair<String, Map<String, Long>>> output = shakespeare.parallelDo(
-      new DoFn<String, Pair<String, Map<String, Long>>>() {
-	    @Override
-	    public void process(String input,
-		    Emitter<Pair<String, Map<String, Long>>> emitter) {
-		  String last = null;
-		  for (String word : input.toLowerCase().split("\\W+")) {
-		    if (!word.isEmpty()) {
-			  String firstChar = word.substring(0, 1);
-		      if (last != null) {
-		    	Map<String, Long> cc = ImmutableMap.of(firstChar, 1L);
-			    emitter.emit(Pair.of(last, cc));
-		      }
-		      last = firstChar;
-		    }
-		  }
-	    }
-      }, typeFamily.tableOf(typeFamily.strings(), typeFamily.maps(typeFamily.longs())))
-      .groupByKey()
-      .combineValues(new CombineFn<String, Map<String, Long>>() {
-	    @Override
-	    public void process(Pair<String, Iterable<Map<String, Long>>> input,
-		    Emitter<Pair<String, Map<String, Long>>> emitter) {
-		  Map<String, Long> agg = Maps.newHashMap();
-		  for (Map<String, Long> in : input.second()) {
-		    for (Map.Entry<String, Long> e : in.entrySet()) {
-			  if (!agg.containsKey(e.getKey())) {
-			    agg.put(e.getKey(), e.getValue());
-			  } else {
-			    agg.put(e.getKey(), e.getValue() + agg.get(e.getKey()));
-			  }
-		    }
-		  }
-		  emitter.emit(Pair.of(input.first(), agg));
-	    }
-	  }).materialize();
-    boolean passed = false;
-    for (Pair<String, Map<String, Long>> v : output) {
-      if (v.first() == "k" && v.second().get("n") == 8L) {
-    	passed = true;
-    	break;
-      }
-    }
-    pipeline.done();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/MaterializeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/MaterializeTest.java b/src/test/java/org/apache/crunch/MaterializeTest.java
deleted file mode 100644
index f6b8ea2..0000000
--- a/src/test/java/org/apache/crunch/MaterializeTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.junit.Test;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-
-public class MaterializeTest {
-
-	/** Filter that rejects everything. */
-	@SuppressWarnings("serial")
-	private static class FalseFilterFn extends FilterFn<String> {
-
-		@Override
-		public boolean accept(final String input) {
-			return false;
-		}
-	}
-
-	@Test
-	public void testMaterializeInput_Writables() throws IOException {
-		runMaterializeInput(new MRPipeline(MaterializeTest.class), WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeInput_Avro() throws IOException {
-		runMaterializeInput(new MRPipeline(MaterializeTest.class), AvroTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeInput_InMemoryWritables() throws IOException {
-		runMaterializeInput(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeInput_InMemoryAvro() throws IOException {
-		runMaterializeInput(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_Writables() throws IOException {
-		runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
-				WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_Avro() throws IOException {
-		runMaterializeEmptyIntermediate(new MRPipeline(MaterializeTest.class),
-				AvroTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_InMemoryWritables() throws IOException {
-		runMaterializeEmptyIntermediate(MemPipeline.getInstance(), WritableTypeFamily.getInstance());
-	}
-
-	@Test
-	public void testMaterializeEmptyIntermediate_InMemoryAvro() throws IOException {
-		runMaterializeEmptyIntermediate(MemPipeline.getInstance(), AvroTypeFamily.getInstance());
-	}
-
-	public void runMaterializeInput(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-		List<String> expectedContent = Lists.newArrayList("b", "c", "a", "e");
-		String inputPath = FileHelper.createTempCopyOf("set1.txt");
-
-		PCollection<String> lines = pipeline.readTextFile(inputPath);
-		assertEquals(expectedContent, Lists.newArrayList(lines.materialize()));
-		pipeline.done();
-	}
-
-	public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily typeFamily)
-			throws IOException {
-		String inputPath = FileHelper.createTempCopyOf("set1.txt");
-		PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new FalseFilterFn());
-
-		assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
-		pipeline.done();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/MaterializeToMapTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/MaterializeToMapTest.java b/src/test/java/org/apache/crunch/MaterializeToMapTest.java
deleted file mode 100644
index 5c92019..0000000
--- a/src/test/java/org/apache/crunch/MaterializeToMapTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static junit.framework.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.junit.Test;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PTypeFamily;
-import com.google.common.collect.ImmutableList;
-
-public class MaterializeToMapTest {
-  
-  static final ImmutableList<Pair<Integer,String>> kvPairs = 
-      ImmutableList.of(
-          Pair.of(0, "a"),
-          Pair.of(1, "b"),
-          Pair.of(2, "c"),
-          Pair.of(3, "e"));
-  
-  public void assertMatches(Map<Integer,String> m) {
-    for (Integer k : m.keySet()) {
-      System.out.println(k + " " + kvPairs.get(k).second() + " " + m.get(k));
-      assertTrue(kvPairs.get(k).second().equals(m.get(k)));
-    }
-  }
-  
-  @Test
-  public void testMemMaterializeToMap() {
-    assertMatches(MemPipeline.tableOf(kvPairs).materializeToMap());
-  }
-  
-  private static class Set1Mapper extends MapFn<String,Pair<Integer,String>> {
-    @Override
-    public Pair<Integer, String> map(String input) {
-      
-      int k = -1;
-      if (input.equals("a")) k = 0;
-      else if (input.equals("b")) k = 1;
-      else if (input.equals("c")) k = 2;
-      else if (input.equals("e")) k = 3;
-      return Pair.of(k, input);
-    }
-  }
-  
-  @Test
-  public void testMRMaterializeToMap() throws IOException {
-    Pipeline p = new MRPipeline(MaterializeToMapTest.class);
-    String inputFile = FileHelper.createTempCopyOf("set1.txt");
-    PCollection<String> c = p.readTextFile(inputFile);
-    PTypeFamily tf = c.getTypeFamily();
-    PTable<Integer,String> t = c.parallelDo(new Set1Mapper(), tf.tableOf(tf.ints(), tf.strings()));
-    Map<Integer, String> m = t.materializeToMap();
-    assertMatches(m);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/MultipleOutputTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/MultipleOutputTest.java b/src/test/java/org/apache/crunch/MultipleOutputTest.java
deleted file mode 100644
index ad78fda..0000000
--- a/src/test/java/org/apache/crunch/MultipleOutputTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-
-import org.junit.Test;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import com.google.common.io.Files;
-
-public class MultipleOutputTest {
-  
-  public static PCollection<String> evenCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-    return words.parallelDo("even", new FilterFn<String>(){
-
-        @Override
-        public boolean accept(String input) {
-            return input.length() % 2 == 0;
-        }}, typeFamily.strings());
-  }
-  
-  public static PCollection<String> oddCountLetters(PCollection<String> words, PTypeFamily typeFamily) {
-      return words.parallelDo("odd", new FilterFn<String>(){
-
-        @Override
-        public boolean accept(String input) {
-            return input.length() % 2 != 0;
-        }}, typeFamily.strings());
-       
-    }
-  
-  public static PTable<String, Long> substr(PTable<String, Long> ptable) {
-	return ptable.parallelDo(new DoFn<Pair<String, Long>, Pair<String, Long>>() {
-	  public void process(Pair<String, Long> input,
-		  Emitter<Pair<String, Long>> emitter) {
-		if (input.first().length() > 0) {
-		  emitter.emit(Pair.of(input.first().substring(0, 1), input.second()));
-		}
-	  }      
-    }, ptable.getPTableType());
-  }
-  
-  @Test
-  public void testWritables() throws IOException {
-    run(new MRPipeline(MultipleOutputTest.class), WritableTypeFamily.getInstance());
-  }
-
-  @Test
-  public void testAvro() throws IOException {
-    run(new MRPipeline(MultipleOutputTest.class), AvroTypeFamily.getInstance());
-  }
- 
-  
-  public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
-	String inputPath = FileHelper.createTempCopyOf("letters.txt");
-	File outputEven = FileHelper.createOutputPath();
-	File outputOdd = FileHelper.createOutputPath();
-	String outputPathEven = outputEven.getAbsolutePath();
-	String outputPathOdd = outputOdd.getAbsolutePath();
-	
-    PCollection<String> words = pipeline.read(
-         At.textFile(inputPath, typeFamily.strings()));
-    
-    PCollection<String> evenCountWords = evenCountLetters(words, typeFamily);
-    PCollection<String> oddCountWords = oddCountLetters(words, typeFamily);
-    pipeline.writeTextFile(evenCountWords, outputPathEven);
-    pipeline.writeTextFile(oddCountWords, outputPathOdd);
-    
-    pipeline.done();
-   
-    checkFileContents(outputPathEven, Arrays.asList("bb"));
-    checkFileContents(outputPathOdd, Arrays.asList("a"));
-   
-	outputEven.deleteOnExit();
-	outputOdd.deleteOnExit();
-  }  
-  
-  private void checkFileContents(String filePath, List<String> expected) throws IOException{
-    File outputFile = new File(filePath, "part-m-00000");
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    assertEquals(expected, lines);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java b/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java
deleted file mode 100644
index eee3cd5..0000000
--- a/src/test/java/org/apache/crunch/PCollectionGetSizeTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.apache.crunch.io.At.sequenceFile;
-import static org.apache.crunch.io.At.textFile;
-import static org.apache.crunch.types.writable.Writables.strings;
-import static com.google.common.collect.Lists.newArrayList;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.IOException;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.test.FileHelper;
-
-public class PCollectionGetSizeTest {
-
-    private String emptyInputPath;
-    private String nonEmptyInputPath;
-    private String outputPath;
-
-    /** Filter that rejects everything. */
-    @SuppressWarnings("serial")
-    private static class FalseFilterFn extends FilterFn<String> {
-
-        @Override
-        public boolean accept(final String input) {
-            return false;
-        }
-    }
-
-    @Before
-    public void setUp() throws IOException {
-        emptyInputPath = FileHelper.createTempCopyOf("emptyTextFile.txt");
-        nonEmptyInputPath = FileHelper.createTempCopyOf("set1.txt");
-        outputPath = FileHelper.createOutputPath().getAbsolutePath();
-    }
-
-    @Test
-    public void testGetSizeOfEmptyInput_MRPipeline() throws IOException {
-        testCollectionGetSizeOfEmptyInput(new MRPipeline(this.getClass()));
-    }
-
-    @Test
-    public void testGetSizeOfEmptyInput_MemPipeline() throws IOException {
-        testCollectionGetSizeOfEmptyInput(MemPipeline.getInstance());
-    }
-
-    private void testCollectionGetSizeOfEmptyInput(Pipeline pipeline) throws IOException {
-
-        assertThat(pipeline.read(textFile(emptyInputPath)).getSize(), is(0L));
-    }
-
-    @Test
-    public void testMaterializeEmptyInput_MRPipeline() throws IOException {
-        testMaterializeEmptyInput(new MRPipeline(this.getClass()));
-    }
-
-    @Test
-    public void testMaterializeEmptyImput_MemPipeline() throws IOException {
-        testMaterializeEmptyInput(MemPipeline.getInstance());
-    }
-
-    private void testMaterializeEmptyInput(Pipeline pipeline) throws IOException {
-        assertThat(newArrayList(pipeline.readTextFile(emptyInputPath).materialize().iterator()).size(), is(0));
-    }
-
-    @Test
-    public void testGetSizeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
-
-        assertThat(emptyIntermediate.getSize(), is(0L));
-    }
-
-    @Test
-    @Ignore("GetSize of a DoCollection is only an estimate based on scale factor, so we can't count on it being reported as 0")
-    public void testGetSizeOfEmptyIntermediatePCollection_NoSave_MRPipeline() throws IOException {
-
-        PCollection<String> data = new MRPipeline(this.getClass()).readTextFile(nonEmptyInputPath);
-
-        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
-
-        assertThat(emptyPCollection.getSize(), is(0L));
-    }
-
-    @Test
-    public void testGetSizeOfEmptyIntermediatePCollection_MemPipeline() {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
-        assertThat(emptyIntermediate.getSize(), is(0L));
-    }
-
-    @Test
-    public void testMaterializeOfEmptyIntermediatePCollection_MRPipeline() throws IOException {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(new MRPipeline(this.getClass()));
-
-        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-    }
-
-    @Test
-    public void testMaterializeOfEmptyIntermediatePCollection_MemPipeline() {
-
-        PCollection<String> emptyIntermediate = createPesistentEmptyIntermediate(MemPipeline.getInstance());
-
-        assertThat(newArrayList(emptyIntermediate.materialize()).size(), is(0));
-    }
-
-    private PCollection<String> createPesistentEmptyIntermediate(Pipeline pipeline) {
-
-        PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
-
-        PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
-
-        emptyPCollection.write(sequenceFile(outputPath, strings()));
-
-        pipeline.run();
-
-        return pipeline.read(sequenceFile(outputPath, strings()));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testExpectExceptionForGettingSizeOfNonExistingFile_MRPipeline() throws IOException {
-        new MRPipeline(this.getClass()).readTextFile("non_existing.file").getSize();
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testExpectExceptionForGettingSizeOfNonExistingFile_MemPipeline() {
-        MemPipeline.getInstance().readTextFile("non_existing.file").getSize();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/PTableKeyValueTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/PTableKeyValueTest.java b/src/test/java/org/apache/crunch/PTableKeyValueTest.java
deleted file mode 100644
index eadff17..0000000
--- a/src/test/java/org/apache/crunch/PTableKeyValueTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-
-import junit.framework.Assert;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.At;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-
-@RunWith(value = Parameterized.class)
-public class PTableKeyValueTest implements Serializable {
-
-	private static final long serialVersionUID = 4374227704751746689L;
-
-	private transient PTypeFamily typeFamily;
-	private transient MRPipeline pipeline;
-	private transient String inputFile;
-
-	@Before
-	public void setUp() throws IOException {
-		pipeline = new MRPipeline(PTableKeyValueTest.class);
-		inputFile = FileHelper.createTempCopyOf("set1.txt");
-	}
-
-	@After
-	public void tearDown() {
-		pipeline.done();
-	}
-
-	public PTableKeyValueTest(PTypeFamily typeFamily) {
-		this.typeFamily = typeFamily;
-	}
-
-	@Parameters
-	public static Collection<Object[]> data() {
-		Object[][] data = new Object[][] {
-				{ WritableTypeFamily.getInstance() },
-				{ AvroTypeFamily.getInstance() } };
-		return Arrays.asList(data);
-	}
-
-	@Test
-	public void testKeysAndValues() throws Exception {
-
-		PCollection<String> collection = pipeline.read(At.textFile(inputFile,
-				typeFamily.strings()));
-
-		PTable<String, String> table = collection.parallelDo(
-				new DoFn<String, Pair<String, String>>() {
-
-					@Override
-					public void process(String input,
-							Emitter<Pair<String, String>> emitter) {
-						emitter.emit(Pair.of(input.toUpperCase(), input));
-
-					}
-				}, typeFamily.tableOf(typeFamily.strings(),
-						typeFamily.strings()));
-
-		PCollection<String> keys = table.keys();
-		PCollection<String> values = table.values();
-
-		ArrayList<String> keyList = Lists.newArrayList(keys.materialize()
-				.iterator());
-		ArrayList<String> valueList = Lists.newArrayList(values.materialize()
-				.iterator());
-
-		Assert.assertEquals(keyList.size(), valueList.size());
-		for (int i = 0; i < keyList.size(); i++) {
-			Assert.assertEquals(keyList.get(i), valueList.get(i).toUpperCase());
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/PageRankTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/PageRankTest.java b/src/test/java/org/apache/crunch/PageRankTest.java
deleted file mode 100644
index 5edc5d6..0000000
--- a/src/test/java/org/apache/crunch/PageRankTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-
-import org.apache.crunch.impl.mem.MemPipeline;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.test.FileHelper;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroTypeFamily;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import org.apache.crunch.util.PTypes;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-public class PageRankTest {
-
-  public static class PageRankData {
-	public float score;
-	public float lastScore;
-	public List<String> urls;
-	
-	public PageRankData() { }
-	
-	public PageRankData(float score, float lastScore, Iterable<String> urls) {
-	  this.score = score;
-	  this.lastScore = lastScore;
-	  this.urls = Lists.newArrayList(urls);
-	}
-	
-	public PageRankData next(float newScore) {
-	  return new PageRankData(newScore, score, urls);
-	}
-	
-	public float propagatedScore() {
-	  return score / urls.size();
-	}
-	
-	@Override
-	public String toString() {
-	  return score + " " + lastScore + " " + urls;
-	}
-  }
-  
-  @Test public void testAvroReflect() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    run(new MRPipeline(PageRankTest.class), prType, tf);	
-  }
-  
-  @Test public void testAvroMReflectInMemory() throws Exception {
-    PTypeFamily tf = AvroTypeFamily.getInstance();
-    PType<PageRankData> prType = Avros.reflects(PageRankData.class);
-    run(MemPipeline.getInstance(), prType, tf);        
-  }
-  
-  @Test public void testAvroJSON() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-
-  @Test public void testAvroBSON() throws Exception {
-	PTypeFamily tf = AvroTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-  
-  @Test public void testWritablesJSON() throws Exception {
-	PTypeFamily tf = WritableTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.jsonString(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-
-  @Test public void testWritablesBSON() throws Exception {
-	PTypeFamily tf = WritableTypeFamily.getInstance();
-	PType<PageRankData> prType = PTypes.smile(PageRankData.class, tf);
-    run(new MRPipeline(PageRankTest.class), prType, tf);
-  }
-  
-  public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
-    PTypeFamily ptf = input.getTypeFamily();
-    PTable<String, Float> outbound = input.parallelDo(
-        new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
-          @Override
-          public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
-            PageRankData prd = input.second();
-            for (String link : prd.urls) {
-              emitter.emit(Pair.of(link, prd.propagatedScore()));
-            }
-          }
-        }, ptf.tableOf(ptf.strings(), ptf.floats()));
-    
-    return input.cogroup(outbound).parallelDo(
-        new MapFn<Pair<String, Pair<Collection<PageRankData>, Collection<Float>>>, Pair<String, PageRankData>>() {
-              @Override
-              public Pair<String, PageRankData> map(Pair<String, Pair<Collection<PageRankData>, Collection<Float>>> input) {
-                PageRankData prd = Iterables.getOnlyElement(input.second().first());
-                Collection<Float> propagatedScores = input.second().second();
-                float sum = 0.0f;
-                for (Float s : propagatedScores) {
-                  sum += s;
-                }
-                return Pair.of(input.first(), prd.next(d + (1.0f - d)*sum));
-              }
-            }, input.getPTableType());
-  }
-  
-  public static void run(Pipeline pipeline, PType<PageRankData> prType, PTypeFamily ptf) throws Exception {
-    String urlInput = FileHelper.createTempCopyOf("urls.txt");
-    PTable<String, PageRankData> scores = pipeline.readTextFile(urlInput)
-        .parallelDo(new MapFn<String, Pair<String, String>>() {
-          @Override
-          public Pair<String, String> map(String input) {
-            String[] urls = input.split("\\t");
-            return Pair.of(urls[0], urls[1]);
-          }
-        }, ptf.tableOf(ptf.strings(), ptf.strings()))
-        .groupByKey()
-        .parallelDo(new MapFn<Pair<String, Iterable<String>>, Pair<String, PageRankData>>() {
-              @Override
-              public Pair<String, PageRankData> map(Pair<String, Iterable<String>> input) {
-                return Pair.of(input.first(), new PageRankData(1.0f, 0.0f, input.second()));
-              }
-            }, ptf.tableOf(ptf.strings(), prType));
-    
-    Float delta = 1.0f;
-    while (delta > 0.01) {
-      scores = pageRank(scores, 0.5f);
-      scores.materialize().iterator(); // force the write
-      delta = Iterables.getFirst(Aggregate.max(
-          scores.parallelDo(new MapFn<Pair<String, PageRankData>, Float>() {
-            @Override
-            public Float map(Pair<String, PageRankData> input) {
-              PageRankData prd = input.second();
-              return Math.abs(prd.score - prd.lastScore);
-            }
-          }, ptf.floats())).materialize(), null);
-    }
-    assertEquals(0.0048, delta, 0.001);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/PairTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/PairTest.java b/src/test/java/org/apache/crunch/PairTest.java
deleted file mode 100644
index eff183b..0000000
--- a/src/test/java/org/apache/crunch/PairTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-
-public class PairTest {
-  
-  @Test
-  public void testPairConstructor() {
-    Pair<String, Integer> pair = new Pair<String, Integer>("brock", 45);
-    test(pair);
-  }
-
-  @Test
-  public void testPairOf() {
-    Pair<String, Integer> pair = Pair.of("brock", 45);
-    test(pair);
-  }
-
-  protected void test(Pair<String, Integer> pair) {
-    assertTrue(pair.size() == 2);
-    
-    assertEquals("brock", pair.first());
-    assertEquals(new Integer(45), pair.second());
-    assertEquals(Pair.of("brock", 45), pair);
-    
-    assertEquals("brock", pair.get(0));
-    assertEquals(new Integer(45), pair.get(1));
-
-    try {
-      pair.get(-1);
-      fail();
-    } catch (IndexOutOfBoundsException e) {
-      // expected
-    }
-  }
-  
-  @Test
-  public void testPairComparisons() {
-    assertEquals(0, Pair.of(null, null).compareTo(Pair.of(null, null)));
-    assertEquals(0, Pair.of(1, 2).compareTo(Pair.of(1, 2)));
-    assertTrue(Pair.of(2, "a").compareTo(Pair.of(1, "a")) > 0);
-    assertTrue(Pair.of("a", 2).compareTo(Pair.of("a", 1)) > 0);
-    assertTrue(Pair.of(null, 17).compareTo(Pair.of(null, 29)) < 0);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/test/java/org/apache/crunch/TFIDFTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/crunch/TFIDFTest.java b/src/test/java/org/apache/crunch/TFIDFTest.java
deleted file mode 100644
index d22bf06..0000000
--- a/src/test/java/org/apache/crunch/TFIDFTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static com.google.common.io.Resources.getResource;
-import static com.google.common.io.Resources.newInputStreamSupplier;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import org.apache.crunch.fn.MapKeysFn;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Join;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.writable.WritableTypeFamily;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-@SuppressWarnings("serial")
-public class TFIDFTest implements Serializable {  
-  // total number of documents, should calculate
-  protected static final double N = 2;
-  
-  @Test
-  public void testWritablesSingleRun() throws IOException {
-    run(new MRPipeline(TFIDFTest.class), WritableTypeFamily.getInstance(), true);
-  }
-
-  @Test
-  public void testWritablesMultiRun() throws IOException {
-    run(new MRPipeline(TFIDFTest.class), WritableTypeFamily.getInstance(), false);
-  }
-
-  /**
-   * This method should generate a TF-IDF score for the input.
-   */
-  public PTable<String, Collection<Pair<String, Double>>>  generateTFIDF(PCollection<String> docs,
-      Path termFreqPath, PTypeFamily ptf) throws IOException {    
-    
-    /*
-     * Input: String
-     * Input title  text
-     * 
-     * Output: PTable<Pair<String, String>, Long> 
-     * Pair<Pair<word, title>, count in title>
-     */
-    PTable<Pair<String, String>, Long> tf = Aggregate.count(docs.parallelDo("term document frequency",
-        new DoFn<String, Pair<String, String>>() {
-      @Override
-      public void process(String doc, Emitter<Pair<String, String>> emitter) {
-        String[] kv = doc.split("\t");
-        String title = kv[0];
-        String text = kv[1];
-        for (String word : text.split("\\W+")) {
-          if(word.length() > 0) {
-            Pair<String, String> pair = Pair.of(word.toLowerCase(), title);
-            emitter.emit(pair);
-          }
-        }
-      }
-    }, ptf.pairs(ptf.strings(), ptf.strings())));
-    
-    tf.write(new SeqFileSourceTarget<Pair<Pair<String, String>, Long>>(termFreqPath, tf.getPType()));
-    
-    /*
-     * Input: Pair<Pair<String, String>, Long>
-     * Pair<Pair<word, title>, count in title>
-     * 
-     * Output: PTable<String, Long>
-     * PTable<word, # of docs containing word>
-     */
-    PTable<String, Long> n = Aggregate.count(tf.parallelDo("little n (# of docs contain word)",  
-        new DoFn<Pair<Pair<String, String>, Long>, String>() {
-      @Override
-      public void process(Pair<Pair<String, String>, Long> input,
-          Emitter<String> emitter) {
-        emitter.emit(input.first().first());
-      }
-    }, ptf.strings()));
-    
-    /*
-     * Input: Pair<Pair<String, String>, Long>
-     * Pair<Pair<word, title>, count in title>
-     * 
-     * Output: PTable<String, Pair<String, Long>>
-     * PTable<word, Pair<title, count in title>>
-     */
-    PTable<String, Collection<Pair<String, Long>>> wordDocumentCountPair = tf.parallelDo("transform wordDocumentPairCount",
-        new DoFn<Pair<Pair<String, String>, Long>, Pair<String, Collection<Pair<String, Long>>>>() {
-          Collection<Pair<String, Long>> buffer;
-          String key;
-          @Override
-          public void process(Pair<Pair<String, String>, Long> input,
-        	  Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            Pair<String, String> wordDocumentPair = input.first();
-            if(!wordDocumentPair.first().equals(key)) {
-              flush(emitter);
-              key = wordDocumentPair.first();
-              buffer = Lists.newArrayList();
-            }
-            buffer.add(Pair.of(wordDocumentPair.second(), input.second()));            
-          }
-          protected void flush(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            if(buffer != null) {
-              emitter.emit(Pair.of(key, buffer));
-              buffer = null;
-            }
-          }
-          @Override
-          public void cleanup(Emitter<Pair<String, Collection<Pair<String, Long>>>> emitter) {
-            flush(emitter);
-          }
-      }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.longs()))));
-
-    PTable<String, Pair<Long, Collection<Pair<String, Long>>>> joinedResults = Join.join(n, wordDocumentCountPair);
-
-    /*
-     * Input: Pair<String, Pair<Long, Collection<Pair<String, Long>>>
-     * Pair<word, Pair<# of docs containing word, Collection<Pair<title, term frequency>>>
-     * 
-     * Output: Pair<String, Collection<Pair<String, Double>>>
-     * Pair<word, Collection<Pair<title, tfidf>>>
-     */
-    return joinedResults.parallelDo("calculate tfidf",
-        new MapFn<Pair<String, Pair<Long, Collection<Pair<String, Long>>>>, Pair<String, Collection<Pair<String, Double>>>>() {
-          @Override
-          public Pair<String, Collection<Pair<String, Double>>> map(Pair<String, Pair<Long, Collection<Pair<String, Long>>>> input) {
-            Collection<Pair<String, Double>> tfidfs = Lists.newArrayList();
-            String word = input.first();
-            double n = input.second().first();
-            double idf = Math.log(N / n);
-            for(Pair<String, Long> tf : input.second().second()) {
-              double tfidf = tf.second() * idf;
-              tfidfs.add(Pair.of(tf.first(), tfidf));
-            }
-            return Pair.of(word, tfidfs);
-          }
-      
-    }, ptf.tableOf(ptf.strings(), ptf.collections(ptf.pairs(ptf.strings(), ptf.doubles()))));
-  }
-  
-  public void run(Pipeline pipeline, PTypeFamily typeFamily, boolean singleRun) throws IOException {
-    File input = File.createTempFile("docs", "txt");
-    input.deleteOnExit();
-    Files.copy(newInputStreamSupplier(getResource("docs.txt")), input);
-    
-    String outputPath1 = getOutput();
-    String outputPath2 = getOutput();
-    
-    Path tfPath = new Path(getOutput("termfreq"));
-    
-    PCollection<String> docs = pipeline.readTextFile(input.getAbsolutePath());
-        
-    PTable<String, Collection<Pair<String, Double>>> results =
-        generateTFIDF(docs, tfPath, typeFamily);
-    pipeline.writeTextFile(results, outputPath1);
-    if (!singleRun) {
-      pipeline.run();
-    }
-    
-    PTable<String, Collection<Pair<String, Double>>> uppercased = results.parallelDo(
-        new MapKeysFn<String, String, Collection<Pair<String, Double>>>() {
-          @Override
-          public String map(String k1) {
-            return k1.toUpperCase();
-          } 
-        }, results.getPTableType());
-    pipeline.writeTextFile(uppercased, outputPath2);
-    pipeline.done();
-    
-    // Check the lowercase version...
-    File outputFile = new File(outputPath1, "part-r-00000");
-    outputFile.deleteOnExit();
-    List<String> lines = Files.readLines(outputFile, Charset.defaultCharset());
-    boolean passed = false;
-    for (String line : lines) {
-      if (line.startsWith("the") && line.contains("B,0.6931471805599453")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-    
-    // ...and the uppercase version
-    outputFile = new File(outputPath2, "part-r-00000");
-    outputFile.deleteOnExit();
-    lines = Files.readLines(outputFile, Charset.defaultCharset());
-    passed = false;
-    for (String line : lines) {
-      if (line.startsWith("THE") && line.contains("B,0.6931471805599453")) {
-        passed = true;
-        break;
-      }
-    }
-    assertTrue(passed);
-  }
-  
-  public static String getOutput() throws IOException {
-    return getOutput("output");
-  }
-  
-  public static String getOutput(String prefix) throws IOException {
-    File output = File.createTempFile(prefix, "");
-    String path = output.getAbsolutePath();
-    output.delete();
-    return path;
-  }
-}


Mime
View raw message