incubator-crunch-commits mailing list archives

From jwi...@apache.org
Subject [13/28] Rename com.cloudera.crunch -> org.apache.crunch in the Java core
Date Sat, 07 Jul 2012 21:49:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java b/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
new file mode 100644
index 0000000..34f86e2
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/text/BZip2TextInputFormat.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+public class BZip2TextInputFormat extends FileInputFormat<LongWritable, Text> {
+
+  /**
+   * Treats keys as offsets in the file and values as lines. Since the input
+   * file is compressed, the offset for a particular line is not well-defined.
+   * This implementation returns the starting position of a compressed block
+   * as the key for every line in that block.
+   */
+  private static class BZip2LineRecordReader extends RecordReader<LongWritable, Text> {
+
+    private long start;
+
+    private long end;
+
+    private long pos;
+
+    private CBZip2InputStream in;
+
+    private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+
+    // flag to indicate if previous character read was Carriage Return ('\r')
+    // and the next character was not Line Feed ('\n')
+    private boolean CRFollowedByNonLF = false;
+
+    // in the case where a Carriage Return ('\r') was not followed by a 
+    // Line Feed ('\n'), this variable will hold that non Line Feed character
+    // that was read from the underlying stream.
+    private byte nonLFChar;
+
+
+    /**
+     * Provide a bridge to get the bytes from the ByteArrayOutputStream without
+     * creating a new byte array.
+     */
+    private static class TextStuffer extends OutputStream {
+      public Text target;
+
+      @Override
+      public void write(int b) {
+        throw new UnsupportedOperationException("write(byte) not supported");
+      }
+
+      @Override
+      public void write(byte[] data, int offset, int len) throws IOException {
+        target.clear();
+        target.set(data, offset, len);
+      }
+    }
+
+    private TextStuffer bridge = new TextStuffer();
+
+    private LongWritable key = new LongWritable();
+    private Text value = new Text();
+
+    public BZip2LineRecordReader(Configuration job, FileSplit split) throws IOException {
+      start = split.getStart();
+      end = start + split.getLength();
+      final Path file = split.getPath();
+
+      // open the file and seek to the start of the split
+      FileSystem fs = file.getFileSystem(job);
+      FSDataInputStream fileIn = fs.open(split.getPath());
+      fileIn.seek(start);
+
+      in = new CBZip2InputStream(fileIn, 9, end);
+      if (start != 0) {
+        // skip first line and re-establish "start".
+        // LineRecordReader.readLine(this.in, null);
+        readLine(this.in, null);
+        start = in.getPos();
+      }
+      pos = in.getPos();
+    }
+
+    /*
+     * LineRecordReader.readLine() is deprecated in Hadoop 0.17, so it is
+     * added here locally.
+     */
+    private long readLine(InputStream in, 
+        OutputStream out) throws IOException {
+      long bytes = 0;
+      while (true) {
+        int b = -1;
+        if(CRFollowedByNonLF) {
+          // In the previous call, a Carriage Return ('\r') was followed
+          // by a non Line Feed ('\n') character - in that call we would
+          // have not returned the non Line Feed character but would have
+          // read it from the stream - lets use that already read character
+          // now
+          b = nonLFChar;
+          CRFollowedByNonLF = false;
+        } else {
+          b = in.read();
+        }
+        if (b == -1) {
+          break;
+        }
+        bytes += 1;
+
+        byte c = (byte)b;
+        if (c == '\n') {
+          break;
+        }
+
+        if (c == '\r') {
+          byte nextC = (byte)in.read();
+          if (nextC != '\n') {
+            CRFollowedByNonLF = true;
+            nonLFChar = nextC;
+          } else {
+            bytes += 1;
+          }
+          break;
+        }
+
+        if (out != null) {
+          out.write(c);
+        }
+      }
+      return bytes;
+    }
+
+    /** Read a line. */
+    public boolean next(LongWritable key, Text value)
+        throws IOException {
+      if (pos > end)
+        return false;
+
+      key.set(pos); // key is position
+      buffer.reset();
+      // long bytesRead = LineRecordReader.readLine(in, buffer); 
+      long bytesRead = readLine(in, buffer);
+      if (bytesRead == 0) {
+        return false;
+      }
+      pos = in.getPos();
+      // if we have read ahead because we encountered a carriage return
+      // char followed by a non line feed char, decrement the pos
+      if(CRFollowedByNonLF) {
+        pos--;
+      }
+
+      bridge.target = value;
+      buffer.writeTo(bridge);
+      return true;
+    }
+
+    /**
+     * Get the progress within the split
+     */
+    @Override
+    public float getProgress() {
+      if (start == end) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, (pos - start) / (float) (end - start));
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException,
+        InterruptedException {
+      return key;
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      // no op        
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return next(key, value);
+    }
+
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    return true;
+  }
+
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
+      TaskAttemptContext context) {
+    try {
+      return new BZip2LineRecordReader(context.getConfiguration(), 
+          (FileSplit) split);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}
\ No newline at end of file
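
For reference, a minimal sketch of how this input format could be wired into a
MapReduce job. The driver class, job name, and argument handling below are
illustrative assumptions, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.crunch.io.text.BZip2TextInputFormat;

    public class BZip2LinesDriver {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "bzip2-lines");
        job.setJarByClass(BZip2LinesDriver.class);
        // Use the splittable bzip2 line reader instead of TextInputFormat;
        // keys are compressed-block start offsets, values are lines.
        job.setInputFormatClass(BZip2TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // ... set mapper, reducer, and output format as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }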

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java b/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java
new file mode 100644
index 0000000..f5533a0
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/text/CBZip2InputStream.java
@@ -0,0 +1,1025 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * An input stream that decompresses from the BZip2 format (without the file
+ * header chars) to be read as any other stream.
+ *
+ * @author <a href="mailto:keiron@aftexsw.com">Keiron Liddle</a>
+ */
+public class CBZip2InputStream extends InputStream implements BZip2Constants {
+  private static void cadvise(String reason) throws IOException {
+    throw new IOException(reason);
+  }
+
+  private static void compressedStreamEOF() throws IOException {
+    cadvise("compressedStream EOF");
+  }
+
+  private void makeMaps() {
+    int i;
+    nInUse = 0;
+    for (i = 0; i < 256; i++) {
+      if (inUse[i]) {
+        seqToUnseq[nInUse] = (char) i;
+        unseqToSeq[i] = (char) nInUse;
+        nInUse++;
+      }
+    }
+  }
+
+  /*
+      index of the last char in the block, so
+      the block size == last + 1.
+   */
+  private int  last;
+
+  /*
+      index in zptr[] of original string after sorting.
+   */
+  private int  origPtr;
+
+  /*
+      always: in the range 0 .. 9.
+      The current block size is 100000 * this number.
+   */
+  private int blockSize100k;
+
+  private boolean blockRandomised;
+
+  // a buffer to keep the read byte
+  private int bsBuff;
+
+  // since bzip is bit-aligned at block boundaries, there can be a case wherein
+  // only a few bits out of a read byte are consumed and the remaining bits
+  // need to be consumed while processing the next block.
+  // bsLive indicates how many bits in bsBuff have not been processed yet
+  private int bsLive;
+  private CRC mCrc = new CRC();
+
+  private boolean[] inUse = new boolean[256];
+  private int nInUse;
+
+  private char[] seqToUnseq = new char[256];
+  private char[] unseqToSeq = new char[256];
+
+  private char[] selector = new char[MAX_SELECTORS];
+  private char[] selectorMtf = new char[MAX_SELECTORS];
+
+  private int[] tt;
+  private char[] ll8;
+
+  /*
+      freq table collected to save a pass over the data
+      during decompression.
+   */
+  private int[] unzftab = new int[256];
+
+  private int[][] limit = new int[N_GROUPS][MAX_ALPHA_SIZE];
+  private int[][] base = new int[N_GROUPS][MAX_ALPHA_SIZE];
+  private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
+  private int[] minLens = new int[N_GROUPS];
+
+  private FSDataInputStream innerBsStream;
+  long readLimit = Long.MAX_VALUE;
+  public long getReadLimit() {
+    return readLimit;
+  }
+  public void setReadLimit(long readLimit) {
+    this.readLimit = readLimit;
+  }
+  long readCount;
+  public long getReadCount() {
+    return readCount;
+  }
+
+  private boolean streamEnd = false;
+
+  private int currentChar = -1;
+
+  private static final int START_BLOCK_STATE = 1;
+  private static final int RAND_PART_A_STATE = 2;
+  private static final int RAND_PART_B_STATE = 3;
+  private static final int RAND_PART_C_STATE = 4;
+  private static final int NO_RAND_PART_A_STATE = 5;
+  private static final int NO_RAND_PART_B_STATE = 6;
+  private static final int NO_RAND_PART_C_STATE = 7;
+
+  private int currentState = START_BLOCK_STATE;
+
+  private int storedBlockCRC, storedCombinedCRC;
+  private int computedBlockCRC, computedCombinedCRC;
+  private boolean checkComputedCombinedCRC = true;
+
+  int i2, count, chPrev, ch2;
+  int i, tPos;
+  int rNToGo = 0;
+  int rTPos  = 0;
+  int j2;
+  char z;
+
+  // see comment in getPos()
+  private long retPos = -1;
+  // the position offset which corresponds to the end of the InputSplit that
+  // will be processed by this instance 
+  private long endOffsetOfSplit;
+
+  private boolean signalToStopReading;
+
+  public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
+      throws IOException {
+    endOffsetOfSplit = end;
+    // initialize retPos to the beginning of the current InputSplit
+    // see comments in getPos() to understand how this is used.
+    retPos = zStream.getPos();
+    ll8 = null;
+    tt = null;
+    checkComputedCombinedCRC = blockSize == -1;
+    bsSetStream(zStream);
+    initialize(blockSize);
+    initBlock(blockSize != -1);
+    setupBlock();
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (streamEnd) {
+      return -1;
+    } else {
+
+      // if we just started reading a bzip block which starts at a position
+      // >= end of the current split, then we should set up retPos such that
+      // after a record is read, future getPos() calls will get a value
+      // > end of the current split; this way we will read only one record
+      // out of this bzip block, and the rest of its records will be read by
+      // the next map task while processing the next split
+      if (signalToStopReading) {
+        retPos = endOffsetOfSplit + 1;
+      }
+
+      int retChar = currentChar;
+      switch(currentState) {
+      case START_BLOCK_STATE:
+        break;
+      case RAND_PART_A_STATE:
+        break;
+      case RAND_PART_B_STATE:
+        setupRandPartB();
+        break;
+      case RAND_PART_C_STATE:
+        setupRandPartC();
+        break;
+      case NO_RAND_PART_A_STATE:
+        break;
+      case NO_RAND_PART_B_STATE:
+        setupNoRandPartB();
+        break;
+      case NO_RAND_PART_C_STATE:
+        setupNoRandPartC();
+        break;
+      default:
+        break;
+      }
+      return retChar;
+    }
+  }
+
+  /**
+   * getPos is used by the caller to know when the processing of the current
+   * {@link InputSplit} is complete. As we read each bzip block, this method
+   * keeps returning the beginning of the {@link InputSplit} until we hit a
+   * block which starts at a position >= end of the current split. At that
+   * point retPos is set up such that, after a record is read, future
+   * getPos() calls will return a value > end of the current split; this way
+   * we read only one record out of that bzip block, and the rest of its
+   * records are read by the next map task while processing the next split.
+   *
+   * @return the logical stream position used for InputSplit accounting
+   * @throws IOException
+   */
+  public long getPos() throws IOException{
+    return retPos;
+  }
+
+  private void initialize(int blockSize) throws IOException {
+    if (blockSize == -1) {
+      char magic1, magic2;
+      char magic3, magic4;
+      magic1 = bsGetUChar();
+      magic2 = bsGetUChar();
+      magic3 = bsGetUChar();
+      magic4 = bsGetUChar();
+      if (magic1 != 'B' || magic2 != 'Z' || 
+          magic3 != 'h' || magic4 < '1' || magic4 > '9') {
+        bsFinishedWithStream();
+        streamEnd = true;
+        return;
+      }
+      blockSize = magic4 - '0';
+    }
+
+    setDecompressStructureSizes(blockSize);
+    computedCombinedCRC = 0;
+  }
+
+  private final static long mask = 0xffffffffffffL;
+  private final static long eob = 0x314159265359L & mask;
+  private final static long eos = 0x177245385090L & mask;
+
+  private void initBlock(boolean searchForMagic) throws IOException {
+    if (readCount >= readLimit) {
+      bsFinishedWithStream();
+      streamEnd = true;
+      return;
+    }
+
+    // position before beginning of bzip block header        
+    long pos = innerBsStream.getPos();
+    if (!searchForMagic) {
+      char magic1, magic2, magic3, magic4;
+      char magic5, magic6;
+      magic1 = bsGetUChar();
+      magic2 = bsGetUChar();
+      magic3 = bsGetUChar();
+      magic4 = bsGetUChar();
+      magic5 = bsGetUChar();
+      magic6 = bsGetUChar();
+      if (magic1 == 0x17 && magic2 == 0x72 && magic3 == 0x45
+          && magic4 == 0x38 && magic5 == 0x50 && magic6 == 0x90) {
+        complete();
+        return;
+      }
+
+      if (magic1 != 0x31 || magic2 != 0x41 || magic3 != 0x59
+          || magic4 != 0x26 || magic5 != 0x53 || magic6 != 0x59) {
+        badBlockHeader();
+        streamEnd = true;
+        return;
+      }
+    } else {
+      long magic = 0;
+      for(int i = 0; i < 6; i++) {
+        magic <<= 8;
+        magic |= bsGetUChar();
+      }
+      while(magic != eos && magic != eob) {
+        magic <<= 1;
+        magic &= mask;
+        magic |= bsR(1);
+        // if we just found the block header, the beginning of the bzip
+        // header is 6 bytes before the current stream position; when we
+        // eventually break from this while(), if it is because we found
+        // a block header, then pos will hold the correct start-of-header
+        // position
+        pos = innerBsStream.getPos() - 6;
+      }
+      if (magic == eos) {
+        complete();
+        return;
+      }
+
+    }
+    // if the previous block finished a few bits into the previous byte,
+    // then we will first be reading the remaining bits from the previous
+    // byte - so logically pos needs to be one behind
+    if(bsLive > 0)  {
+      pos--;
+    }
+
+    if(pos >= endOffsetOfSplit) {
+      // we have reached a block which begins exactly at the next InputSplit
+      // or >1 byte into the next InputSplit - lets record this fact
+      signalToStopReading = true;
+    }
+    storedBlockCRC = bsGetInt32();
+
+    if (bsR(1) == 1) {
+      blockRandomised = true;
+    } else {
+      blockRandomised = false;
+    }
+
+    //        currBlockNo++;
+    getAndMoveToFrontDecode();
+
+    mCrc.initialiseCRC();
+    currentState = START_BLOCK_STATE;
+  }
+
+  private void endBlock() throws IOException {
+    computedBlockCRC = mCrc.getFinalCRC();
+    /* A bad CRC is considered a fatal error. */
+    if (storedBlockCRC != computedBlockCRC) {
+      crcError();
+    }
+
+    computedCombinedCRC = (computedCombinedCRC << 1)
+        | (computedCombinedCRC >>> 31);
+    computedCombinedCRC ^= computedBlockCRC;
+  }
+
+  private void complete() throws IOException {
+    storedCombinedCRC = bsGetInt32();
+    if (checkComputedCombinedCRC && 
+        storedCombinedCRC != computedCombinedCRC) {
+      crcError();
+    }
+    if (innerBsStream.getPos() < endOffsetOfSplit) {
+      throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
+          + "Loading of concatenated bz2 files is not supported");
+    }
+    bsFinishedWithStream();
+    streamEnd = true;
+  }
+
+  private static void blockOverrun() throws IOException {
+    cadvise("block overrun");
+  }
+
+  private static void badBlockHeader() throws IOException {
+    cadvise("bad block header");
+  }
+
+  private static void crcError() throws IOException {
+    cadvise("CRC error");
+  }
+
+  private void bsFinishedWithStream() {
+    if (this.innerBsStream != null) {
+      if (this.innerBsStream != System.in) {
+        this.innerBsStream = null;
+      }
+    }
+  }
+
+  private void bsSetStream(FSDataInputStream f) {
+    innerBsStream = f;
+    bsLive = 0;
+    bsBuff = 0;
+  }
+
+  final private int readBs() throws IOException {
+    readCount++;
+    return innerBsStream.read();
+  }
+  private int bsR(int n) throws IOException {
+    int v;
+    while (bsLive < n) {
+      int zzi;
+      zzi = readBs();
+      if (zzi == -1) {
+        compressedStreamEOF();
+      }
+      bsBuff = (bsBuff << 8) | (zzi & 0xff);
+      bsLive += 8;
+    }
+
+    v = (bsBuff >> (bsLive - n)) & ((1 << n) - 1);
+    bsLive -= n;
+    return v;
+  }
+
+
+  private char bsGetUChar() throws IOException {
+    return (char) bsR(8);
+  }
+
+  private int bsGetint() throws IOException {
+    int u = 0;
+    u = (u << 8) | bsR(8);
+    u = (u << 8) | bsR(8);
+    u = (u << 8) | bsR(8);
+    u = (u << 8) | bsR(8);
+    return u;
+  }
+
+  private int bsGetIntVS(int numBits) throws IOException {
+    return bsR(numBits);
+  }
+
+  private int bsGetInt32() throws IOException {
+    return bsGetint();
+  }
+
+  private void hbCreateDecodeTables(int[] limit, int[] base,
+      int[] perm, char[] length,
+      int minLen, int maxLen, int alphaSize) {
+    int pp, i, j, vec;
+
+    pp = 0;
+    for (i = minLen; i <= maxLen; i++) {
+      for (j = 0; j < alphaSize; j++) {
+        if (length[j] == i) {
+          perm[pp] = j;
+          pp++;
+        }
+      }
+    }
+
+    for (i = 0; i < MAX_CODE_LEN; i++) {
+      base[i] = 0;
+    }
+    for (i = 0; i < alphaSize; i++) {
+      base[length[i] + 1]++;
+    }
+
+    for (i = 1; i < MAX_CODE_LEN; i++) {
+      base[i] += base[i - 1];
+    }
+
+    for (i = 0; i < MAX_CODE_LEN; i++) {
+      limit[i] = 0;
+    }
+    vec = 0;
+
+    for (i = minLen; i <= maxLen; i++) {
+      vec += (base[i + 1] - base[i]);
+      limit[i] = vec - 1;
+      vec <<= 1;
+    }
+    for (i = minLen + 1; i <= maxLen; i++) {
+      base[i] = ((limit[i - 1] + 1) << 1) - base[i];
+    }
+  }
+
+  private void recvDecodingTables() throws IOException {
+    char len[][] = new char[N_GROUPS][MAX_ALPHA_SIZE];
+    int i, j, t, nGroups, nSelectors, alphaSize;
+    int minLen, maxLen;
+    boolean[] inUse16 = new boolean[16];
+
+    /* Receive the mapping table */
+    for (i = 0; i < 16; i++) {
+      if (bsR(1) == 1) {
+        inUse16[i] = true;
+      } else {
+        inUse16[i] = false;
+      }
+    }
+
+    for (i = 0; i < 256; i++) {
+      inUse[i] = false;
+    }
+
+    for (i = 0; i < 16; i++) {
+      if (inUse16[i]) {
+        for (j = 0; j < 16; j++) {
+          if (bsR(1) == 1) {
+            inUse[i * 16 + j] = true;
+          }
+        }
+      }
+    }
+
+    makeMaps();
+    alphaSize = nInUse + 2;
+
+    /* Now the selectors */
+    nGroups = bsR(3);
+    nSelectors = bsR(15);
+    for (i = 0; i < nSelectors; i++) {
+      j = 0;
+      while (bsR(1) == 1) {
+        j++;
+      }
+      selectorMtf[i] = (char) j;
+    }
+
+    /* Undo the MTF values for the selectors. */
+    {
+      char[] pos = new char[N_GROUPS];
+      char tmp, v;
+      for (v = 0; v < nGroups; v++) {
+        pos[v] = v;
+      }
+
+      for (i = 0; i < nSelectors; i++) {
+        v = selectorMtf[i];
+        tmp = pos[v];
+        while (v > 0) {
+          pos[v] = pos[v - 1];
+          v--;
+        }
+        pos[0] = tmp;
+        selector[i] = tmp;
+      }
+    }
+
+    /* Now the coding tables */
+    for (t = 0; t < nGroups; t++) {
+      int curr = bsR(5);
+      for (i = 0; i < alphaSize; i++) {
+        while (bsR(1) == 1) {
+          if (bsR(1) == 0) {
+            curr++;
+          } else {
+            curr--;
+          }
+        }
+        len[t][i] = (char) curr;
+      }
+    }
+
+    /* Create the Huffman decoding tables */
+    for (t = 0; t < nGroups; t++) {
+      minLen = 32;
+      maxLen = 0;
+      for (i = 0; i < alphaSize; i++) {
+        if (len[t][i] > maxLen) {
+          maxLen = len[t][i];
+        }
+        if (len[t][i] < minLen) {
+          minLen = len[t][i];
+        }
+      }
+      hbCreateDecodeTables(limit[t], base[t], perm[t], len[t], minLen,
+          maxLen, alphaSize);
+      minLens[t] = minLen;
+    }
+  }
+
+  private void getAndMoveToFrontDecode() throws IOException {
+    char[] yy = new char[256];
+    int i, j, nextSym, limitLast;
+    int EOB, groupNo, groupPos;
+
+    limitLast = baseBlockSize * blockSize100k;
+    origPtr = bsGetIntVS(24);
+
+    recvDecodingTables();
+    EOB = nInUse + 1;
+    groupNo = -1;
+    groupPos = 0;
+
+    /*
+          Setting up the unzftab entries here is not strictly
+          necessary, but it does save having to do it later
+          in a separate pass, and so saves a block's worth of
+          cache misses.
+     */
+    for (i = 0; i <= 255; i++) {
+      unzftab[i] = 0;
+    }
+
+    for (i = 0; i <= 255; i++) {
+      yy[i] = (char) i;
+    }
+
+    last = -1;
+
+    {
+      int zt, zn, zvec, zj;
+      if (groupPos == 0) {
+        groupNo++;
+        groupPos = G_SIZE;
+      }
+      groupPos--;
+      zt = selector[groupNo];
+      zn = minLens[zt];
+      zvec = bsR(zn);
+      while (zvec > limit[zt][zn]) {
+        zn++;
+        {
+          {
+            while (bsLive < 1) {
+              int zzi = 0;
+              try {
+                zzi = readBs();
+              } catch (IOException e) {
+                compressedStreamEOF();
+              }
+              if (zzi == -1) {
+                compressedStreamEOF();
+              }
+              bsBuff = (bsBuff << 8) | (zzi & 0xff);
+              bsLive += 8;
+            }
+          }
+          zj = (bsBuff >> (bsLive - 1)) & 1;
+          bsLive--;
+        }
+        zvec = (zvec << 1) | zj;
+      }
+      nextSym = perm[zt][zvec - base[zt][zn]];
+    }
+
+    while (true) {
+
+      if (nextSym == EOB) {
+        break;
+      }
+
+      if (nextSym == RUNA || nextSym == RUNB) {
+        char ch;
+        int s = -1;
+        int N = 1;
+        do {
+          if (nextSym == RUNA) {
+            s = s + (0 + 1) * N;
+          } else if (nextSym == RUNB) {
+            s = s + (1 + 1) * N;
+          }
+          N = N * 2;
+          {
+            int zt, zn, zvec, zj;
+            if (groupPos == 0) {
+              groupNo++;
+              groupPos = G_SIZE;
+            }
+            groupPos--;
+            zt = selector[groupNo];
+            zn = minLens[zt];
+            zvec = bsR(zn);
+            while (zvec > limit[zt][zn]) {
+              zn++;
+              {
+                {
+                  while (bsLive < 1) {
+                    int zzi = 0;
+                    try {
+                      zzi = readBs();
+                    } catch (IOException e) {
+                      compressedStreamEOF();
+                    }
+                    if (zzi == -1) {
+                      compressedStreamEOF();
+                    }
+                    bsBuff = (bsBuff << 8) | (zzi & 0xff);
+                    bsLive += 8;
+                  }
+                }
+                zj = (bsBuff >> (bsLive - 1)) & 1;
+                bsLive--;
+              }
+              zvec = (zvec << 1) | zj;
+            }
+            nextSym = perm[zt][zvec - base[zt][zn]];
+          }
+        } while (nextSym == RUNA || nextSym == RUNB);
+
+        s++;
+        ch = seqToUnseq[yy[0]];
+        unzftab[ch] += s;
+
+        while (s > 0) {
+          last++;
+          ll8[last] = ch;
+          s--;
+        }
+
+        if (last >= limitLast) {
+          blockOverrun();
+        }
+        continue;
+      } else {
+        char tmp;
+        last++;
+        if (last >= limitLast) {
+          blockOverrun();
+        }
+
+        tmp = yy[nextSym - 1];
+        unzftab[seqToUnseq[tmp]]++;
+        ll8[last] = seqToUnseq[tmp];
+
+        /*
+                  This loop is hammered during decompression,
+                  hence the unrolling.
+
+                  for (j = nextSym-1; j > 0; j--) yy[j] = yy[j-1];
+         */
+
+         j = nextSym - 1;
+         for (; j > 3; j -= 4) {
+           yy[j]     = yy[j - 1];
+           yy[j - 1] = yy[j - 2];
+           yy[j - 2] = yy[j - 3];
+           yy[j - 3] = yy[j - 4];
+         }
+         for (; j > 0; j--) {
+           yy[j] = yy[j - 1];
+         }
+
+         yy[0] = tmp;
+         {
+           int zt, zn, zvec, zj;
+           if (groupPos == 0) {
+             groupNo++;
+             groupPos = G_SIZE;
+           }
+           groupPos--;
+           zt = selector[groupNo];
+           zn = minLens[zt];
+           zvec = bsR(zn);
+           while (zvec > limit[zt][zn]) {
+             zn++;
+             {
+               {
+                 while (bsLive < 1) {
+                   int zzi;
+                   char thech = 0;
+                   try {
+                     thech = (char) readBs();
+                   } catch (IOException e) {
+                     compressedStreamEOF();
+                   }
+                   zzi = thech;
+                   bsBuff = (bsBuff << 8) | (zzi & 0xff);
+                   bsLive += 8;
+                 }
+               }
+               zj = (bsBuff >> (bsLive - 1)) & 1;
+               bsLive--;
+             }
+             zvec = (zvec << 1) | zj;
+           }
+           nextSym = perm[zt][zvec - base[zt][zn]];
+         }
+         continue;
+      }
+    }
+  }
+
+  private void setupBlock() throws IOException {
+    int[] cftab = new int[257];
+    char ch;
+
+    cftab[0] = 0;
+    for (i = 1; i <= 256; i++) {
+      cftab[i] = unzftab[i - 1];
+    }
+    for (i = 1; i <= 256; i++) {
+      cftab[i] += cftab[i - 1];
+    }
+
+    for (i = 0; i <= last; i++) {
+      ch = ll8[i];
+      tt[cftab[ch]] = i;
+      cftab[ch]++;
+    }
+    cftab = null;
+
+    tPos = tt[origPtr];
+
+    count = 0;
+    i2 = 0;
+    ch2 = 256;   /* not a char and not EOF */
+
+    if (blockRandomised) {
+      rNToGo = 0;
+      rTPos = 0;
+      setupRandPartA();
+    } else {
+      setupNoRandPartA();
+    }
+  }
+
+  private void setupRandPartA() throws IOException {
+    if (i2 <= last) {
+      chPrev = ch2;
+      ch2 = ll8[tPos];
+      tPos = tt[tPos];
+      if (rNToGo == 0) {
+        rNToGo = rNums[rTPos];
+        rTPos++;
+        if (rTPos == 512) {
+          rTPos = 0;
+        }
+      }
+      rNToGo--;
+      ch2 ^= ((rNToGo == 1) ? 1 : 0);
+      i2++;
+
+      currentChar = ch2;
+      currentState = RAND_PART_B_STATE;
+      mCrc.updateCRC(ch2);
+    } else {
+      endBlock();
+      initBlock(false);
+      setupBlock();
+    }
+  }
+
+  private void setupNoRandPartA() throws IOException {
+    if (i2 <= last) {
+      chPrev = ch2;
+      ch2 = ll8[tPos];
+      tPos = tt[tPos];
+      i2++;
+
+      currentChar = ch2;
+      currentState = NO_RAND_PART_B_STATE;
+      mCrc.updateCRC(ch2);
+    } else {
+      endBlock();
+      initBlock(false);
+      setupBlock();
+    }
+  }
+
+  private void setupRandPartB() throws IOException {
+    if (ch2 != chPrev) {
+      currentState = RAND_PART_A_STATE;
+      count = 1;
+      setupRandPartA();
+    } else {
+      count++;
+      if (count >= 4) {
+        z = ll8[tPos];
+        tPos = tt[tPos];
+        if (rNToGo == 0) {
+          rNToGo = rNums[rTPos];
+          rTPos++;
+          if (rTPos == 512) {
+            rTPos = 0;
+          }
+        }
+        rNToGo--;
+        z ^= ((rNToGo == 1) ? 1 : 0);
+        j2 = 0;
+        currentState = RAND_PART_C_STATE;
+        setupRandPartC();
+      } else {
+        currentState = RAND_PART_A_STATE;
+        setupRandPartA();
+      }
+    }
+  }
+
+  private void setupRandPartC() throws IOException {
+    if (j2 < (int) z) {
+      currentChar = ch2;
+      mCrc.updateCRC(ch2);
+      j2++;
+    } else {
+      currentState = RAND_PART_A_STATE;
+      i2++;
+      count = 0;
+      setupRandPartA();
+    }
+  }
+
+  private void setupNoRandPartB() throws IOException {
+    if (ch2 != chPrev) {
+      currentState = NO_RAND_PART_A_STATE;
+      count = 1;
+      setupNoRandPartA();
+    } else {
+      count++;
+      if (count >= 4) {
+        z = ll8[tPos];
+        tPos = tt[tPos];
+        currentState = NO_RAND_PART_C_STATE;
+        j2 = 0;
+        setupNoRandPartC();
+      } else {
+        currentState = NO_RAND_PART_A_STATE;
+        setupNoRandPartA();
+      }
+    }
+  }
+
+  private void setupNoRandPartC() throws IOException {
+    if (j2 < (int) z) {
+      currentChar = ch2;
+      mCrc.updateCRC(ch2);
+      j2++;
+    } else {
+      currentState = NO_RAND_PART_A_STATE;
+      i2++;
+      count = 0;
+      setupNoRandPartA();
+    }
+  }
+
+  private void setDecompressStructureSizes(int newSize100k) {
+    if (!(0 <= newSize100k && newSize100k <= 9 && 0 <= blockSize100k
+        && blockSize100k <= 9)) {
+      // throw new IOException("Invalid block size");
+    }
+
+    blockSize100k = newSize100k;
+
+    if (newSize100k == 0) {
+      return;
+    }
+
+    int n = baseBlockSize * newSize100k;
+    ll8 = new char[n];
+    tt = new int[n];
+  }
+
+  private static class CRC {
+    public static int crc32Table[] = {
+      0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9,
+      0x130476dc, 0x17c56b6b, 0x1a864db2, 0x1e475005,
+      0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
+      0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd,
+      0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9,
+      0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
+      0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011,
+      0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd,
+      0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039,
+      0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5,
+      0xbe2b5b58, 0xbaea46ef, 0xb7a96036, 0xb3687d81,
+      0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d,
+      0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49,
+      0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95,
+      0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
+      0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d,
+      0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae,
+      0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072,
+      0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16,
+      0x018aeb13, 0x054bf6a4, 0x0808d07d, 0x0cc9cdca,
+      0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde,
+      0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02,
+      0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066,
+      0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
+      0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e,
+      0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692,
+      0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6,
+      0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a,
+      0xe0b41de7, 0xe4750050, 0xe9362689, 0xedf73b3e,
+      0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2,
+      0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686,
+      0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a,
+      0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
+      0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb,
+      0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f,
+      0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53,
+      0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47,
+      0x36194d42, 0x32d850f5, 0x3f9b762c, 0x3b5a6b9b,
+      0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff,
+      0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623,
+      0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7,
+      0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
+      0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f,
+      0xc423cd6a, 0xc0e2d0dd, 0xcda1f604, 0xc960ebb3,
+      0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7,
+      0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b,
+      0x9b3660c6, 0x9ff77d71, 0x92b45ba8, 0x9675461f,
+      0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3,
+      0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640,
+      0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c,
+      0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
+      0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24,
+      0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30,
+      0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec,
+      0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088,
+      0x2497d08d, 0x2056cd3a, 0x2d15ebe3, 0x29d4f654,
+      0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0,
+      0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c,
+      0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18,
+      0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
+      0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0,
+      0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c,
+      0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
+      0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
+    };
+
+    public CRC() {
+      initialiseCRC();
+    }
+
+    void initialiseCRC() {
+      globalCrc = 0xffffffff;
+    }
+
+    int getFinalCRC() {
+      return ~globalCrc;
+    }
+
+    void updateCRC(int inCh) {
+      int temp = (globalCrc >> 24) ^ inCh;
+      if (temp < 0) {
+        temp = 256 + temp;
+      }
+      globalCrc = (globalCrc << 8) ^ CRC.crc32Table[temp];
+    }
+
+    int globalCrc;
+  }
+}
\ No newline at end of file
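
The getPos() contract described in the javadoc above is what bounds each map
task to its own split. A sketch of the resulting read loop, mirroring
BZip2LineRecordReader; the method name and the splitStart/splitEnd offsets are
assumptions for illustration (9 is the block size the record reader passes):

    static void readSplit(FileSystem fs, Path path, long splitStart,
        long splitEnd) throws IOException {
      FSDataInputStream fileIn = fs.open(path);
      fileIn.seek(splitStart);
      CBZip2InputStream in = new CBZip2InputStream(fileIn, 9, splitEnd);
      long pos = in.getPos();
      while (pos <= splitEnd) {
        int b = in.read();          // one decompressed byte
        if (b == -1) {
          break;                    // end of compressed stream
        }
        // ... accumulate bytes into the current record ...
        pos = in.getPos();          // reports splitStart until a block at or
                                    // past splitEnd begins, then jumps past
                                    // splitEnd so this loop exits
      }
      in.close();
    }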

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
new file mode 100644
index 0000000..9d2d409
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.CompositeMapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private static final Log LOG = LogFactory.getLog(TextFileReaderFactory.class);
+  
+  private final PType<T> ptype;
+  private final Configuration conf;
+  
+  public TextFileReaderFactory(PType<T> ptype, Configuration conf) {
+    this.ptype = ptype;
+    this.conf = conf;
+  }
+  
+  @Override
+  public Iterator<T> read(FileSystem fs, Path path) {
+    MapFn mapFn = null;
+    if (String.class.equals(ptype.getTypeClass())) {
+      mapFn = IdentityFn.getInstance();
+    } else {
+      // Check for a composite MapFn for the PType.
+      // Note that this won't work for Avro; need to solve that.
+      MapFn input = ptype.getInputMapFn();
+      if (input instanceof CompositeMapFn) {
+        mapFn = ((CompositeMapFn) input).getSecond();
+      }
+    }
+    mapFn.setConfigurationForTest(conf);
+    mapFn.initialize();
+
+    FSDataInputStream is = null;
+    try {
+      is = fs.open(path);
+    } catch (IOException e) {
+      LOG.info("Could not read path: " + path, e);
+      return Iterators.emptyIterator();
+    }
+
+    final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+    final MapFn<String, T> iterMapFn = mapFn;
+    return new UnmodifiableIterator<T>() {
+      private String nextLine;
+      @Override
+      public boolean hasNext() {
+        try {
+          return (nextLine = reader.readLine()) != null;
+        } catch (IOException e) {
+          LOG.info("Exception reading text file stream", e);
+          return false;
+        }
+      }
+
+      @Override
+      public T next() {
+        return iterMapFn.map(nextLine);
+      }
+    };
+  }
+}
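
A hypothetical usage of the factory above, dumping a text file as Strings; the
class wrapper, Writables.strings() PType, and path argument are assumptions
for illustration:

    import java.util.Iterator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.crunch.io.text.TextFileReaderFactory;
    import org.apache.crunch.types.writable.Writables;

    public class CatTextFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TextFileReaderFactory<String> factory =
            new TextFileReaderFactory<String>(Writables.strings(), conf);
        // read() returns an iterator of mapped lines, or an empty iterator
        // if the path cannot be opened.
        Iterator<String> lines =
            factory.read(FileSystem.get(conf), new Path(args[0]));
        while (lines.hasNext()) {
          System.out.println(lines.next());
        }
      }
    }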

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/text/TextFileSource.java b/src/main/java/org/apache/crunch/io/text/TextFileSource.java
new file mode 100644
index 0000000..f8a7b3a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/text/TextFileSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.AvroUtf8InputFormat;
+
+public class TextFileSource<T> extends FileSourceImpl<T> implements
+    ReadableSource<T> {
+
+  private static boolean isBZip2(Path path) {
+    String strPath = path.toString();
+    return strPath.endsWith(".bz") || strPath.endsWith(".bz2");
+  }
+
+  private static <S> Class<? extends FileInputFormat<?, ?>> getInputFormat(Path path, PType<S> ptype) {
+    if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
+      return AvroUtf8InputFormat.class;
+    } else if (isBZip2(path)) {
+      return BZip2TextInputFormat.class;
+    } else {
+      return TextInputFormat.class;
+    }
+  }
+
+  public TextFileSource(Path path, PType<T> ptype) {
+    super(path, ptype, getInputFormat(path, ptype));
+  }
+
+  @Override
+  public long getSize(Configuration conf) {
+    long sz = super.getSize(conf);
+    if (isBZip2(path)) {
+      sz *= 10; // Arbitrary compression factor
+    }
+    return sz;
+  }
+  
+  @Override
+  public String toString() {
+    return "Text(" + path + ")";
+  }
+  
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
+        new TextFileReaderFactory<T>(ptype, conf));
+  }
+}
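
The input format is chosen from the PType family and the path suffix. A
hypothetical construction exercising the three branches of getInputFormat()
above (the paths and the Writables/Avros PTypes are assumptions):

    // ".bz"/".bz2" suffix with a non-Avro type -> BZip2TextInputFormat
    new TextFileSource<String>(new Path("/data/logs.bz2"), Writables.strings());
    // Avro type family -> AvroUtf8InputFormat
    new TextFileSource<String>(new Path("/data/utf8"), Avros.strings());
    // everything else -> TextInputFormat
    new TextFileSource<String>(new Path("/data/plain.txt"), Writables.strings());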

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java b/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
new file mode 100644
index 0000000..5163c6a
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/text/TextFileSourceTarget.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.PType;
+
+public class TextFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+  public TextFileSourceTarget(String path, PType<T> ptype) {
+    this(new Path(path), ptype);
+  }
+  
+  public TextFileSourceTarget(Path path, PType<T> ptype) {
+    super(new TextFileSource<T>(path, ptype), new TextFileTarget(path));
+  }
+  
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
new file mode 100644
index 0000000..ac3a52b
--- /dev/null
+++ b/src/main/java/org/apache/crunch/io/text/TextFileTarget.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+public class TextFileTarget extends FileTargetImpl {
+  public TextFileTarget(String path) {
+    this(new Path(path));
+  }
+  
+  public TextFileTarget(Path path) {
+    super(path, TextOutputFormat.class);
+  }
+  
+  @Override
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public String toString() {
+    return "Text(" + path + ")";
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof PTableType) {
+      return null;
+    }
+    return new TextFileSourceTarget<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Aggregate.java b/src/main/java/org/apache/crunch/lib/Aggregate.java
new file mode 100644
index 0000000..dcc655b
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.Lists;
+
+/**
+ * Methods for performing various types of aggregations over {@link PCollection}
+ * instances.
+ *
+ */
+public class Aggregate {
+
+  /**
+   * Returns a {@code PTable} that contains the unique elements of this
+   * collection mapped to a count of their occurrences.
+   */
+  public static <S> PTable<S, Long> count(PCollection<S> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>() {      
+      public Pair<S, Long> map(S input) {
+        return Pair.of(input, 1L);
+      }
+    }, tf.tableOf(collect.getPType(), tf.longs()))
+    .groupByKey()
+    .combineValues(CombineFn.<S> SUM_LONGS());
+  }
+  
+  public static class PairValueComparator<K, V> implements Comparator<Pair<K, V>> {
+    private final boolean ascending;
+    
+    public PairValueComparator(boolean ascending) {
+      this.ascending = ascending;
+    }
+    
+    @Override
+    public int compare(Pair<K, V> left, Pair<K, V> right) {
+      int cmp = ((Comparable<V>) left.second()).compareTo(right.second());
+      return ascending ? cmp : -cmp;
+    }
+  }
+  
+  public static class TopKFn<K, V> extends DoFn<Pair<K, V>, Pair<Integer, Pair<K, V>>> {
+    
+    private final int limit;
+    private final boolean maximize;
+    private transient PriorityQueue<Pair<K, V>> values;
+    
+    public TopKFn(int limit, boolean maximize) {
+      this.limit = limit;
+      this.maximize = maximize;
+    }
+    
+    public void initialize() {
+      this.values = new PriorityQueue<Pair<K, V>>(limit, new PairValueComparator<K, V>(maximize));
+    }
+    
+    public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      values.add(input);
+      if (values.size() > limit) {
+        values.poll();
+      }
+    }
+    
+    public void cleanup(Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      for (Pair<K, V> p : values) {
+        emitter.emit(Pair.of(0, p));
+      }
+    }
+  }
+  
+  public static class TopKCombineFn<K, V> extends CombineFn<Integer, Pair<K, V>> {
+    
+    private final int limit;
+    private final boolean maximize;
+    
+    public TopKCombineFn(int limit, boolean maximize) {
+      this.limit = limit;
+      this.maximize = maximize;
+    }
+    
+    @Override
+    public void process(Pair<Integer, Iterable<Pair<K, V>>> input,
+        Emitter<Pair<Integer, Pair<K, V>>> emitter) {
+      Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize);
+      PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(limit, cmp);
+      for (Pair<K, V> pair : input.second()) {
+        queue.add(pair);
+        if (queue.size() > limit) {
+          queue.poll();
+        }
+      }
+      
+      List<Pair<K, V>> values = Lists.newArrayList(queue);
+      Collections.sort(values, cmp);
+      for (int i = values.size() - 1; i >= 0; i--) {
+        emitter.emit(Pair.of(0, values.get(i)));
+      }
+    }
+  }
+  
+  public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) {
+    PTypeFamily ptf = ptable.getTypeFamily();
+    PTableType<K, V> base = ptable.getPTableType();
+    PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
+    PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
+    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter)
+        .groupByKey(1)
+        .combineValues(new TopKCombineFn<K, V>(limit, maximize))
+        .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
+          public void process(Pair<Integer, Pair<K, V>> input,
+              Emitter<Pair<K, V>> emitter) {
+            emitter.emit(input.second()); 
+          }
+        }, base);
+  }
+  
+  /**
+   * Returns the largest element (by natural ordering) from the input collection.
+   */
+  public static <S> PCollection<S> max(PCollection<S> collect) {
+    Class<S> clazz = collect.getPType().getTypeClass();
+    if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
+      throw new IllegalArgumentException(
+          "Can only get max for Comparable elements, not for: " + collect.getPType().getTypeClass());
+    }
+    PTypeFamily tf = collect.getTypeFamily();
+    return PTables.values(
+        collect.parallelDo("max", new DoFn<S, Pair<Boolean, S>>() {
+          private transient S max = null;
+          
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (max == null || ((Comparable<S>) max).compareTo(input) < 0) {
+              max = input;
+            }
+          }
+          
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (max != null) {
+              emitter.emit(Pair.of(true, max));
+            }
+          }
+        }, tf.tableOf(tf.booleans(), collect.getPType()))
+        .groupByKey(1).combineValues(new CombineFn<Boolean, S>() {
+          public void process(Pair<Boolean, Iterable<S>> input,
+              Emitter<Pair<Boolean, S>> emitter) {
+            S max = null;
+            for (S v : input.second()) {
+              if (max == null || ((Comparable<S>) max).compareTo(v) < 0) {
+                max = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), max));
+          } }));
+  }
+  
+  /**
+   * Returns the smallest element (by natural ordering) from the input collection.
+   */
+  public static <S> PCollection<S> min(PCollection<S> collect) {
+    Class<S> clazz = collect.getPType().getTypeClass();
+    if (!clazz.isPrimitive() && !Comparable.class.isAssignableFrom(clazz)) {
+      throw new IllegalArgumentException(
+          "Can only get min for Comparable elements, not for: " + collect.getPType().getTypeClass());
+    }
+    PTypeFamily tf = collect.getTypeFamily();
+    return PTables.values(
+        collect.parallelDo("min", new DoFn<S, Pair<Boolean, S>>() {
+          private transient S min = null;
+
+          public void process(S input, Emitter<Pair<Boolean, S>> emitter) {
+            if (min == null || ((Comparable<S>) min).compareTo(input) > 0) {
+              min = input;
+            }
+          }
+
+          public void cleanup(Emitter<Pair<Boolean, S>> emitter) {
+            if (min != null) {
+              emitter.emit(Pair.of(false, min));
+            }
+          }
+        }, tf.tableOf(tf.booleans(), collect.getPType()))
+        .groupByKey().combineValues(new CombineFn<Boolean, S>() {
+          public void process(Pair<Boolean, Iterable<S>> input,
+              Emitter<Pair<Boolean, S>> emitter) {
+            S min = null;
+            for (S v : input.second()) {
+              if (min == null || ((Comparable<S>) min).compareTo(v) > 0) {
+                min = v;
+              }
+            }
+            emitter.emit(Pair.of(input.first(), min));
+          } }));
+  }
+  
+  public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
+    PTypeFamily tf = collect.getTypeFamily();
+    final PType<V> valueType = collect.getValueType();
+    return collect.groupByKey().parallelDo("collect",
+        new MapValuesFn<K, Iterable<V>, Collection<V>>() {
+          public Collection<V> map(Iterable<V> values) {
+            List<V> collected = Lists.newArrayList();
+            for (V value : values) {
+              collected.add(valueType.getDetachedValue(value));
+            }
+            return collected;
+          }
+        }, tf.tableOf(collect.getKeyType(), tf.collections(valueType)));
+  }
+}

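A minimal usage sketch of the aggregation methods above, assuming they live in
the org.apache.crunch.lib.Aggregate class from this patch; the wordCounts
parameter below is hypothetical:

    static PTable<String, Long> top20(PTable<String, Long> wordCounts) {
      // the 20 largest counts, computed via the TopKFn/TopKCombineFn pair above
      return Aggregate.top(wordCounts, 20, true);
    }

    static PCollection<Long> largestCount(PTable<String, Long> wordCounts) {
      // max() requires a Comparable element type, per the runtime check above
      return Aggregate.max(PTables.values(wordCounts));
    }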
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/Cartesian.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Cartesian.java b/src/main/java/org/apache/crunch/lib/Cartesian.java
new file mode 100644
index 0000000..e6ce7fc
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/Cartesian.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+import java.util.Random;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for Cartesian products of two {@code PTable} or {@code PCollection} instances.
+ */
+@SuppressWarnings("serial")
+public class Cartesian {
+
+  /**
+   * Helper for building the artificial cross keys. This technique was taken from Pig's CROSS.
+   */
+  private static class GFCross<V> extends DoFn<V, Pair<Pair<Integer, Integer>, V>> {
+
+    private final int constantField;
+    private final int parallelism;
+    private final Random r;
+
+    public GFCross(int constantField, int parallelism) {
+      this.constantField = constantField;
+      this.parallelism = parallelism;
+      this.r = new Random();
+    }
+
+    public void process(V input, Emitter<Pair<Pair<Integer, Integer>, V>> emitter) {
+      int c = r.nextInt(parallelism);
+      if (constantField == 0) {
+        for (int i = 0; i < parallelism; i++) {
+          emitter.emit(Pair.of(Pair.of(c, i), input));
+        }
+      } else {
+        for (int i = 0; i < parallelism; i++) {
+          emitter.emit(Pair.of(Pair.of(i, c), input));
+        }
+      }
+    }
+  }
+
+  static final int DEFAULT_PARALLELISM = 6;
+
+  /**
+   * Performs a full cross join on the specified {@link PTable}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PTable to perform a cross join on.
+   * @param right A PTable to perform a cross join on.
+   * @param <K1> Type of left PTable's keys.
+   * @param <K2> Type of right PTable's keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result as tuples of ((K1,K2), (U,V)).
+   */
+  public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(
+      PTable<K1, U> left,
+      PTable<K2, V> right) {
+    return cross(left, right, DEFAULT_PARALLELISM);
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PTable}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PTable to perform a cross join on.
+   * @param right A PTable to perform a cross join on.
+   * @param parallelism The square root of the number of reduce partitions to use; the cross is spread over parallelism^2 keys. Increasing parallelism also increases the amount of data that must be copied.
+   * @param <K1> Type of left PTable's keys.
+   * @param <K2> Type of right PTable's keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result as tuples of ((K1,K2), (U,V)).
+   */
+  public static <K1, K2, U, V> PTable<Pair<K1, K2>, Pair<U, V>> cross(
+      PTable<K1, U> left,
+      PTable<K2, V> right,
+      int parallelism) {
+
+    /* The strategy here is to simply emulate the following PigLatin:
+     *   A  = foreach table1 generate flatten(GFCross(0, 2)), flatten(*); 
+     *   B  = foreach table2 generate flatten(GFCross(1, 2)), flatten(*); 
+     *   C = cogroup A by ($0, $1), B by ($0, $1);
+     *   result = foreach C generate flatten(A), flatten(B);
+     */
+
+    PTypeFamily ltf = left.getTypeFamily();
+    PTypeFamily rtf = right.getTypeFamily();
+
+    PTable<Pair<Integer, Integer>, Pair<K1,U>> leftCross =
+        left.parallelDo(
+            new GFCross<Pair<K1,U>>(0, parallelism), 
+            ltf.tableOf(
+                ltf.pairs(ltf.ints(), ltf.ints()), 
+                ltf.pairs(left.getKeyType(), left.getValueType())));
+    PTable<Pair<Integer, Integer>, Pair<K2,V>> rightCross =
+        right.parallelDo(
+            new GFCross<Pair<K2,V>>(1, parallelism), 
+            rtf.tableOf(
+                rtf.pairs(rtf.ints(), rtf.ints()), 
+                rtf.pairs(right.getKeyType(), right.getValueType())));
+
+    PTable<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> cg =
+        leftCross.cogroup(rightCross);
+
+    PTypeFamily ctf = cg.getTypeFamily();
+
+    return cg.parallelDo(
+        new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() {
+          @Override
+          public void process(
+              Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> input,
+              Emitter<Pair<Pair<K1, K2>, Pair<U, V>>> emitter) {
+            for (Pair<K1, U> l: input.second().first()) {
+              for (Pair<K2, V> r: input.second().second()) {
+                emitter.emit(Pair.of(Pair.of(l.first(), r.first()), Pair.of(l.second(), r.second())));
+              }
+            }
+          }
+        },
+        ctf.tableOf(
+            ctf.pairs(left.getKeyType(), right.getKeyType()), 
+            ctf.pairs(left.getValueType(), right.getValueType()))
+        );
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PCollection}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PCollection to perform a cross join on.
+   * @param right A PCollection to perform a cross join on.
+   * @param <U> Type of the first {@link PCollection}'s values
+   * @param <V> Type of the second {@link PCollection}'s values
+   * @return The joined result as tuples of (U,V).
+   */
+  public static <U, V> PCollection<Pair<U, V>> cross(
+      PCollection<U> left,
+      PCollection<V> right) {
+    return cross(left, right, DEFAULT_PARALLELISM);
+  }
+
+  /**
+   * Performs a full cross join on the specified {@link PCollection}s (using the same strategy as Pig's CROSS operator).
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Cross_join">Cross Join</a>
+   * @param left A PCollection to perform a cross join on.
+   * @param right A PCollection to perform a cross join on.
+   * @param parallelism The square root of the number of reduce partitions to use; increasing parallelism also increases the amount of data that must be copied.
+   * @param <U> Type of the first {@link PCollection}'s values
+   * @param <V> Type of the second {@link PCollection}'s values
+   * @return The joined result as tuples of (U,V).
+   */
+  public static <U, V> PCollection<Pair<U, V>> cross(
+      PCollection<U> left,
+      PCollection<V> right,
+      int parallelism) {
+
+    PTypeFamily ltf = left.getTypeFamily();
+    PTypeFamily rtf = right.getTypeFamily();
+
+    PTableType<Pair<Integer, Integer>, U> ptt = ltf.tableOf(
+        ltf.pairs(ltf.ints(), ltf.ints()),
+        left.getPType());
+
+    PTable<Pair<Integer, Integer>, U> leftCross =
+        left.parallelDo(new GFCross<U>(0, parallelism), ptt);
+    PTable<Pair<Integer, Integer>, V> rightCross =
+        right.parallelDo(
+            new GFCross<V>(1, parallelism), 
+            rtf.tableOf(
+                rtf.pairs(rtf.ints(), rtf.ints()), 
+                right.getPType()));
+
+    PTable<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> cg =
+        leftCross.cogroup(rightCross);
+
+    PTypeFamily ctf = cg.getTypeFamily();
+
+    return cg.parallelDo(
+        new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>>, Pair<U,V>>() {
+          @Override
+          public void process(
+              Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> input,
+              Emitter<Pair<U,V>> emitter) {
+            for (U l: input.second().first()) {
+              for (V r: input.second().second()) {
+                emitter.emit(Pair.of(l, r));
+              }
+            }
+          }
+        }, ctf.pairs(left.getPType(), right.getPType()));
+  }
+  
+}

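A minimal usage sketch of the PCollection-based cross join above; the wrapper
method and its parameters are hypothetical:

    static PCollection<Pair<String, Integer>> combos(
        PCollection<String> letters, PCollection<Integer> digits) {
      // every (letter, digit) pair; with DEFAULT_PARALLELISM = 6, the shuffle
      // is spread over 6 * 6 = 36 synthetic (int, int) keys
      return Cartesian.cross(letters, digits);
    }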
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Cogroup.java b/src/main/java/org/apache/crunch/lib/Cogroup.java
new file mode 100644
index 0000000..6f759bb
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.MapValuesFn;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import com.google.common.collect.Lists;
+
+public class Cogroup {
+  
+  /**
+   * Co-groups the two {@link PTable} arguments.
+   * 
+   * @return a {@code PTable} representing the co-grouped tables.
+   */
+  public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(
+      PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PType<K> keyType = left.getPTableType().getKeyType();
+    PType<U> leftType = left.getPTableType().getValueType();
+    PType<V> rightType = right.getPTableType().getValueType();
+    PType<Pair<U, V>> itype = ptf.pairs(leftType, rightType);
+
+    PTable<K, Pair<U, V>> cgLeft = left.parallelDo("coGroupTag1",
+        new CogroupFn1<K, U, V>(), ptf.tableOf(keyType, itype));
+    PTable<K, Pair<U, V>> cgRight = right.parallelDo("coGroupTag2",
+        new CogroupFn2<K, U, V>(), ptf.tableOf(keyType, itype));
+    
+    PTable<K, Pair<U, V>> both = cgLeft.union(cgRight);
+
+    PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(
+        ptf.collections(leftType), ptf.collections(rightType));
+    return both.groupByKey().parallelDo("cogroup",
+        new PostGroupFn<K, U, V>(), ptf.tableOf(keyType, otype));
+  }
+
+  private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
+    @Override
+    public Pair<V, U> map(V v) {
+      return Pair.of(v, null);
+    }
+  }
+
+  private static class CogroupFn2<K, V, U> extends MapValuesFn<K, U, Pair<V, U>> {
+    @Override
+    public Pair<V, U> map(U u) {
+      return Pair.of(null, u);
+    }
+  }
+
+  private static class PostGroupFn<K, V, U> extends
+      DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+    @Override
+    public void process(Pair<K, Iterable<Pair<V, U>>> input,
+        Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {
+      Collection<V> cv = Lists.newArrayList();
+      Collection<U> cu = Lists.newArrayList();
+      for (Pair<V, U> pair : input.second()) {
+        if (pair.first() != null) {
+          cv.add(pair.first());
+        } else if (pair.second() != null) {
+          cu.add(pair.second());
+        }
+      }
+      emitter.emit(Pair.of(input.first(), Pair.of(cv, cu)));
+    }
+  }
+
+}

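A minimal usage sketch of the cogroup above; Click and Purchase are
hypothetical value types for two tables keyed by user id:

    static PTable<Long, Pair<Collection<Click>, Collection<Purchase>>> byUser(
        PTable<Long, Click> clicks, PTable<Long, Purchase> purchases) {
      // for each key, all left values and all right values, collected in one pass
      return Cogroup.cogroup(clicks, purchases);
    }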
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/Join.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Join.java b/src/main/java/org/apache/crunch/lib/Join.java
new file mode 100644
index 0000000..9840c33
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/Join.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lib.join.FullOuterJoinFn;
+import org.apache.crunch.lib.join.InnerJoinFn;
+import org.apache.crunch.lib.join.JoinFn;
+import org.apache.crunch.lib.join.JoinUtils;
+import org.apache.crunch.lib.join.LeftOuterJoinFn;
+import org.apache.crunch.lib.join.RightOuterJoinFn;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for joining multiple {@code PTable} instances based on a common key.
+ */
+public class Join {
+  /**
+   * Performs an inner join on the specified {@link PTable}s.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner Join</a>
+   * @param left A PTable to perform an inner join on.
+   * @param right A PTable to perform an inner join on.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
+    return innerJoin(left, right);
+  }
+
+  /**
+   * Performs an inner join on the specified {@link PTable}s.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Inner_join">Inner Join</a>
+   * @param left A PTable to perform an inner join on.
+   * @param right A PTable to perform an inner join on.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> innerJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new InnerJoinFn<K, U, V>(left.getValueType()));
+  }
+
+  /**
+   * Performs a left outer join on the specified {@link PTable}s.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Left_outer_join">Left Join</a>
+   * @param left A PTable to perform a left join on. All of this PTable's entries will appear
+   *     in the resulting PTable.
+   * @param right A PTable to perform a left join on.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> leftJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new LeftOuterJoinFn<K, U, V>(left.getValueType()));
+  }
+
+  /**
+   * Performs a right outer join on the specified {@link PTable}s.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Right_outer_join">Right Join</a>
+   * @param left A PTable to perform a right join on.
+   * @param right A PTable to perform a right join on. All of this PTable's entries will appear
+   *     in the resulting PTable.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> rightJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new RightOuterJoinFn<K, U, V>(left.getValueType()));
+  }
+
+  /**
+   * Performs a full outer join on the specified {@link PTable}s.
+   *
+   * @see <a href="http://en.wikipedia.org/wiki/Join_(SQL)#Full_outer_join">Full Join</a>
+   * @param left A PTable to perform a full join on.
+   * @param right A PTable to perform a full join on.
+   * @param <K> Type of the keys.
+   * @param <U> Type of the first {@link PTable}'s values
+   * @param <V> Type of the second {@link PTable}'s values
+   * @return The joined result.
+   */
+  public static <K, U, V> PTable<K, Pair<U, V>> fullJoin(PTable<K, U> left, PTable<K, V> right) {
+    return join(left, right, new FullOuterJoinFn<K, U, V>(left.getValueType()));
+  }
+
+  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right,
+      JoinFn<K, U, V> joinFn) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right);
+    PTableType<K, Pair<U, V>> ret = ptf.tableOf(left.getKeyType(),
+        ptf.pairs(left.getValueType(), right.getValueType()));
+
+    return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret);
+  }
+
+  private static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(
+      PTable<K, U> left, PTable<K, V> right) {
+    PTypeFamily ptf = left.getTypeFamily();
+    PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
+        ptf.pairs(left.getValueType(), right.getValueType()));
+
+    PTable<Pair<K, Integer>, Pair<U, V>> tag1 = left.parallelDo("joinTagLeft",
+        new MapFn<Pair<K, U>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+      @Override
+      public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> input) {
+        return Pair.of(Pair.of(input.first(), 0), Pair.of(input.second(), (V) null));
+      }
+    }, ptt);
+    PTable<Pair<K, Integer>, Pair<U, V>> tag2 = right.parallelDo("joinTagRight",
+        new MapFn<Pair<K, V>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
+      @Override
+      public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> input) {
+        return Pair.of(Pair.of(input.first(), 1), Pair.of((U) null, input.second()));
+      }
+    }, ptt);
+    
+    GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
+    optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
+    
+    return tag1.union(tag2).groupByKey(optionsBuilder.build());
+  }
+}

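A minimal usage sketch of the join API above; Order and Customer are
hypothetical value types:

    static PTable<Long, Pair<Order, Customer>> ordersWithCustomers(
        PTable<Long, Order> orders, PTable<Long, Customer> customers) {
      // keeps every order; unmatched orders are paired with a null customer
      return Join.leftJoin(orders, customers);
    }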
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/PTables.java b/src/main/java/org/apache/crunch/lib/PTables.java
new file mode 100644
index 0000000..911cb36
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/PTables.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.List;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import com.google.common.collect.Lists;
+
+/**
+ * Methods for performing common operations on PTables.
+ * 
+ */
+public class PTables {
+
+  public static <K, V> PCollection<K> keys(PTable<K, V> ptable) {
+    return ptable.parallelDo("PTables.keys", new DoFn<Pair<K, V>, K>() {
+      @Override
+      public void process(Pair<K, V> input, Emitter<K> emitter) {
+        emitter.emit(input.first());
+      }
+    }, ptable.getKeyType());
+  }
+
+  public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
+    return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() {
+      @Override
+      public void process(Pair<K, V> input, Emitter<V> emitter) {
+        emitter.emit(input.second());
+      }
+    }, ptable.getValueType());
+  }
+
+  /**
+   * Creates a detached value for a table {@link Pair}.
+   * 
+   * @param tableType
+   *          The table type
+   * @param value
+   *          The value from which a detached value is to be created
+   * @return The detached value
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, V> getDetachedValue(PTableType<K, V> tableType, Pair<K, V> value) {
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), tableType.getValueType()
+        .getDetachedValue(value.second()));
+  }
+
+  /**
+   * Creates a detached value for a {@link PGroupedTable} value.
+   *
+   * @param groupedTableType
+   *          The grouped table type
+   * @param value
+   *          The value from which a detached value is to be created
+   * @return The detached value
+   * @see PType#getDetachedValue(Object)
+   */
+  public static <K, V> Pair<K, Iterable<V>> getGroupedDetachedValue(
+      PGroupedTableType<K, V> groupedTableType, Pair<K, Iterable<V>> value) {
+
+    PTableType<K, V> tableType = groupedTableType.getTableType();
+    List<V> detachedIterable = Lists.newArrayList();
+    PType<V> valueType = tableType.getValueType();
+    for (V v : value.second()) {
+      detachedIterable.add(valueType.getDetachedValue(v));
+    }
+    return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
+        (Iterable<V>) detachedIterable);
+  }
+}

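A minimal usage sketch of the detaching helper above. Detaching matters when
values are cached across calls, because Hadoop's serialization frequently
reuses the underlying objects; the wrapper method here is hypothetical:

    static Pair<String, Long> copyOf(PTableType<String, Long> type,
        Pair<String, Long> reused) {
      // deep-copies a pair whose key and value objects may be reused by Hadoop
      return PTables.getDetachedValue(type, reused);
    }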
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Sample.java b/src/main/java/org/apache/crunch/lib/Sample.java
new file mode 100644
index 0000000..96d9694
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/Sample.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Random;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import com.google.common.base.Preconditions;
+
+public class Sample {
+
+  public static class SamplerFn<S> extends DoFn<S, S> {
+
+    private final long seed;
+    private final double acceptanceProbability;
+    private transient Random r;
+
+    public SamplerFn(long seed, double acceptanceProbability) {
+      Preconditions.checkArgument(0.0 < acceptanceProbability && acceptanceProbability < 1.0);
+      this.seed = seed;
+      this.acceptanceProbability = acceptanceProbability;
+    }
+
+    @Override
+    public void initialize() {
+      r = new Random(seed);
+    }
+
+    @Override
+    public void process(S input, Emitter<S> emitter) {
+      if (r.nextDouble() < acceptanceProbability) {
+        emitter.emit(input);
+      }
+    }
+  }
+  
+  /**
+   * Returns an independent Bernoulli sample of the input, seeded with the current time.
+   */
+  public static <S> PCollection<S> sample(PCollection<S> input, double probability) {
+    return sample(input, System.currentTimeMillis(), probability);
+  }
+
+  /**
+   * Returns an independent Bernoulli sample of the input using a fixed seed, for
+   * reproducible sampling. The probability must be strictly between 0.0 and 1.0.
+   */
+  public static <S> PCollection<S> sample(PCollection<S> input, long seed, double probability) {
+    String stageName = String.format("sample(%.2f)", probability);
+    return input.parallelDo(stageName, new SamplerFn<S>(seed, probability), input.getPType());
+  }
+}

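A minimal usage sketch of the sampler above; Event is a hypothetical element
type and 42L an arbitrary seed:

    static PCollection<Event> tenPercent(PCollection<Event> events) {
      // a fixed seed yields the same sample on every run
      return Sample.sample(events, 42L, 0.1);
    }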
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/5accc9ac/src/main/java/org/apache/crunch/lib/Set.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/lib/Set.java b/src/main/java/org/apache/crunch/lib/Set.java
new file mode 100644
index 0000000..f915d53
--- /dev/null
+++ b/src/main/java/org/apache/crunch/lib/Set.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import java.util.Collection;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Tuple3;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+/**
+ * Utilities for performing set operations (difference, intersection, etc) on
+ * {@code PCollection} instances.
+ */
+public class Set {
+
+  /**
+   * Compute the set difference between two sets of elements.
+   * 
+   * @return a collection containing elements that are in <code>coll1</code>
+   * but not in <code>coll2</code>
+   */
+  public static <T> PCollection<T> difference(PCollection<T> coll1,
+      PCollection<T> coll2) {
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
+        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<T> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
+            if (!groups.first().isEmpty() && groups.second().isEmpty()) {
+              emitter.emit(input.first());
+            }
+          }
+        }, coll1.getPType());
+  }
+  
+  /**
+   * Compute the intersection of two sets of elements.
+   * 
+   * @return a collection containing the elements that are common to both sets
+   * <code>coll1</code> and <code>coll2</code>
+   */
+  public static <T> PCollection<T> intersection(PCollection<T> coll1,
+      PCollection<T> coll2) {
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
+        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<T> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
+            if (!groups.first().isEmpty() && !groups.second().isEmpty()) {
+              emitter.emit(input.first());
+            }
+          }
+        }, coll1.getPType());
+  }
+  
+  /**
+   * Categorizes the elements of two sets by their membership, in the manner of
+   * the Unix <code>comm</code> utility. This method returns a {@link PCollection}
+   * of {@link Tuple3} objects, and the position in the tuple at which an element
+   * appears is determined by the collections it is a member of, as follows:
+   * <ol>
+   * <li>elements only in <code>coll1</code>,</li>
+   * <li>elements only in <code>coll2</code>, or</li>
+   * <li>elements in both collections</li>
+   * </ol>
+   * Tuples are otherwise filled with <code>null</code>.
+   * 
+   * @return a collection of {@link Tuple3} objects
+   */
+  public static <T> PCollection<Tuple3<T, T, T>> comm(PCollection<T> coll1,
+      PCollection<T> coll2) {
+    PTypeFamily typeFamily = coll1.getTypeFamily();
+    PType<T> type = coll1.getPType();
+    return Cogroup.cogroup(toTable(coll1), toTable(coll2))
+        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>,
+            Tuple3<T, T, T>>() {
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<Tuple3<T, T, T>> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
+            boolean inFirst = !groups.first().isEmpty();
+            boolean inSecond = !groups.second().isEmpty();
+            T t = input.first();
+            emitter.emit(Tuple3.of(
+                inFirst && !inSecond ? t : null,
+                !inFirst && inSecond ? t : null,
+                inFirst && inSecond ? t : null));
+          }
+        }, typeFamily.triples(type, type, type));
+  }
+  
+  private static <T> PTable<T, Boolean> toTable(PCollection<T> coll) {
+    PTypeFamily typeFamily = coll.getTypeFamily();
+    return coll.parallelDo(new DoFn<T, Pair<T, Boolean>>() {
+      @Override
+      public void process(T input, Emitter<Pair<T, Boolean>> emitter) {
+        emitter.emit(Pair.of(input, Boolean.TRUE));
+      }
+    }, typeFamily.tableOf(coll.getPType(), typeFamily.booleans()));
+  }
+
+}

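A minimal usage sketch of the set operations above; the inputs are treated as
sets, so duplicate elements are collapsed by the underlying cogroup:

    static PCollection<String> newUsers(PCollection<String> today,
        PCollection<String> yesterday) {
      // ids seen today but not yesterday
      return Set.difference(today, yesterday);
    }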
