incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [25/33] CRUNCH-8: Moving the code into multiple Maven modules. Contributed by Matthias Friedrich
Date Wed, 11 Jul 2012 05:14:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
deleted file mode 100644
index 1d18843..0000000
--- a/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.hadoop.mapreduce.Job;
-
-import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-
-public class WritableGroupedTableType<K, V> extends PGroupedTableType<K, V> {
-
-  private final MapFn inputFn;
-  private final MapFn outputFn;
-  private final Converter converter;
-  
-  public WritableGroupedTableType(WritableTableType<K, V> tableType) {
-    super(tableType);
-    WritableType keyType = (WritableType) tableType.getKeyType();
-    WritableType valueType = (WritableType) tableType.getValueType();
-    this.inputFn =  new PairIterableMapFn(keyType.getInputMapFn(),
-        valueType.getInputMapFn());
-    this.outputFn = tableType.getOutputMapFn();
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
-        valueType.getSerializationClass());
-  }
-  
-  @Override
-  public Class<Pair<K, Iterable<V>>> getTypeClass() {
-    return (Class<Pair<K, Iterable<V>>>) Pair.of(null, null).getClass();  
-  }
-  
-  @Override
-  public Converter getGroupingConverter() {
-    return converter;
-  }
-
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-  
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-  
-  @Override
-  public Pair<K, Iterable<V>> getDetachedValue(Pair<K, Iterable<V>> value) {
-    return PTables.getGroupedDetachedValue(this, value);
-  }
-
-  @Override
-  public void configureShuffle(Job job, GroupingOptions options) {
-    if (options != null) {
-      options.configure(job);
-    }
-    WritableType keyType = (WritableType) tableType.getKeyType();
-    WritableType valueType = (WritableType) tableType.getValueType();
-    job.setMapOutputKeyClass(keyType.getSerializationClass());
-    job.setMapOutputValueClass(valueType.getSerializationClass());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java b/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
deleted file mode 100644
index ba64b0b..0000000
--- a/src/main/java/org/apache/crunch/types/writable/WritablePairConverter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.types.Converter;
-
-public class WritablePairConverter<K, V> implements Converter<K, V, Pair<K, V>, Pair<K, Iterable<V>>> {
-  
-  private final Class<K> keyClass;
-  private final Class<V> valueClass;
-  
-  public WritablePairConverter(Class<K> keyClass, Class<V> valueClass) {
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-  }
-  
-  @Override
-  public Pair<K, V> convertInput(K key, V value) {
-    return Pair.of(key, value);
-  }
-
-  @Override
-  public K outputKey(Pair<K, V> value) {
-    return value.first();
-  }
-
-  @Override
-  public V outputValue(Pair<K, V> value) {
-    return value.second();
-  }
-
-  @Override
-  public Class<K> getKeyClass() {
-    return keyClass;
-  }
-
-  @Override
-  public Class<V> getValueClass() {
-    return valueClass;
-  }
-
-  @Override
-  public Pair<K, Iterable<V>> convertIterableInput(K key, Iterable<V> value) {
-    return Pair.of(key, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
deleted file mode 100644
index 9c781f1..0000000
--- a/src/main/java/org/apache/crunch/types/writable/WritableTableType.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.fn.PairMapFn;
-import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
-import org.apache.crunch.lib.PTables;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import com.google.common.collect.ImmutableList;
-
-class WritableTableType<K, V> implements PTableType<K, V> {
-
-  private final WritableType<K, Writable> keyType;
-  private final WritableType<V, Writable> valueType;
-  private final MapFn inputFn;
-  private final MapFn outputFn;
-  private final Converter converter;
-  
-  public WritableTableType(WritableType<K, Writable> keyType,
-      WritableType<V, Writable> valueType) {
-    this.keyType = keyType;
-    this.valueType = valueType;
-    this.inputFn = new PairMapFn(keyType.getInputMapFn(),
-        valueType.getInputMapFn());
-    this.outputFn = new PairMapFn(keyType.getOutputMapFn(),
-        valueType.getOutputMapFn());
-    this.converter = new WritablePairConverter(keyType.getSerializationClass(),
-        valueType.getSerializationClass());
-  }
-
-  @Override
-  public Class<Pair<K, V>> getTypeClass() {
-    return (Class<Pair<K, V>>) Pair.of(null, null).getClass();
-  }
-  
-  @Override
-  public List<PType> getSubTypes() {
-    return ImmutableList.<PType>of(keyType, valueType);
-  }
-  
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-  
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-  
-  @Override
-  public Converter getConverter() {
-    return converter;
-  }
-  
-  @Override
-  public PTypeFamily getFamily() {
-    return WritableTypeFamily.getInstance();
-  }
-
-  public PType<K> getKeyType() {
-    return keyType;
-  }
-
-  public PType<V> getValueType() {
-    return valueType;
-  }
-
-  @Override
-  public PGroupedTableType<K, V> getGroupedTableType() {
-    return new WritableGroupedTableType<K, V>(this);
-  }
-
-  @Override
-  public SourceTarget<Pair<K, V>> getDefaultFileSource(Path path) {
-    return new SeqFileTableSourceTarget<K, V>(path, this);
-  }
-  
-  @Override
-  public Pair<K, V> getDetachedValue(Pair<K, V> value) {
-    return PTables.getDetachedValue(this, value);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-	if (obj == null || !(obj instanceof WritableTableType)) {
-	  return false;
-	}
-	WritableTableType that = (WritableTableType) obj;
-	return keyType.equals(that.keyType) && valueType.equals(that.valueType);
-  }
-  
-  @Override
-  public int hashCode() {
-	HashCodeBuilder hcb = new HashCodeBuilder();
-	return hcb.append(keyType).append(valueType).toHashCode();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/WritableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableType.java b/src/main/java/org/apache/crunch/types/writable/WritableType.java
deleted file mode 100644
index b99050d..0000000
--- a/src/main/java/org/apache/crunch/types/writable/WritableType.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.util.List;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.seq.SeqFileSourceTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import com.google.common.collect.ImmutableList;
-
-public class WritableType<T, W extends Writable> implements PType<T> {
-
-  private final Class<T> typeClass;
-  private final Class<W> writableClass;
-  private final Converter converter;
-  private final MapFn<W, T> inputFn;
-  private final MapFn<T, W> outputFn;
-  private final List<PType> subTypes;
-  
-  WritableType(Class<T> typeClass, Class<W> writableClass,
-      MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn, PType...subTypes) {
-    this.typeClass = typeClass;
-    this.writableClass = writableClass;
-    this.inputFn = inputDoFn;
-    this.outputFn = outputDoFn;
-    this.converter = new WritableValueConverter(writableClass);
-    this.subTypes = ImmutableList.<PType>builder().add(subTypes).build();
-  }
-
-  @Override
-  public PTypeFamily getFamily() {
-    return WritableTypeFamily.getInstance();
-  }
-
-  @Override
-  public Class<T> getTypeClass() {
-    return typeClass;
-  }
-
-  @Override
-  public Converter getConverter() {
-    return converter;
-  }
-  
-  @Override
-  public MapFn getInputMapFn() {
-    return inputFn;
-  }
-  
-  @Override
-  public MapFn getOutputMapFn() {
-    return outputFn;
-  }
-  
-  @Override
-  public List<PType> getSubTypes() {
-    return subTypes;
-  }
-  
-  public Class<W> getSerializationClass() {
-    return writableClass;
-  }
-
-  @Override
-  public SourceTarget<T> getDefaultFileSource(Path path) {
-    return new SeqFileSourceTarget<T>(path, this);
-  }
-  
-  @Override
-  public boolean equals(Object obj) {
-	if (obj == null || !(obj instanceof WritableType)) {
-	  return false;
-	}
-	WritableType wt = (WritableType) obj;
-	return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) &&	
-		subTypes.equals(wt.subTypes));
-  }
-  
-  // Unchecked warnings are suppressed because we know that W and T are the same
-  // type (due to the IdentityFn being used)
-  @SuppressWarnings("unchecked")
-  @Override
-  public T getDetachedValue(T value) {
-    if (this.inputFn.getClass().equals(IdentityFn.class)) {
-      W writableValue = (W) value;
-      return (T) Writables.deepCopy(writableValue, this.writableClass);
-    } else {
-      return value;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-	HashCodeBuilder hcb = new HashCodeBuilder();
-	hcb.append(typeClass).append(writableClass).append(subTypes);
-	return hcb.toHashCode();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
deleted file mode 100644
index 6ff33e9..0000000
--- a/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.hadoop.io.Writable;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.types.PGroupedTableType;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.PTypeUtils;
-
-/**
- * The {@link Writable}-based implementation of the {@link org.apache.crunch.types.PTypeFamily}
- * interface.
- */
-public class WritableTypeFamily implements PTypeFamily {
-
-  private static final WritableTypeFamily INSTANCE = new WritableTypeFamily();
-
-  public static WritableTypeFamily getInstance() {
-    return INSTANCE;
-  }
-
-  // Disallow construction
-  private WritableTypeFamily() {
-  }
-
-  public PType<Void> nulls() {
-    return Writables.nulls();
-  }
-
-  public PType<String> strings() {
-    return Writables.strings();
-  }
-
-  public PType<Long> longs() {
-    return Writables.longs();
-  }
-
-  public PType<Integer> ints() {
-    return Writables.ints();
-  }
-
-  public PType<Float> floats() {
-    return Writables.floats();
-  }
-
-  public PType<Double> doubles() {
-    return Writables.doubles();
-  }
-
-  public PType<Boolean> booleans() {
-    return Writables.booleans();
-  }
-  
-  public PType<ByteBuffer> bytes() {
-    return Writables.bytes();
-  }
-  
-  public <T> PType<T> records(Class<T> clazz) {
-    return Writables.records(clazz);
-  }
-
-  public <W extends Writable> PType<W> writables(Class<W> clazz) {
-    return Writables.writables(clazz);
-  }
-
-  public <K, V> PTableType<K, V> tableOf(PType<K> key, PType<V> value) {
-    return Writables.tableOf(key, value);
-  }
-
-  public <V1, V2> PType<Pair<V1, V2>> pairs(PType<V1> p1, PType<V2> p2) {
-    return Writables.pairs(p1, p2);
-  }
-
-  public <V1, V2, V3> PType<Tuple3<V1, V2, V3>> triples(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3) {
-    return Writables.triples(p1, p2, p3);
-  }
-
-  public <V1, V2, V3, V4> PType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-    return Writables.quads(p1, p2, p3, p4);
-  }
-
-  public PType<TupleN> tuples(PType<?>... ptypes) {
-    return Writables.tuples(ptypes);
-  }
-
-  public <T> PType<Collection<T>> collections(PType<T> ptype) {
-    return Writables.collections(ptype);
-  }
-
-  public <T> PType<Map<String, T>> maps(PType<T> ptype) {
-	return Writables.maps(ptype);
-  }
-  
-  @Override
-  public <T> PType<T> as(PType<T> ptype) {
-    if (ptype instanceof WritableType || ptype instanceof WritableTableType ||
-        ptype instanceof WritableGroupedTableType) {
-      return ptype;
-    }
-    if (ptype instanceof PGroupedTableType) {
-      PTableType ptt = ((PGroupedTableType) ptype).getTableType();
-      return new WritableGroupedTableType((WritableTableType) as(ptt));
-    }
-    PType<T> prim = Writables.getPrimitiveType(ptype.getTypeClass());
-    if (prim != null) {
-      return prim;
-    }
-    return PTypeUtils.convert(ptype, this);
-  }
-
-  @Override
-  public <T extends Tuple> PType<T> tuples(Class<T> clazz, PType<?>... ptypes) {
-    return Writables.tuples(clazz, ptypes);
-  }
-
-  @Override
-  public <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn,
-      MapFn<T, S> outputFn, PType<S> base) {
-    return Writables.derived(clazz, inputFn, outputFn, base);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java b/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
deleted file mode 100644
index 85cadc8..0000000
--- a/src/main/java/org/apache/crunch/types/writable/WritableValueConverter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import org.apache.hadoop.io.NullWritable;
-
-import org.apache.crunch.types.Converter;
-
-class WritableValueConverter<W> implements Converter<Object, W, W, Iterable<W>> {
-  
-  private final Class<W> serializationClass;
-  
-  public WritableValueConverter(Class<W> serializationClass) {
-    this.serializationClass = serializationClass;
-  }
-  
-  @Override
-  public W convertInput(Object key, W value) {
-    return value;
-  }
-
-  @Override
-  public Object outputKey(W value) {
-    return NullWritable.get();
-  }
-
-  @Override
-  public W outputValue(W value) {
-    return value;
-  }
-
-  @Override
-  public Class<Object> getKeyClass() {
-    return (Class<Object>) (Class<?>) NullWritable.class;
-  }
-
-  @Override
-  public Class<W> getValueClass() {
-    return serializationClass;
-  }
-
-  @Override
-  public Iterable<W> convertIterableInput(Object key, Iterable<W> value) {
-    return value;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/types/writable/Writables.java b/src/main/java/org/apache/crunch/types/writable/Writables.java
deleted file mode 100644
index 08b6c64..0000000
--- a/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ /dev/null
@@ -1,634 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.types.writable;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import org.apache.crunch.fn.CompositeMapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.TupleFactory;
-import org.apache.crunch.util.PTypes;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Defines static methods that are analogous to the methods defined in
- * {@link WritableTypeFamily} for convenient static importing.
- * 
- */
-public class Writables {
-  private static final MapFn<NullWritable, Void> NULL_WRITABLE_TO_VOID = new MapFn<NullWritable, Void>() {
-    @Override
-    public Void map(NullWritable input) {
-      return null;
-    }
-  };
-
-  private static final MapFn<Void, NullWritable> VOID_TO_NULL_WRITABLE = new MapFn<Void, NullWritable>() {
-    @Override
-    public NullWritable map(Void input) {
-      return NullWritable.get();
-    }
-  };
-  
-  private static final MapFn<Text, String> TEXT_TO_STRING = new MapFn<Text, String>() {
-    @Override
-    public String map(Text input) {
-      return input.toString();
-    }
-  };
-
-  private static final MapFn<String, Text> STRING_TO_TEXT = new MapFn<String, Text>() {
-    @Override
-    public Text map(String input) {
-      return new Text(input);
-    }
-  };
-
-  private static final MapFn<IntWritable, Integer> IW_TO_INT = new MapFn<IntWritable, Integer>() {
-    @Override
-    public Integer map(IntWritable input) {
-      return input.get();
-    }
-  };
-
-  private static final MapFn<Integer, IntWritable> INT_TO_IW = new MapFn<Integer, IntWritable>() {
-    @Override
-    public IntWritable map(Integer input) {
-      return new IntWritable(input);
-    }
-  };
-
-  private static final MapFn<LongWritable, Long> LW_TO_LONG = new MapFn<LongWritable, Long>() {
-    @Override
-    public Long map(LongWritable input) {
-      return input.get();
-    }
-  };
-
-  private static final MapFn<Long, LongWritable> LONG_TO_LW = new MapFn<Long, LongWritable>() {
-    @Override
-    public LongWritable map(Long input) {
-      return new LongWritable(input);
-    }
-  };
-
-  private static final MapFn<FloatWritable, Float> FW_TO_FLOAT = new MapFn<FloatWritable, Float>() {
-    @Override
-    public Float map(FloatWritable input) {
-      return input.get();
-    }
-  };
-
-  private static final MapFn<Float, FloatWritable> FLOAT_TO_FW = new MapFn<Float, FloatWritable>() {
-    @Override
-    public FloatWritable map(Float input) {
-      return new FloatWritable(input);
-    }
-  };
-
-  private static final MapFn<DoubleWritable, Double> DW_TO_DOUBLE = new MapFn<DoubleWritable, Double>() {
-    @Override
-    public Double map(DoubleWritable input) {
-      return input.get();
-    }
-  };
-
-  private static final MapFn<Double, DoubleWritable> DOUBLE_TO_DW = new MapFn<Double, DoubleWritable>() {
-    @Override
-    public DoubleWritable map(Double input) {
-      return new DoubleWritable(input);
-    }
-  };
-
-  private static final MapFn<BooleanWritable, Boolean> BW_TO_BOOLEAN = new MapFn<BooleanWritable, Boolean>() {
-    @Override
-    public Boolean map(BooleanWritable input) {
-      return input.get();
-    }
-  };
-
-  private static final BooleanWritable TRUE = new BooleanWritable(true);
-  private static final BooleanWritable FALSE = new BooleanWritable(false);
-  private static final MapFn<Boolean, BooleanWritable> BOOLEAN_TO_BW = new MapFn<Boolean, BooleanWritable>() {
-    @Override
-    public BooleanWritable map(Boolean input) {
-      return input == Boolean.TRUE ? TRUE : FALSE;
-    }
-  };
-  
-  private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB = new MapFn<BytesWritable, ByteBuffer>() {
-    @Override
-    public ByteBuffer map(BytesWritable input) {
-      return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
-    }
-  };
-  
-  private static final MapFn<ByteBuffer, BytesWritable> BB_TO_BW = new MapFn<ByteBuffer, BytesWritable>() {
-    @Override
-    public BytesWritable map(ByteBuffer input) {
-      BytesWritable bw = new BytesWritable();
-      bw.set(input.array(), input.arrayOffset(), input.limit());
-      return bw;
-    }
-  };
-
-  private static <S, W extends Writable> WritableType<S, W> create(Class<S> typeClass,
-      Class<W> writableClass, MapFn<W, S> inputDoFn, MapFn<S, W> outputDoFn) {
-    return new WritableType<S, W>(typeClass, writableClass, inputDoFn,
-        outputDoFn);
-  }
-
-  private static final WritableType<Void, NullWritable> nulls = create(Void.class, NullWritable.class,
-      NULL_WRITABLE_TO_VOID, VOID_TO_NULL_WRITABLE);
-  private static final WritableType<String, Text> strings = create(String.class, Text.class,
-      TEXT_TO_STRING, STRING_TO_TEXT);
-  private static final WritableType<Long, LongWritable> longs = create(Long.class, LongWritable.class,
-      LW_TO_LONG, LONG_TO_LW);
-  private static final WritableType<Integer, IntWritable> ints = create(Integer.class, IntWritable.class,
-      IW_TO_INT, INT_TO_IW);
-  private static final WritableType<Float, FloatWritable> floats = create(Float.class, FloatWritable.class,
-      FW_TO_FLOAT, FLOAT_TO_FW);
-  private static final WritableType<Double, DoubleWritable> doubles = create(Double.class,
-      DoubleWritable.class, DW_TO_DOUBLE, DOUBLE_TO_DW);
-  private static final WritableType<Boolean, BooleanWritable> booleans = create(Boolean.class,
-      BooleanWritable.class, BW_TO_BOOLEAN, BOOLEAN_TO_BW);
-  private static final WritableType<ByteBuffer, BytesWritable> bytes = create(ByteBuffer.class,
-      BytesWritable.class, BW_TO_BB, BB_TO_BW);
-
-  private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>>builder()
-      .put(String.class, strings)
-      .put(Long.class, longs)
-      .put(Integer.class, ints)
-      .put(Float.class, floats)
-      .put(Double.class, doubles)
-      .put(Boolean.class, booleans)
-      .put(ByteBuffer.class, bytes)
-      .build();
-  
-  private static final Map<Class<?>, WritableType<?, ?>> EXTENSIONS = Maps.newHashMap();
-  
-  public static <T> PType<T> getPrimitiveType(Class<T> clazz) {
-    return (PType<T>) PRIMITIVES.get(clazz);
-  }
-  
-  public static <T> void register(Class<T> clazz, WritableType<T, ? extends Writable> ptype) {
-    EXTENSIONS.put(clazz, ptype);
-  }
-  
-  public static final WritableType<Void, NullWritable> nulls() {
-    return nulls;
-  }
-
-  public static final WritableType<String, Text> strings() {
-    return strings;
-  }
-
-  public static final WritableType<Long, LongWritable> longs() {
-    return longs;
-  }
-
-  public static final WritableType<Integer, IntWritable> ints() {
-    return ints;
-  }
-
-  public static final WritableType<Float, FloatWritable> floats() {
-    return floats;
-  }
-
-  public static final WritableType<Double, DoubleWritable> doubles() {
-    return doubles;
-  }
-
-  public static final WritableType<Boolean, BooleanWritable> booleans() {
-    return booleans;
-  }
-  
-  public static final WritableType<ByteBuffer, BytesWritable> bytes() {
-    return bytes;
-  }
-  
-  public static final <T, W extends Writable> WritableType<T, W> records(Class<T> clazz) {
-    if (EXTENSIONS.containsKey(clazz)) {
-      return (WritableType<T, W>) EXTENSIONS.get(clazz);
-    }
-    return (WritableType<T, W>) writables(clazz.asSubclass(Writable.class));
-  }
-
-  public static <W extends Writable> WritableType<W, W> writables(Class<W> clazz) {
-    MapFn wIdentity = IdentityFn.getInstance();
-    return new WritableType<W, W>(clazz, clazz, wIdentity, wIdentity);
-  }
-
-  public static <K, V> WritableTableType<K, V> tableOf(
-      PType<K> key, PType<V> value) {
-    if (key instanceof WritableTableType) {
-      WritableTableType wtt = (WritableTableType) key;
-      key = pairs(wtt.getKeyType(), wtt.getValueType());
-    } else if (!(key instanceof WritableType)) {
-      throw new IllegalArgumentException("Key type must be of class WritableType");
-    }
-    if (value instanceof WritableTableType) {
-      WritableTableType wtt = (WritableTableType) value;
-      value = pairs(wtt.getKeyType(), wtt.getValueType());
-    } else if (!(value instanceof WritableType)) {
-      throw new IllegalArgumentException("Value type must be of class WritableType");
-    }
-    return new WritableTableType((WritableType) key, (WritableType) value);
-  }
-
-  /**
-   * For mapping from {@link TupleWritable} instances to {@link Tuple}s.
-   * 
-   */
-  private static class TWTupleMapFn extends MapFn<TupleWritable, Tuple> {
-    private final TupleFactory<?> tupleFactory;
-    private final List<MapFn> fns;
-
-    private transient Object[] values;
-
-    public TWTupleMapFn(TupleFactory<?> tupleFactory, PType<?>... ptypes) {
-      this.tupleFactory = tupleFactory;
-      this.fns = Lists.newArrayList();
-      for (PType ptype : ptypes) {
-        fns.add(ptype.getInputMapFn());
-      }
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.configure(conf);
-      }
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
-    
-    @Override
-    public void initialize() {
-      for (MapFn fn : fns) {
-        fn.setContext(getContext());
-      }
-      // The rest of the methods allocate new
-      // objects each time. However this one
-      // uses Tuple.tuplify which does a copy
-      this.values = new Object[fns.size()];
-      tupleFactory.initialize();
-    }
-
-    @Override
-    public Tuple map(TupleWritable in) {
-      for (int i = 0; i < values.length; i++) {
-        if (in.has(i)) {
-          values[i] = fns.get(i).map(in.get(i));
-        } else {
-          values[i] = null;
-        }
-      }
-      return tupleFactory.makeTuple(values);
-    }
-  }
-
-  /**
-   * For mapping from {@code Tuple}s to {@code TupleWritable}s.
-   * 
-   */
-  private static class TupleTWMapFn extends MapFn<Tuple, TupleWritable> {
-    
-    private transient TupleWritable writable;
-    private transient Writable[] values;
-
-    private final List<MapFn> fns;
-
-    public TupleTWMapFn(PType<?>... ptypes) {
-      this.fns = Lists.newArrayList();
-      for (PType<?> ptype : ptypes) {
-        fns.add(ptype.getOutputMapFn());
-      }
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.configure(conf);
-      }
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      for (MapFn fn : fns) {
-        fn.setConfigurationForTest(conf);
-      }
-    }
-
-    @Override
-    public void initialize() {
-      this.values = new Writable[fns.size()];
-      this.writable = new TupleWritable(values);
-      for (MapFn fn : fns) {
-        fn.setContext(getContext());
-      }
-    }
-
-    @Override
-    public TupleWritable map(Tuple input) {
-      writable.clearWritten();
-      for (int i = 0; i < input.size(); i++) {
-        Object value = input.get(i);
-        if (value != null) {
-          writable.setWritten(i);
-          values[i] = (Writable) fns.get(i).map(value);
-        }
-      }
-      return writable;
-    }
-  }
-
-  public static <V1, V2> WritableType<Pair<V1, V2>, TupleWritable> pairs(PType<V1> p1, PType<V2> p2) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.PAIR, p1, p2);
-    TupleTWMapFn output = new TupleTWMapFn(p1, p2);
-    return new WritableType(Pair.class, TupleWritable.class, input, output, p1, p2);
-  }
-
-  public static <V1, V2, V3> WritableType<Tuple3<V1, V2, V3>, TupleWritable> triples(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE3, p1, p2, p3);
-    TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3);
-    return new WritableType(Tuple3.class, TupleWritable.class,
-        input,
-        output,
-        p1, p2, p3);
-  }
-
-  public static <V1, V2, V3, V4> WritableType<Tuple4<V1, V2, V3, V4>, TupleWritable> quads(PType<V1> p1,
-      PType<V2> p2, PType<V3> p3, PType<V4> p4) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLE4, p1, p2, p3, p4);
-    TupleTWMapFn output = new TupleTWMapFn(p1, p2, p3, p4);
-    return new WritableType(Tuple4.class, TupleWritable.class,
-        input,
-        output,
-        p1, p2, p3, p4);
-  }
-
-  public static WritableType<TupleN, TupleWritable> tuples(PType... ptypes) {
-    TWTupleMapFn input = new TWTupleMapFn(TupleFactory.TUPLEN, ptypes);
-    TupleTWMapFn output = new TupleTWMapFn(ptypes);
-    return new WritableType(TupleN.class, TupleWritable.class, input, output, ptypes);
-  }
-
-  public static <T extends Tuple> PType<T> tuples(Class<T> clazz, PType... ptypes) {
-    Class[] typeArgs = new Class[ptypes.length];
-    for (int i = 0; i < typeArgs.length; i++) {
-      typeArgs[i] = ptypes[i].getTypeClass();
-    }
-    TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
-    TWTupleMapFn input = new TWTupleMapFn(factory, ptypes);
-    TupleTWMapFn output = new TupleTWMapFn(ptypes);
-    return new WritableType(clazz, TupleWritable.class, input, output, ptypes);  
-  }
-  
-  public static <S, T> PType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn,
-      PType<S> base) {
-    WritableType<S, ?> wt = (WritableType<S, ?>) base;
-    MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);
-    MapFn output = new CompositeMapFn(outputFn, wt.getOutputMapFn());
-    return new WritableType(clazz, wt.getSerializationClass(), input, output, base.getSubTypes().toArray(new PType[0]));
-  }
-  
-  private static class ArrayCollectionMapFn<T> extends
-      MapFn<GenericArrayWritable, Collection<T>> {
-    private final MapFn<Object, T> mapFn;
-
-    public ArrayCollectionMapFn(MapFn<Object, T> mapFn) {
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-    
-    @Override
-    public void initialize() {
-      mapFn.setContext(getContext());   
-    }
-    
-    @Override
-    public Collection<T> map(GenericArrayWritable input) {
-      Collection<T> collection = Lists.newArrayList();
-      for (Writable writable : input.get()) {
-        collection.add(mapFn.map(writable));
-      }
-      return collection;
-    }
-  }
-
-  private static class CollectionArrayMapFn<T> extends
-      MapFn<Collection<T>, GenericArrayWritable> {
-    
-    private final Class<? extends Writable> clazz;
-    private final MapFn<T, Object> mapFn;
-
-    public CollectionArrayMapFn(Class<? extends Writable> clazz,
-        MapFn<T, Object> mapFn) {
-      this.clazz = clazz;
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-    
-    @Override
-    public void initialize() {
-      mapFn.setContext(getContext());   
-    }
-    
-    @Override
-    public GenericArrayWritable map(Collection<T> input) {
-      GenericArrayWritable arrayWritable = new GenericArrayWritable(clazz);
-      Writable[] w = new Writable[input.size()];
-      int index = 0;
-      for (T in : input) {
-        w[index++] = ((Writable) mapFn.map(in));
-      }
-      arrayWritable.set(w);
-      return arrayWritable;
-    }
-  }
-
-  public static <T> WritableType<Collection<T>, GenericArrayWritable<T>> collections(PType<T> ptype) {
-    WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
-    return new WritableType(Collection.class, GenericArrayWritable.class,
-        new ArrayCollectionMapFn(wt.getInputMapFn()), new CollectionArrayMapFn(
-            wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
-  }
-
-  private static class MapInputMapFn<T> extends MapFn<TextMapWritable<Writable>, Map<String, T>> {
-    private final MapFn<Writable, T> mapFn;
-
-    public MapInputMapFn(MapFn<Writable, T> mapFn) {
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
-    public void initialize() {
-      mapFn.setContext(getContext());
-    }
-
-    @Override
-    public Map<String, T> map(TextMapWritable<Writable> input) {
-      Map<String, T> out = Maps.newHashMap();
-      for (Map.Entry<Text, Writable> e : input.entrySet()) {
-        out.put(e.getKey().toString(), mapFn.map(e.getValue()));
-      }
-      return out;
-    }
-  }
-  
-  private static class MapOutputMapFn<T> extends MapFn<Map<String, T>, TextMapWritable<Writable>> {
-
-    private final Class<Writable> clazz;
-    private final MapFn<T, Writable> mapFn;
-
-    public MapOutputMapFn(Class<Writable> clazz, MapFn<T, Writable> mapFn) {
-      this.clazz = clazz;
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public void configure(Configuration conf) {
-      mapFn.configure(conf);
-    }
-
-    @Override
-    public void setConfigurationForTest(Configuration conf) {
-      mapFn.setConfigurationForTest(conf);
-    }
-
-    @Override
-    public void initialize() {
-      mapFn.setContext(getContext());
-    }
-
-    @Override
-    public TextMapWritable<Writable> map(Map<String, T> input) {
-      TextMapWritable<Writable> tmw = new TextMapWritable<Writable>(clazz);
-      for (Map.Entry<String, T> e : input.entrySet()) {
-        tmw.put(new Text(e.getKey()), mapFn.map(e.getValue()));
-      }
-      return tmw;
-    }	
-  }
-  
-  public static <T> WritableType<Map<String, T>, MapWritable> maps(PType<T> ptype) {
-	WritableType<T, ?> wt = (WritableType<T, ?>) ptype;
-    return new WritableType(Map.class, TextMapWritable.class,
-        new MapInputMapFn(wt.getInputMapFn()),
-        new MapOutputMapFn(wt.getSerializationClass(), wt.getOutputMapFn()), ptype);
-  }
-  
-  public static <T> PType<T> jsons(Class<T> clazz) {
-    return PTypes.jsonString(clazz, WritableTypeFamily.getInstance());  
-  }
-  
-  /**
-   * Perform a deep copy of a writable value.
-   * 
-   * @param value
-   *          The value to be copied
-   * @param writableClass
-   *          The Writable class of the value to be copied
-   * @return A fully detached deep copy of the input value
-   */
-  public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) {
-    ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
-    DataOutputStream dataOut = new DataOutputStream(byteOutStream);
-    T copiedValue = null;
-    try {
-      value.write(dataOut);
-      dataOut.flush();
-      ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
-      DataInput dataInput = new DataInputStream(byteInStream);
-      copiedValue = writableClass.newInstance();
-      copiedValue.readFields(dataInput);
-    } catch (Exception e) {
-      throw new CrunchRuntimeException("Error while deep copying " + value, e);
-    }
-    return copiedValue;
-  }
-
-  // Not instantiable
-  private Writables() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/util/Collects.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/Collects.java b/src/main/java/org/apache/crunch/util/Collects.java
deleted file mode 100644
index f5b07c4..0000000
--- a/src/main/java/org/apache/crunch/util/Collects.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import com.google.common.collect.Lists;
-
-/**
- * Utility functions for returning Collection objects backed by different types
- * of implementations.
- */
-public class Collects {
-  
-  public static <T> Collection<T> newArrayList() {
-    return Lists.newArrayList();
-  }
-
-  public static <T> Collection<T> newArrayList(T...elements) {
-    return Lists.newArrayList(elements);
-  }
-
-  public static <T> Collection<T> newArrayList(Iterable<? extends T> elements) {
-    return Lists.newArrayList(elements);
-  }
-
-  public static <T> Collection<T> newArrayList(Iterator<? extends T> elements) {
-    return Lists.newArrayList(elements);
-  }
-
-  private Collects() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/DistCache.java b/src/main/java/org/apache/crunch/util/DistCache.java
deleted file mode 100644
index 682e8f0..0000000
--- a/src/main/java/org/apache/crunch/util/DistCache.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.util.Enumeration;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-
-/**
- * Provides functions for working with Hadoop's distributed cache. These include:
- * <ul>
- *   <li>
- *     Functions for working with a job-specific distributed cache of objects, like the
- *     serialized runtime nodes in a MapReduce.
- *   </li>
- *   <li>
- *     Functions for adding library jars to the distributed cache, which will be added to the
- *     classpath of MapReduce tasks.
- *   </li>
- * </ul>
- */
-public class DistCache {
-
-  // Configuration key holding the paths of jars to export to the distributed cache.
-  private static final String TMPJARS_KEY = "tmpjars";
-
-  public static void write(Configuration conf, Path path, Object value) throws IOException {
-    ObjectOutputStream oos = new ObjectOutputStream(FileSystem.get(conf).create(path));
-    oos.writeObject(value);
-    oos.close();
-
-    DistributedCache.addCacheFile(path.toUri(), conf);
-  }
-
-  public static Object read(Configuration conf, Path path) throws IOException {
-    URI target = null;
-    for (URI uri : DistributedCache.getCacheFiles(conf)) {
-      if (uri.toString().equals(path.toString())) {
-        target = uri;
-        break;
-      }
-    }
-    Object value = null;
-    if (target != null) {
-      Path targetPath = new Path(target.toString());
-      ObjectInputStream ois = new ObjectInputStream(targetPath.getFileSystem(conf).open(targetPath));
-      try {
-        value = ois.readObject();
-      } catch (ClassNotFoundException e) {
-        throw new CrunchRuntimeException(e);
-      }
-      ois.close();
-    }
-    return value;
-  }
-
-  /**
-   * Adds the specified jar to the distributed cache of jobs using the provided configuration. The
-   * jar will be placed on the classpath of tasks run by the job.
-   *
-   * @param conf The configuration used to add the jar to the distributed cache.
-   * @param jarFile The jar file to add to the distributed cache.
-   * @throws IOException If the jar file does not exist or there is a problem accessing the file.
-   */
-  public static void addJarToDistributedCache(Configuration conf, File jarFile) throws IOException {
-    if (!jarFile.exists()) {
-      throw new IOException("Jar file: " + jarFile.getCanonicalPath() + " does not exist.");
-    }
-    if (!jarFile.getName().endsWith(".jar")) {
-      throw new IllegalArgumentException("File: " + jarFile.getCanonicalPath() + " is not a .jar "
-          + "file.");
-    }
-    // Get a qualified path for the jar.
-    FileSystem fileSystem = FileSystem.getLocal(conf);
-    Path jarPath = new Path(jarFile.getCanonicalPath());
-    String qualifiedPath = jarPath.makeQualified(fileSystem).toString();
-    // Add the jar to the configuration variable.
-    String jarConfiguration = conf.get(TMPJARS_KEY, "");
-    if (!jarConfiguration.isEmpty()) {
-      jarConfiguration += ",";
-    }
-    jarConfiguration += qualifiedPath;
-    conf.set(TMPJARS_KEY, jarConfiguration);
-  }
-
-  /**
-   * Adds the jar at the specified path to the distributed cache of jobs using the provided
-   * configuration. The jar will be placed on the classpath of tasks run by the job.
-   *
-   * @param conf The configuration used to add the jar to the distributed cache.
-   * @param jarFile The path to the jar file to add to the distributed cache.
-   * @throws IOException If the jar file does not exist or there is a problem accessing the file.
-   */
-  public static void addJarToDistributedCache(Configuration conf, String jarFile)
-      throws IOException {
-    addJarToDistributedCache(conf, new File(jarFile));
-  }
-
-  /**
-   * Finds the path to a jar that contains the class provided, if any. There is no guarantee that
-   * the jar returned will be the first on the classpath to contain the file. This method is
-   * basically lifted out of Hadoop's {@link org.apache.hadoop.mapred.JobConf} class.
-   *
-   * @param jarClass The class the jar file should contain.
-   * @return The path to a jar file that contains the class, or <code>null</code> if no such jar
-   *     exists.
-   * @throws IOException If there is a problem searching for the jar file.
-   */
-  public static String findContainingJar(Class jarClass) throws IOException {
-    ClassLoader loader = jarClass.getClassLoader();
-    String classFile = jarClass.getName().replaceAll("\\.", "/") + ".class";
-      for(Enumeration itr = loader.getResources(classFile); itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    return null;
-  }
-
-  /**
-   * Adds all jars under the specified directory to the distributed cache of jobs using the
-   * provided configuration. The jars will be placed on the classpath of tasks run by the job.
-   * This method does not descend into subdirectories when adding jars.
-   *
-   * @param conf The configuration used to add jars to the distributed cache.
-   * @param jarDirectory A directory containing jar files to add to the distributed cache.
-   * @throws IOException If the directory does not exist or there is a problem accessing the
-   *     directory.
-   */
-  public static void addJarDirToDistributedCache(Configuration conf, File jarDirectory)
-      throws IOException {
-    if (!jarDirectory.exists() || !jarDirectory.isDirectory()) {
-      throw new IOException("Jar directory: " + jarDirectory.getCanonicalPath() + " does not "
-        + "exist or is not a directory.");
-    }
-    for (File file : jarDirectory.listFiles()) {
-      if (!file.isDirectory() && file.getName().endsWith(".jar")) {
-        addJarToDistributedCache(conf, file);
-      }
-    }
-  }
-
-  /**
-   * Adds all jars under the directory at the specified path to the distributed cache of jobs
-   * using the provided configuration.  The jars will be placed on the classpath of the tasks
-   * run by the job. This method does not descend into subdirectories when adding jars.
-   *
-   * @param conf The configuration used to add jars to the distributed cache.
-   * @param jarDirectory The path to a directory containing jar files to add to the distributed
-   *     cache.
-   * @throws IOException If the directory does not exist or there is a problem accessing the
-   *     directory.
-   */
-  public static void addJarDirToDistributedCache(Configuration conf, String jarDirectory)
-      throws IOException {
-    addJarDirToDistributedCache(conf, new File(jarDirectory));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/util/PTypes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/PTypes.java b/src/main/java/org/apache/crunch/util/PTypes.java
deleted file mode 100644
index 863b40f..0000000
--- a/src/main/java/org/apache/crunch/util/PTypes.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.smile.SmileFactory;
-
-import org.apache.crunch.MapFn;
-import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-
-/**
- * Utility functions for creating common types of derived PTypes, e.g., for JSON data,
- * protocol buffers, and Thrift records.
- *
- */
-public class PTypes {
-
-  public static PType<BigInteger> bigInt(PTypeFamily typeFamily) {
-    return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes());  
-  }
-  
-  public static <T> PType<T> jsonString(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new JacksonInputMapFn<T>(clazz),
-        new JacksonOutputMapFn<T>(), typeFamily.strings());
-  }
-
-  public static <T> PType<T> smile(Class<T> clazz, PTypeFamily typeFamily) {
-	return typeFamily.derived(clazz, new SmileInputMapFn<T>(clazz),
-	    new SmileOutputMapFn<T>(), typeFamily.bytes());
-  }
-  
-  public static <T extends Message> PType<T> protos(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ProtoInputMapFn<T>(clazz),
-        new ProtoOutputMapFn<T>(), typeFamily.bytes());
-  }
-  
-  public static <T extends TBase> PType<T> thrifts(Class<T> clazz, PTypeFamily typeFamily) {
-    return typeFamily.derived(clazz, new ThriftInputMapFn<T>(clazz),
-        new ThriftOutputMapFn<T>(), typeFamily.bytes());
-  }
-  
-  public static MapFn<ByteBuffer, BigInteger> BYTE_TO_BIGINT = new MapFn<ByteBuffer, BigInteger>() {
-    public BigInteger map(ByteBuffer input) {
-      return input == null ? null : new BigInteger(input.array());
-    }
-  };
-
-  public static MapFn<BigInteger, ByteBuffer> BIGINT_TO_BYTE = new MapFn<BigInteger, ByteBuffer>() {
-    public ByteBuffer map(BigInteger input) {
-      return input == null ? null : ByteBuffer.wrap(input.toByteArray());
-    }
-  };
-  
-  public static class SmileInputMapFn<T> extends MapFn<ByteBuffer, T> {
-
-    private final Class<T> clazz;
-    private transient ObjectMapper mapper;
-    
-    public SmileInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper(new SmileFactory());
-    }
-    
-	@Override
-	public T map(ByteBuffer input) {
-      try {
-        return mapper.readValue(input.array(), input.position(), input.limit(), clazz);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-	}
-  }
-  
-  public static class SmileOutputMapFn<T> extends MapFn<T, ByteBuffer> {
-    private transient ObjectMapper mapper;
-    
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper(new SmileFactory());
-    }
-    
-    @Override
-    public ByteBuffer map(T input) {
-      try {
-        return ByteBuffer.wrap(mapper.writeValueAsBytes(input));
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-
-  public static class JacksonInputMapFn<T> extends MapFn<String, T> {
-    
-    private final Class<T> clazz;
-    private transient ObjectMapper mapper;
-    
-    public JacksonInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-    
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper();
-    }
-    
-    @Override
-    public T map(String input) {
-      try {
-        return mapper.readValue(input, clazz);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    } 
-  }
-  
-  public static class JacksonOutputMapFn<T> extends MapFn<T, String> {
-    
-    private transient ObjectMapper mapper;
-    
-    @Override
-    public void initialize() {
-      this.mapper = new ObjectMapper();
-    }
-    
-    @Override
-    public String map(T input) {
-      try {
-        return mapper.writeValueAsString(input);
-      } catch (Exception e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-  }
-  
-  public static class ProtoInputMapFn<T extends Message> extends MapFn<ByteBuffer, T> {
-    
-    private final Class<T> clazz;
-    private transient T instance;
-    
-    public ProtoInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-    
-    @Override
-    public void initialize() {
-      this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
-    }
-    
-    @Override
-    public T map(ByteBuffer bb) {
-      try {
-        return (T) instance.newBuilderForType().mergeFrom(
-            bb.array(), bb.position(), bb.limit()).build();
-      } catch (InvalidProtocolBufferException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }    
-  }
-  
-  public static class ProtoOutputMapFn<T extends Message> extends MapFn<T, ByteBuffer> {
-    
-    public ProtoOutputMapFn() {
-    }
-    
-    @Override
-    public ByteBuffer map(T proto) {
-      return ByteBuffer.wrap(proto.toByteArray());
-    }    
-  }
-
-  public static class ThriftInputMapFn<T extends TBase> extends MapFn<ByteBuffer, T> {
-
-    private final Class<T> clazz;
-    private transient T instance;
-    private transient TDeserializer deserializer;
-    private transient byte[] bytes;
-    
-    public ThriftInputMapFn(Class<T> clazz) {
-      this.clazz = clazz;
-    }
-    
-    @Override
-    public void initialize() {
-      this.instance = ReflectionUtils.newInstance(clazz, getConfiguration());
-      this.deserializer = new TDeserializer(new TBinaryProtocol.Factory());
-      this.bytes = new byte[0];
-    }
-    
-    @Override
-    public T map(ByteBuffer bb) {
-      T next = (T) instance.deepCopy();
-      int len = bb.limit() - bb.position();
-      if (len != bytes.length) {
-        bytes = new byte[len];
-      }
-      System.arraycopy(bb.array(), bb.position(), bytes, 0, len);
-      try {
-        deserializer.deserialize(next, bytes);
-      } catch (TException e) {
-        throw new CrunchRuntimeException(e);
-      }
-      return next;
-    }    
-  }
-  
-  public static class ThriftOutputMapFn<T extends TBase> extends MapFn<T, ByteBuffer> {
-
-    private transient TSerializer serializer;
-    
-    public ThriftOutputMapFn() {
-    }
-    
-    @Override
-    public void initialize() {
-      this.serializer = new TSerializer(new TBinaryProtocol.Factory());
-    }
-    
-    @Override
-    public ByteBuffer map(T t) {
-      try {
-        return ByteBuffer.wrap(serializer.serialize(t));
-      } catch (TException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }    
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/util/Protos.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/Protos.java b/src/main/java/org/apache/crunch/util/Protos.java
deleted file mode 100644
index 2cda492..0000000
--- a/src/main/java/org/apache/crunch/util/Protos.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.MapFn;
-import com.google.common.base.Splitter;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-
-/**
- * Utility functions for working with protocol buffers in Crunch.
- */
-public class Protos {
-
-  /**
-   * Returns a function that reads the field named {@code fieldName} from each
-   * input message and returns its value as the key.
-   *
-   * @param fieldName name of the message field to extract
-   */
-  public static <M extends Message, K> MapFn<M, K> extractKey(String fieldName) {
-    return new ExtractKeyFn<M, K>(fieldName);
-  }
-
-  /**
-   * Returns a function that splits each input line on {@code sep} and assigns
-   * the pieces, in declaration order, to the fields of {@code msgClass}.
-   *
-   * @param sep separator between field values within a line
-   * @param msgClass message class to build from each line
-   */
-  public static <M extends Message> DoFn<String, M> lineParser(String sep, Class<M> msgClass) {
-    return new TextToProtoFn<M>(sep, msgClass);
-  }
-
-  /**
-   * Extracts the value of a single, named field from a message.
-   */
-  public static class ExtractKeyFn<M extends Message, K> extends MapFn<M, K> {
-
-    private final String fieldName;
-
-    // Looked up from the first message seen and reused afterwards.
-    private transient FieldDescriptor fd;
-
-    public ExtractKeyFn(String fieldName) {
-      this.fieldName = fieldName;
-    }
-
-    /**
-     * @param input the message to read from; must not be null
-     * @return the value of the configured field, cast to {@code K}
-     * @throws IllegalArgumentException if {@code input} is null
-     * @throws IllegalStateException if the message type has no field with the
-     *         configured name
-     */
-    @Override
-    public K map(M input) {
-      if (input == null) {
-        throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn");
-      }
-      if (fd == null) {
-        fd = input.getDescriptorForType().findFieldByName(fieldName);
-        if (fd == null) {
-          throw new IllegalStateException(
-              "Could not find field: " + fieldName + " in message: " + input);
-        }
-      }
-      return (K) input.getField(fd);
-    }
-
-  }
-
-  /**
-   * Builds one message per non-empty input line. The line is split on the
-   * separator and the i-th piece is parsed according to the Java type of the
-   * i-th field of the message. Empty pieces and missing trailing pieces leave
-   * the corresponding field unset.
-   */
-  public static class TextToProtoFn<M extends Message> extends DoFn<String, M> {
-
-    private final String sep;
-    private final Class<M> msgClass;
-
-    // Created by initialize().
-    private transient M msgInstance;
-    private transient List<FieldDescriptor> fields;
-    private transient Splitter splitter;
-
-    /** Counters: TOTAL counts rejected lines, NUMBER_FORMAT the numeric failures among them. */
-    enum ParseErrors { TOTAL, NUMBER_FORMAT };
-
-    public TextToProtoFn(String sep, Class<M> msgClass) {
-      this.sep = sep;
-      this.msgClass = msgClass;
-    }
-
-    @Override
-    public void initialize() {
-      this.msgInstance = ReflectionUtils.newInstance(msgClass, getConfiguration());
-      this.fields = msgInstance.getDescriptorForType().getFields();
-      this.splitter = Splitter.on(sep);
-    }
-
-    /**
-     * Parses {@code input} and emits the resulting message. Null or empty lines
-     * are skipped silently. A line containing an unparsable number or an
-     * unknown enum name is dropped and counted under {@code ParseErrors.TOTAL}
-     * (numeric failures additionally under {@code NUMBER_FORMAT}).
-     *
-     * @param input one line of separated field values
-     * @param emitter receives the built message when the line parses cleanly
-     */
-    @Override
-    public void process(String input, Emitter<M> emitter) {
-      if (input == null || input.isEmpty()) {
-        return;
-      }
-      Builder b = msgInstance.newBuilderForType();
-      Iterator<String> iter = splitter.split(input).iterator();
-      boolean parseError = false;
-      for (FieldDescriptor fd : fields) {
-        if (!iter.hasNext()) {
-          // Fewer values than fields: the remaining fields stay unset.
-          break;
-        }
-        String value = iter.next();
-        if (value == null || value.isEmpty()) {
-          continue;
-        }
-        Object parsedValue = null;
-        try {
-          switch (fd.getJavaType()) {
-          case STRING:
-            parsedValue = value;
-            break;
-          case INT:
-            parsedValue = Integer.valueOf(value);
-            break;
-          case LONG:
-            parsedValue = Long.valueOf(value);
-            break;
-          case FLOAT:
-            parsedValue = Float.valueOf(value);
-            break;
-          case DOUBLE:
-            parsedValue = Double.valueOf(value);
-            break;
-          case BOOLEAN:
-            parsedValue = Boolean.valueOf(value);
-            break;
-          case ENUM:
-            parsedValue = fd.getEnumType().findValueByName(value);
-            // An unknown enum name yields null; reject the line rather than
-            // passing null on to setField.
-            parseError = (parsedValue == null);
-            break;
-          default:
-            // Field types with no text form handled here (e.g. bytes, nested
-            // messages) are left unset.
-            break;
-          }
-        } catch (NumberFormatException nfe) {
-          increment(ParseErrors.NUMBER_FORMAT);
-          parseError = true;
-        }
-        if (parseError) {
-          break;
-        }
-        if (parsedValue != null) {
-          b.setField(fd, parsedValue);
-        }
-      }
-
-      if (parseError) {
-        increment(ParseErrors.TOTAL);
-      } else {
-        emitter.emit((M) b.build());
-      }
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/crunch/util/Tuples.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/crunch/util/Tuples.java b/src/main/java/org/apache/crunch/util/Tuples.java
deleted file mode 100644
index b8eb3b9..0000000
--- a/src/main/java/org/apache/crunch/util/Tuples.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.util;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.Tuple3;
-import org.apache.crunch.Tuple4;
-import org.apache.crunch.TupleN;
-import com.google.common.collect.Lists;
-import com.google.common.collect.UnmodifiableIterator;
-
-/**
- * Utilities for working with subclasses of the {@code Tuple} interface.
- *
- */
-public class Tuples {
-
-  /**
-   * Base iterator that advances several source iterators in lock step.
-   * Iteration ends as soon as any one source has no further element.
-   */
-  private static abstract class TuplifyIterator<T> extends UnmodifiableIterator<T> {
-    protected List<Iterator<?>> iterators;
-
-    public TuplifyIterator(Iterator<?>...iterators) {
-      this.iterators = Lists.newArrayList(iterators);
-    }
-
-    @Override
-    public boolean hasNext() {
-      // With no sources there is nothing to combine; without this guard the
-      // loop below would report "has next" forever.
-      if (iterators.isEmpty()) {
-        return false;
-      }
-      for (Iterator<?> iter : iterators) {
-        if (!iter.hasNext()) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    /** Returns the next element of the source iterator at {@code index}. */
-    protected Object next(int index) {
-      return iterators.get(index).next();
-    }
-  }
-
-  /**
-   * Combines two iterables element by element into {@link Pair}s.
-   */
-  public static class PairIterable<S, T> implements Iterable<Pair<S, T>> {
-    private final Iterable<S> first;
-    private final Iterable<T> second;
-
-    public PairIterable(Iterable<S> first, Iterable<T> second) {
-      this.first = first;
-      this.second = second;
-    }
-
-    @Override
-    public Iterator<Pair<S, T>> iterator() {
-      return new TuplifyIterator<Pair<S, T>>(first.iterator(), second.iterator()) {
-        @Override
-        public Pair<S, T> next() {
-          return Pair.of((S) next(0), (T) next(1));
-        }
-      };
-    }
-  }
-
-  /**
-   * Combines three iterables element by element into {@link Tuple3}s.
-   */
-  public static class TripIterable<A, B, C> implements Iterable<Tuple3<A, B, C>> {
-    private final Iterable<A> first;
-    private final Iterable<B> second;
-    private final Iterable<C> third;
-
-    public TripIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third) {
-      this.first = first;
-      this.second = second;
-      this.third = third;
-    }
-
-    @Override
-    public Iterator<Tuple3<A, B, C>> iterator() {
-      return new TuplifyIterator<Tuple3<A, B, C>>(first.iterator(), second.iterator(),
-          third.iterator()) {
-        @Override
-        public Tuple3<A, B, C> next() {
-          return new Tuple3<A, B, C>((A) next(0), (B) next(1), (C) next(2));
-        }
-      };
-    }
-  }
-
-  /**
-   * Combines four iterables element by element into {@link Tuple4}s.
-   */
-  public static class QuadIterable<A, B, C, D> implements Iterable<Tuple4<A, B, C, D>> {
-    private final Iterable<A> first;
-    private final Iterable<B> second;
-    private final Iterable<C> third;
-    private final Iterable<D> fourth;
-
-    public QuadIterable(Iterable<A> first, Iterable<B> second, Iterable<C> third,
-        Iterable<D> fourth) {
-      this.first = first;
-      this.second = second;
-      this.third = third;
-      this.fourth = fourth;
-    }
-
-    @Override
-    public Iterator<Tuple4<A, B, C, D>> iterator() {
-      return new TuplifyIterator<Tuple4<A, B, C, D>>(first.iterator(), second.iterator(),
-          third.iterator(), fourth.iterator()) {
-        @Override
-        public Tuple4<A, B, C, D> next() {
-          return new Tuple4<A, B, C, D>((A) next(0), (B) next(1), (C) next(2), (D) next(3));
-        }
-      };
-    }
-  }
-
-  /**
-   * Combines any number of iterables element by element into {@link TupleN}s.
-   */
-  public static class TupleNIterable implements Iterable<TupleN> {
-    // The sources themselves are kept (not their iterators) so that every call
-    // to iterator() starts a fresh pass, as the other iterables in this class do.
-    private final Iterable<?>[] iterables;
-
-    public TupleNIterable(Iterable<?>... iterables) {
-      this.iterables = iterables.clone();
-    }
-
-    @Override
-    public Iterator<TupleN> iterator() {
-      final int arity = iterables.length;
-      Iterator<?>[] iters = new Iterator[arity];
-      for (int i = 0; i < arity; i++) {
-        iters[i] = iterables[i].iterator();
-      }
-      return new TuplifyIterator<TupleN>(iters) {
-        @Override
-        public TupleN next() {
-          Object[] values = new Object[arity];
-          for (int i = 0; i < values.length; i++) {
-            values[i] = next(i);
-          }
-          return new TupleN(values);
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/83acb813/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
deleted file mode 100644
index 6756dbb..0000000
--- a/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapreduce.lib.jobcontrol;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * This class encapsulates a MapReduce job and its dependencies. It monitors the
- * states of the jobs it depends on and updates the state of this job. A job
- * starts in the WAITING state. If it does not depend on any jobs, or all of the
- * jobs it depends on are in SUCCESS state, then the job state will become
- * READY. If any of them fail, the job will fail too (DEPENDENT_FAILED). When in
- * READY state, the job can be submitted to Hadoop for execution, with the state
- * changing into RUNNING state. From RUNNING state, the job can get into SUCCESS
- * or FAILED state, depending on the status of the job execution.
- */
-public class CrunchControlledJob {
-
-  // A job will be in one of the following states
-  public static enum State {
-    SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED
-  };
-
-  /** Configuration key: when true, submit() creates missing input directories. */
-  public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
-  protected State state;
-  protected Job job; // mapreduce job to be executed.
-  // some info for human consumption, e.g. the reason why the job failed
-  protected String message;
-  private String controlID; // assigned and used by JobControl class
-  // the jobs the current job depends on; may be null when there are none
-  private List<CrunchControlledJob> dependingJobs;
-
-  /**
-   * Construct a job.
-   * 
-   * @param job
-   *          a mapreduce job to be executed.
-   * @param dependingJobs
-   *          the list of jobs the current job depends on; may be null
-   */
-  public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs)
-      throws IOException {
-    this.job = job;
-    this.dependingJobs = dependingJobs;
-    this.state = State.WAITING;
-    this.controlID = "unassigned";
-    this.message = "just initialized";
-  }
-
-  /**
-   * Construct a job with no dependencies.
-   * 
-   * @param conf
-   *          mapred job configuration representing a job to be executed.
-   * @throws IOException
-   */
-  public CrunchControlledJob(Configuration conf) throws IOException {
-    this(new Job(conf), null);
-  }
-
-  /**
-   * @return a multi-line, tab-separated summary of the job's name, ids, state,
-   *         message and the names of the jobs it depends on
-   */
-  @Override
-  public String toString() {
-    // Purely local buffer, so the unsynchronized StringBuilder is sufficient.
-    StringBuilder sb = new StringBuilder();
-    sb.append("job name:\t").append(this.job.getJobName()).append("\n");
-    sb.append("job id:\t").append(this.controlID).append("\n");
-    sb.append("job state:\t").append(this.state).append("\n");
-    sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
-    sb.append("job message:\t").append(this.message).append("\n");
-
-    if (this.dependingJobs == null || this.dependingJobs.isEmpty()) {
-      sb.append("job has no depending job:\t").append("\n");
-    } else {
-      sb.append("job has ").append(this.dependingJobs.size())
-          .append(" depending jobs:\n");
-      for (int i = 0; i < this.dependingJobs.size(); i++) {
-        sb.append("\t depending job ").append(i).append(":\t");
-        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
-      }
-    }
-    return sb.toString();
-  }
-
-  /**
-   * @return the job name of this job
-   */
-  public String getJobName() {
-    return job.getJobName();
-  }
-
-  /**
-   * Set the job name for this job.
-   * 
-   * @param jobName
-   *          the job name
-   */
-  public void setJobName(String jobName) {
-    job.setJobName(jobName);
-  }
-
-  /**
-   * @return the job ID of this job assigned by JobControl
-   */
-  public String getJobID() {
-    return this.controlID;
-  }
-
-  /**
-   * Set the job ID for this job.
-   * 
-   * @param id
-   *          the job ID
-   */
-  public void setJobID(String id) {
-    this.controlID = id;
-  }
-
-  /**
-   * @return the mapred ID of this job as assigned by the mapred framework.
-   */
-  public JobID getMapredJobID() {
-    return this.job.getJobID();
-  }
-
-  /**
-   * @return the mapreduce job
-   */
-  public synchronized Job getJob() {
-    return this.job;
-  }
-
-  /**
-   * Set the mapreduce job
-   * 
-   * @param job
-   *          the mapreduce job for this job.
-   */
-  public synchronized void setJob(Job job) {
-    this.job = job;
-  }
-
-  /**
-   * @return the state of this job
-   */
-  public synchronized State getJobState() {
-    return this.state;
-  }
-
-  /**
-   * Set the state for this job.
-   * 
-   * @param state
-   *          the new state for this job.
-   */
-  protected synchronized void setJobState(State state) {
-    this.state = state;
-  }
-
-  /**
-   * @return the message of this job
-   */
-  public synchronized String getMessage() {
-    return this.message;
-  }
-
-  /**
-   * Set the message for this job.
-   * 
-   * @param message
-   *          the message for this job.
-   */
-  public synchronized void setMessage(String message) {
-    this.message = message;
-  }
-
-  /**
-   * @return the jobs this job depends on; this is the internal list (not a
-   *         copy) and is null if no dependency has been set
-   */
-  public List<CrunchControlledJob> getDependentJobs() {
-    return this.dependingJobs;
-  }
-
-  /**
-   * Add a job to this job's dependency list. Dependencies can only be added
-   * while a Job is waiting to run, not during or afterwards.
-   * 
-   * @param dependingJob
-   *          Job that this Job depends on.
-   * @return {@code true} if the Job was added.
-   */
-  public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) {
-    if (this.state != State.WAITING) { // only allowed to add jobs when waiting
-      return false;
-    }
-    if (this.dependingJobs == null) {
-      this.dependingJobs = new ArrayList<CrunchControlledJob>();
-    }
-    return this.dependingJobs.add(dependingJob);
-  }
-
-  /**
-   * @return true if this job is in a complete state
-   */
-  public synchronized boolean isCompleted() {
-    return this.state == State.FAILED || this.state == State.DEPENDENT_FAILED
-        || this.state == State.SUCCESS;
-  }
-
-  /**
-   * @return true if this job is in READY state
-   */
-  public synchronized boolean isReady() {
-    return this.state == State.READY;
-  }
-
-  /**
-   * Kill the underlying mapreduce job.
-   */
-  public void killJob() throws IOException, InterruptedException {
-    job.killJob();
-  }
-
-  /**
-   * Check the state of this running job. The state may remain the same, become
-   * SUCCESS or FAILED. If querying the job throws an IOException, the job is
-   * marked FAILED, the exception is recorded in the message and a kill of the
-   * job is attempted.
-   */
-  protected void checkRunningState() throws IOException, InterruptedException {
-    try {
-      if (job.isComplete()) {
-        if (job.isSuccessful()) {
-          this.state = State.SUCCESS;
-        } else {
-          this.state = State.FAILED;
-          this.message = "Job failed!";
-        }
-      }
-    } catch (IOException ioe) {
-      this.state = State.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-      try {
-        if (job != null) {
-          job.killJob();
-        }
-      } catch (IOException e) {
-        // The kill is best effort and the job is already marked FAILED, so do
-        // not propagate; keep the reason visible instead of dropping it.
-        this.message = this.message + "\nFailed to kill job: "
-            + StringUtils.stringifyException(e);
-      }
-    }
-  }
-
-  /**
-   * Check and update the state of this job. The state changes depending on its
-   * current state and the states of the jobs it depends on.
-   *
-   * @return the state of this job after the update
-   */
-  synchronized State checkState() throws IOException, InterruptedException {
-    if (this.state == State.RUNNING) {
-      checkRunningState();
-    }
-    if (this.state != State.WAITING) {
-      return this.state;
-    }
-    if (this.dependingJobs == null || this.dependingJobs.isEmpty()) {
-      this.state = State.READY;
-      return this.state;
-    }
-    int n = this.dependingJobs.size();
-    for (int i = 0; i < n; i++) {
-      CrunchControlledJob pred = this.dependingJobs.get(i);
-      State s = pred.checkState();
-      if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
-        // a pred is still not completed, continue in WAITING state
-        break;
-      }
-      if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
-        this.state = State.DEPENDENT_FAILED;
-        this.message = "depending job " + i + " with jobID " + pred.getJobID()
-            + " failed. " + pred.getMessage();
-        break;
-      }
-      // pred must be in success state; READY only once the last one is checked
-      if (i == n - 1) {
-        this.state = State.READY;
-      }
-    }
-
-    return this.state;
-  }
-
-  /**
-   * Submit this job to mapred. The state becomes RUNNING if submission is
-   * successful, FAILED otherwise (with the exception recorded in the message).
-   * When {@link #CREATE_DIR} is set to true in the job configuration, input
-   * paths that do not exist are created first.
-   */
-  protected synchronized void submit() {
-    try {
-      Configuration conf = job.getConfiguration();
-      if (conf.getBoolean(CREATE_DIR, false)) {
-        FileSystem fs = FileSystem.get(conf);
-        Path[] inputPaths = FileInputFormat.getInputPaths(job);
-        for (Path inputPath : inputPaths) {
-          if (!fs.exists(inputPath)) {
-            try {
-              fs.mkdirs(inputPath);
-            } catch (IOException ignored) {
-              // Deliberately best effort: a directory that could not be created
-              // does not by itself abort the submission attempt below.
-            }
-          }
-        }
-      }
-      job.submit();
-      this.state = State.RUNNING;
-    } catch (Exception ioe) {
-      this.state = State.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-    }
-  }
-
-}


Mime
View raw message