Return-Path: X-Original-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-crunch-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 696A79BFE for ; Mon, 8 Oct 2012 15:15:02 +0000 (UTC) Received: (qmail 73673 invoked by uid 500); 8 Oct 2012 15:15:02 -0000 Delivered-To: apmail-incubator-crunch-commits-archive@incubator.apache.org Received: (qmail 73641 invoked by uid 500); 8 Oct 2012 15:15:02 -0000 Mailing-List: contact crunch-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: crunch-dev@incubator.apache.org Delivered-To: mailing list crunch-commits@incubator.apache.org Received: (qmail 73631 invoked by uid 99); 8 Oct 2012 15:15:02 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Oct 2012 15:15:02 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A058A3C39E; Mon, 8 Oct 2012 15:15:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greid@apache.org To: crunch-commits@incubator.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-90: Initialize deep copier with config Message-Id: <20121008151501.A058A3C39E@tyr.zones.apache.org> Date: Mon, 8 Oct 2012 15:15:01 +0000 (UTC) Updated Branches: refs/heads/master eda3c776b -> 7fef772cf CRUNCH-90: Initialize deep copier with config Update the deepCopy logic to use the current ReflectDataFactory instance for the run. 
This is done by providing the Configuration object in PType#initialize Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/7fef772c Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/7fef772c Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/7fef772c Branch: refs/heads/master Commit: 7fef772cf379a640697b175ad2a485439cc19f3a Parents: eda3c77 Author: Josh Wills Authored: Sun Oct 7 17:56:45 2012 -0700 Committer: Gabriel Reid Committed: Mon Oct 8 13:15:07 2012 +0200 ---------------------------------------------------------------------- .../crunch/impl/mr/emit/IntermediateEmitter.java | 11 +- .../java/org/apache/crunch/impl/mr/run/RTNode.java | 2 +- .../main/java/org/apache/crunch/lib/Aggregate.java | 147 ++++++++------- .../main/java/org/apache/crunch/lib/PTables.java | 23 +-- .../apache/crunch/lib/join/FullOuterJoinFn.java | 9 +- .../org/apache/crunch/lib/join/InnerJoinFn.java | 9 +- .../java/org/apache/crunch/lib/join/JoinFn.java | 51 ++--- .../apache/crunch/lib/join/LeftOuterJoinFn.java | 9 +- .../apache/crunch/lib/join/RightOuterJoinFn.java | 9 +- .../apache/crunch/types/CollectionDeepCopier.java | 10 +- .../java/org/apache/crunch/types/DeepCopier.java | 26 ++- .../org/apache/crunch/types/MapDeepCopier.java | 22 ++- .../main/java/org/apache/crunch/types/PType.java | 37 ++-- .../org/apache/crunch/types/TupleDeepCopier.java | 16 +- .../apache/crunch/types/avro/AvroDeepCopier.java | 50 +++--- .../crunch/types/avro/AvroGroupedTableType.java | 4 +- .../apache/crunch/types/avro/AvroOutputFormat.java | 2 +- .../apache/crunch/types/avro/AvroTableType.java | 11 +- .../org/apache/crunch/types/avro/AvroType.java | 19 ++- .../crunch/types/avro/ReflectDataFactory.java | 4 +- .../crunch/types/avro/SafeAvroSerialization.java | 3 +- .../crunch/types/writable/WritableDeepCopier.java | 13 +- .../types/writable/WritableGroupedTableType.java | 8 +- 
.../crunch/types/writable/WritableTableType.java | 10 +- .../apache/crunch/types/writable/WritableType.java | 12 +- .../impl/mr/emit/IntermediateEmitterTest.java | 7 +- .../org/apache/crunch/lib/join/JoinFnTestBase.java | 2 + .../crunch/types/CollectionDeepCopierTest.java | 8 +- .../org/apache/crunch/types/MapDeepCopierTest.java | 5 +- .../apache/crunch/types/TupleDeepCopierTest.java | 6 +- .../crunch/types/avro/AvroDeepCopierTest.java | 14 +- .../types/avro/AvroGroupedTableTypeTest.java | 6 +- .../crunch/types/avro/AvroTableTypeTest.java | 5 +- .../org/apache/crunch/types/avro/AvroTypeTest.java | 59 ++++--- .../types/writable/WritableDeepCopierTest.java | 17 +- .../writable/WritableGroupedTableTypeTest.java | 3 +- .../types/writable/WritableTableTypeTest.java | 6 +- .../crunch/types/writable/WritableTypeTest.java | 20 +- 38 files changed, 374 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java index d609489..b6df98b 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java @@ -23,25 +23,28 @@ import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; import org.apache.crunch.impl.mr.run.RTNode; import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; import com.google.common.collect.ImmutableList; /** - * An {@link Emitter} implementation that links the output of one {@link DoFn} - * to the input of another {@code DoFn}. 
+ * An {@link Emitter} implementation that links the output of one {@link DoFn} to the input of + * another {@code DoFn}. * */ public class IntermediateEmitter implements Emitter { private final List children; + private final Configuration conf; private final PType outputPType; private final boolean needDetachedValues; - public IntermediateEmitter(PType outputPType, List children) { + public IntermediateEmitter(PType outputPType, List children, Configuration conf) { this.outputPType = outputPType; this.children = ImmutableList.copyOf(children); + this.conf = conf; - outputPType.initialize(); + outputPType.initialize(conf); needDetachedValues = this.children.size() > 1; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java index 4df989b..1f5124c 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -74,7 +74,7 @@ public class RTNode implements Serializable { this.emitter = new OutputEmitter(outputConverter, ctxt.getContext()); } } else if (!children.isEmpty()) { - this.emitter = new IntermediateEmitter(outputPType, children); + this.emitter = new IntermediateEmitter(outputPType, children, ctxt.getContext().getConfiguration()); } else { throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java index dc3de7c..f28cca4 100644 --- 
a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java @@ -40,15 +40,14 @@ import org.apache.crunch.types.PTypeFamily; import com.google.common.collect.Lists; /** - * Methods for performing various types of aggregations over {@link PCollection} - * instances. - * + * Methods for performing various types of aggregations over {@link PCollection} instances. + * */ public class Aggregate { /** - * Returns a {@code PTable} that contains the unique elements of this - * collection mapped to a count of their occurrences. + * Returns a {@code PTable} that contains the unique elements of this collection mapped to a count + * of their occurrences. */ public static PTable count(PCollection collect) { PTypeFamily tf = collect.getTypeFamily(); @@ -56,24 +55,26 @@ public class Aggregate { public Pair map(S input) { return Pair.of(input, 1L); } - }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey().combineValues(CombineFn. SUM_LONGS()); + }, tf.tableOf(collect.getPType(), tf.longs())).groupByKey() + .combineValues(CombineFn. SUM_LONGS()); } /** * Returns the number of elements in the provided PCollection. - * + * * @param collect The PCollection whose elements should be counted. * @param The type of the PCollection. * @return A {@code PObject} containing the number of elements in the {@code PCollection}. */ public static PObject length(PCollection collect) { PTypeFamily tf = collect.getTypeFamily(); - PTable countTable = collect.parallelDo("Aggregate.count", - new MapFn>() { + PTable countTable = collect + .parallelDo("Aggregate.count", new MapFn>() { public Pair map(S input) { return Pair.of(1, 1L); } - }, tf.tableOf(tf.ints(), tf.longs())).groupByKey().combineValues(CombineFn. SUM_LONGS()); + }, tf.tableOf(tf.ints(), tf.longs())).groupByKey() + .combineValues(CombineFn. 
SUM_LONGS()); PCollection count = countTable.values(); return new FirstElementPObject(count); } @@ -132,7 +133,8 @@ public class Aggregate { } @Override - public void process(Pair>> input, Emitter>> emitter) { + public void process(Pair>> input, + Emitter>> emitter) { Comparator> cmp = new PairValueComparator(maximize); PriorityQueue> queue = new PriorityQueue>(limit, cmp); for (Pair pair : input.second()) { @@ -155,8 +157,8 @@ public class Aggregate { PTableType base = ptable.getPTableType(); PType> pairType = ptf.pairs(base.getKeyType(), base.getValueType()); PTableType> inter = ptf.tableOf(ptf.ints(), pairType); - return ptable.parallelDo("top" + limit + "map", new TopKFn(limit, maximize), inter).groupByKey(1) - .combineValues(new TopKCombineFn(limit, maximize)) + return ptable.parallelDo("top" + limit + "map", new TopKFn(limit, maximize), inter) + .groupByKey(1).combineValues(new TopKCombineFn(limit, maximize)) .parallelDo("top" + limit + "reduce", new DoFn>, Pair>() { public void process(Pair> input, Emitter> emitter) { emitter.emit(input.second()); @@ -174,31 +176,33 @@ public class Aggregate { + collect.getPType().getTypeClass()); } PTypeFamily tf = collect.getTypeFamily(); - PCollection maxCollect = PTables.values(collect.parallelDo("max", new DoFn>() { - private transient S max = null; + PCollection maxCollect = PTables.values(collect + .parallelDo("max", new DoFn>() { + private transient S max = null; - public void process(S input, Emitter> emitter) { - if (max == null || ((Comparable) max).compareTo(input) < 0) { - max = input; - } - } + public void process(S input, Emitter> emitter) { + if (max == null || ((Comparable) max).compareTo(input) < 0) { + max = input; + } + } - public void cleanup(Emitter> emitter) { - if (max != null) { - emitter.emit(Pair.of(true, max)); - } - } - }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1).combineValues(new CombineFn() { - public void process(Pair> input, Emitter> emitter) { - S max = null; - for (S v : 
input.second()) { - if (max == null || ((Comparable) max).compareTo(v) < 0) { - max = v; + public void cleanup(Emitter> emitter) { + if (max != null) { + emitter.emit(Pair.of(true, max)); + } } - } - emitter.emit(Pair.of(input.first(), max)); - } - })); + }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey(1) + .combineValues(new CombineFn() { + public void process(Pair> input, Emitter> emitter) { + S max = null; + for (S v : input.second()) { + if (max == null || ((Comparable) max).compareTo(v) < 0) { + max = v; + } + } + emitter.emit(Pair.of(input.first(), max)); + } + })); return new FirstElementPObject(maxCollect); } @@ -212,51 +216,54 @@ public class Aggregate { + collect.getPType().getTypeClass()); } PTypeFamily tf = collect.getTypeFamily(); - PCollection minCollect = PTables.values(collect.parallelDo("min", new DoFn>() { - private transient S min = null; + PCollection minCollect = PTables.values(collect + .parallelDo("min", new DoFn>() { + private transient S min = null; - public void process(S input, Emitter> emitter) { - if (min == null || ((Comparable) min).compareTo(input) > 0) { - min = input; - } - } + public void process(S input, Emitter> emitter) { + if (min == null || ((Comparable) min).compareTo(input) > 0) { + min = input; + } + } - public void cleanup(Emitter> emitter) { - if (min != null) { - emitter.emit(Pair.of(false, min)); - } - } - }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey().combineValues(new CombineFn() { - public void process(Pair> input, Emitter> emitter) { - S min = null; - for (S v : input.second()) { - if (min == null || ((Comparable) min).compareTo(v) > 0) { - min = v; + public void cleanup(Emitter> emitter) { + if (min != null) { + emitter.emit(Pair.of(false, min)); + } } - } - emitter.emit(Pair.of(input.first(), min)); - } - })); + }, tf.tableOf(tf.booleans(), collect.getPType())).groupByKey() + .combineValues(new CombineFn() { + public void process(Pair> input, Emitter> emitter) { + S min = null; + for 
(S v : input.second()) { + if (min == null || ((Comparable) min).compareTo(v) > 0) { + min = v; + } + } + emitter.emit(Pair.of(input.first(), min)); + } + })); return new FirstElementPObject(minCollect); } public static PTable> collectValues(PTable collect) { PTypeFamily tf = collect.getTypeFamily(); final PType valueType = collect.getValueType(); - return collect.groupByKey().parallelDo("collect", new MapValuesFn, Collection>() { + return collect.groupByKey().parallelDo("collect", + new MapValuesFn, Collection>() { - @Override - public void initialize() { - valueType.initialize(); - } + @Override + public void initialize() { + valueType.initialize(getConfiguration()); + } - public Collection map(Iterable values) { - List collected = Lists.newArrayList(); - for (V value : values) { - collected.add(valueType.getDetachedValue(value)); - } - return collected; - } - }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType()))); + public Collection map(Iterable values) { + List collected = Lists.newArrayList(); + for (V value : values) { + collected.add(valueType.getDetachedValue(value)); + } + return collected; + } + }, tf.tableOf(collect.getKeyType(), tf.collections(collect.getValueType()))); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/PTables.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/PTables.java b/crunch/src/main/java/org/apache/crunch/lib/PTables.java index 71c04ff..e788656 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/PTables.java +++ b/crunch/src/main/java/org/apache/crunch/lib/PTables.java @@ -58,31 +58,27 @@ public class PTables { /** * Create a detached value for a table {@link Pair}. 
* - * @param tableType - * The table type - * @param value - * The value from which a detached value is to be created + * @param tableType The table type + * @param value The value from which a detached value is to be created * @return The detached value * @see PType#getDetachedValue(Object) */ public static Pair getDetachedValue(PTableType tableType, Pair value) { - return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), - tableType.getValueType().getDetachedValue(value.second())); + return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), tableType.getValueType() + .getDetachedValue(value.second())); } /** * Created a detached value for a {@link PGroupedTable} value. * * - * @param groupedTableType - * The grouped table type - * @param value - * The value from which a detached value is to be created + * @param groupedTableType The grouped table type + * @param value The value from which a detached value is to be created * @return The detached value * @see PType#getDetachedValue(Object) */ - public static Pair> getGroupedDetachedValue(PGroupedTableType groupedTableType, - Pair> value) { + public static Pair> getGroupedDetachedValue( + PGroupedTableType groupedTableType, Pair> value) { PTableType tableType = groupedTableType.getTableType(); List detachedIterable = Lists.newArrayList(); @@ -90,6 +86,7 @@ public class PTables { for (V v : value.second()) { detachedIterable.add(valueType.getDetachedValue(v)); } - return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), (Iterable) detachedIterable); + return Pair.of(tableType.getKeyType().getDetachedValue(value.first()), + (Iterable) detachedIterable); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java 
b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java index 0ceb382..834396a 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java @@ -28,12 +28,9 @@ import com.google.common.collect.Lists; /** * Used to perform the last step of an full outer join. * - * @param - * Type of the keys. - * @param - * Type of the first {@link org.apache.crunch.PTable}'s values - * @param - * Type of the second {@link org.apache.crunch.PTable}'s values + * @param Type of the keys. + * @param Type of the first {@link org.apache.crunch.PTable}'s values + * @param Type of the second {@link org.apache.crunch.PTable}'s values */ public class FullOuterJoinFn extends JoinFn { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java index 5275c95..a3d30d2 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/InnerJoinFn.java @@ -28,12 +28,9 @@ import com.google.common.collect.Lists; /** * Used to perform the last step of an inner join. * - * @param - * Type of the keys. - * @param - * Type of the first {@link org.apache.crunch.PTable}'s values - * @param - * Type of the second {@link org.apache.crunch.PTable}'s values + * @param Type of the keys. 
+ * @param Type of the first {@link org.apache.crunch.PTable}'s values + * @param Type of the second {@link org.apache.crunch.PTable}'s values */ public class InnerJoinFn extends JoinFn { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java index dab6c34..99aea5a 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/JoinFn.java @@ -25,26 +25,22 @@ import org.apache.crunch.types.PType; /** * Represents a {@link org.apache.crunch.DoFn} for performing joins. * - * @param - * Type of the keys. - * @param - * Type of the first {@link org.apache.crunch.PTable}'s values - * @param - * Type of the second {@link org.apache.crunch.PTable}'s values + * @param Type of the keys. + * @param Type of the first {@link org.apache.crunch.PTable}'s values + * @param Type of the second {@link org.apache.crunch.PTable}'s values */ -public abstract class JoinFn extends DoFn, Iterable>>, Pair>> { +public abstract class JoinFn extends + DoFn, Iterable>>, Pair>> { protected PType keyType; protected PType leftValueType; /** - * Instantiate with the PType of the value of the left side of the join (used - * for creating deep copies of values). + * Instantiate with the PType of the value of the left side of the join (used for creating deep + * copies of values). 
* - * @param keyType - * The PType of the value used as the key of the join - * @param leftValueType - * The PType of the value type of the left side of the join + * @param keyType The PType of the value used as the key of the join + * @param leftValueType The PType of the value type of the left side of the join */ public JoinFn(PType keyType, PType leftValueType) { this.keyType = keyType; @@ -53,8 +49,8 @@ public abstract class JoinFn extends DoFn, Iterab @Override public void initialize() { - this.keyType.initialize(); - this.leftValueType.initialize(); + this.keyType.initialize(getConfiguration()); + this.leftValueType.initialize(getConfiguration()); } /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */ @@ -63,28 +59,23 @@ public abstract class JoinFn extends DoFn, Iterab /** * Performs the actual joining. * - * @param key - * The key for this grouping of values. - * @param id - * The side that this group of values is from (0 -> left, 1 -> - * right). - * @param pairs - * The group of values associated with this key and id pair. - * @param emitter - * The emitter to send the output to. + * @param key The key for this grouping of values. + * @param id The side that this group of values is from (0 -> left, 1 -> right). + * @param pairs The group of values associated with this key and id pair. + * @param emitter The emitter to send the output to. */ - public abstract void join(K key, int id, Iterable> pairs, Emitter>> emitter); + public abstract void join(K key, int id, Iterable> pairs, + Emitter>> emitter); /** * Split up the input record to make coding a bit more manageable. * - * @param input - * The input record. - * @param emitter - * The emitter to send the output to. + * @param input The input record. + * @param emitter The emitter to send the output to. 
*/ @Override - public void process(Pair, Iterable>> input, Emitter>> emitter) { + public void process(Pair, Iterable>> input, + Emitter>> emitter) { join(input.first().first(), input.first().second(), input.second(), emitter); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java index 116353a..18288a4 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java @@ -28,12 +28,9 @@ import com.google.common.collect.Lists; /** * Used to perform the last step of an left outer join. * - * @param - * Type of the keys. - * @param - * Type of the first {@link org.apache.crunch.PTable}'s values - * @param - * Type of the second {@link org.apache.crunch.PTable}'s values + * @param Type of the keys. + * @param Type of the first {@link org.apache.crunch.PTable}'s values + * @param Type of the second {@link org.apache.crunch.PTable}'s values */ public class LeftOuterJoinFn extends JoinFn { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java index 51b74cc..2789d40 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/RightOuterJoinFn.java @@ -28,12 +28,9 @@ import com.google.common.collect.Lists; /** * Used to perform the last step of an right outer join. 
* - * @param - * Type of the keys. - * @param - * Type of the first {@link org.apache.crunch.PTable}'s values - * @param - * Type of the second {@link org.apache.crunch.PTable}'s values + * @param Type of the keys. + * @param Type of the first {@link org.apache.crunch.PTable}'s values + * @param Type of the second {@link org.apache.crunch.PTable}'s values */ public class RightOuterJoinFn extends JoinFn { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java index 2216ef7..db62e85 100644 --- a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java @@ -20,13 +20,14 @@ package org.apache.crunch.types; import java.util.Collection; import java.util.List; +import org.apache.hadoop.conf.Configuration; + import com.google.common.collect.Lists; /** * Performs deep copies (based on underlying PType deep copying) of Collections. 
* - * @param - * The type of Tuple implementation being copied + * @param The type of Tuple implementation being copied */ public class CollectionDeepCopier implements DeepCopier> { @@ -37,6 +38,11 @@ public class CollectionDeepCopier implements DeepCopier> { } @Override + public void initialize(Configuration conf) { + this.elementType.initialize(conf); + } + + @Override public Collection deepCopy(Collection source) { List copiedCollection = Lists.newArrayListWithCapacity(source.size()); for (T value : source) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java index a96e7bf..f146e86 100644 --- a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java @@ -19,30 +19,42 @@ package org.apache.crunch.types; import java.io.Serializable; +import org.apache.hadoop.conf.Configuration; + /** * Performs deep copies of values. * - * @param - * The type of value that will be copied + * @param The type of value that will be copied */ public interface DeepCopier extends Serializable { /** + * Initialize the deep copier with a job-specific configuration + * + * @param conf Job-specific configuration + */ + void initialize(Configuration conf); + + /** * Create a deep copy of a value. 
* - * @param source - * The value to be copied + * @param source The value to be copied * @return The deep copy of the value */ T deepCopy(T source); - + static class NoOpDeepCopier implements DeepCopier { @Override public V deepCopy(V source) { return source; } - + + @Override + public void initialize(Configuration conf) { + // No initialization needed + } + } - + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java index b1e4b37..55daaee 100644 --- a/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/MapDeepCopier.java @@ -20,25 +20,31 @@ package org.apache.crunch.types; import java.util.Map; import java.util.Map.Entry; -import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import com.google.common.collect.Maps; -public class MapDeepCopier implements DeepCopier> { +public class MapDeepCopier implements DeepCopier> { private final PType ptype; - - public MapDeepCopier(PType ptype){ + + public MapDeepCopier(PType ptype) { this.ptype = ptype; } - + + @Override + public void initialize(Configuration conf) { + this.ptype.initialize(conf); + } + @Override public Map deepCopy(Map source) { - Map deepCopyMap = Maps.newHashMap(); - for (Entry entry : source.entrySet()){ + Map deepCopyMap = Maps.newHashMap(); + for (Entry entry : source.entrySet()) { deepCopyMap.put(entry.getKey(), ptype.getDetachedValue(entry.getValue())); } return deepCopyMap; - + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/PType.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/main/java/org/apache/crunch/types/PType.java b/crunch/src/main/java/org/apache/crunch/types/PType.java index bbe8a4b..565615a 100644 --- a/crunch/src/main/java/org/apache/crunch/types/PType.java +++ b/crunch/src/main/java/org/apache/crunch/types/PType.java @@ -24,14 +24,14 @@ import org.apache.crunch.DoFn; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.SourceTarget; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; /** - * A {@code PType} defines a mapping between a data type that is used in a - * Crunch pipeline and a serialization and storage format that is used to - * read/write data from/to HDFS. Every {@link PCollection} has an associated - * {@code PType} that tells Crunch how to read/write data from that - * {@code PCollection}. + * A {@code PType} defines a mapping between a data type that is used in a Crunch pipeline and a + * serialization and storage format that is used to read/write data from/to HDFS. Every + * {@link PCollection} has an associated {@code PType} that tells Crunch how to read/write data from + * that {@code PCollection}. * */ public interface PType extends Serializable { @@ -52,38 +52,35 @@ public interface PType extends Serializable { Converter getConverter(); /** - * Initialize this PType for use within a DoFn. This generally only needs to - * be called when using a PType for {@link #getDetachedValue(Object)}. + * Initialize this PType for use within a DoFn. This generally only needs to be called when using + * a PType for {@link #getDetachedValue(Object)}. * + * @param conf Configuration object * @see PType#getDetachedValue(Object) */ - void initialize(); + void initialize(Configuration conf); /** - * Returns a copy of a value (or the value itself) that can safely be - * retained. + * Returns a copy of a value (or the value itself) that can safely be retained. *

- * This is useful when iterable values being processed in a DoFn (via a - * reducer) need to be held on to for more than the scope of a single - * iteration, as a reducer (and therefore also a DoFn that has an Iterable as - * input) re-use deserialized values. More information on object reuse is + * This is useful when iterable values being processed in a DoFn (via a reducer) need to be held + * on to for more than the scope of a single iteration, as a reducer (and therefore also a DoFn + * that has an Iterable as input) re-use deserialized values. More information on object reuse is * available in the {@link DoFn} class documentation. * - * @param value - * The value to be deep-copied + * @param value The value to be deep-copied * @return A deep copy of the input value */ T getDetachedValue(T value); /** - * Returns a {@code SourceTarget} that is able to read/write data using the - * serialization format specified by this {@code PType}. + * Returns a {@code SourceTarget} that is able to read/write data using the serialization format + * specified by this {@code PType}. */ SourceTarget getDefaultFileSource(Path path); /** - * Returns the sub-types that make up this PType if it is a composite - * instance, such as a tuple. + * Returns the sub-types that make up this PType if it is a composite instance, such as a tuple. 
*/ List getSubTypes(); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java index 4f473a0..45e38c7 100644 --- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java @@ -20,27 +20,33 @@ package org.apache.crunch.types; import java.util.List; import org.apache.crunch.Tuple; +import org.apache.hadoop.conf.Configuration; import com.google.common.collect.Lists; /** - * Performs deep copies (based on underlying PType deep copying) of Tuple-based - * objects. + * Performs deep copies (based on underlying PType deep copying) of Tuple-based objects. * - * @param - * The type of Tuple implementation being copied + * @param The type of Tuple implementation being copied */ public class TupleDeepCopier implements DeepCopier { private final TupleFactory tupleFactory; private final List elementTypes; - public TupleDeepCopier(Class tupleClass, PType...elementTypes) { + public TupleDeepCopier(Class tupleClass, PType... 
elementTypes) { tupleFactory = TupleFactory.getTupleFactory(tupleClass); this.elementTypes = Lists.newArrayList(elementTypes); } @Override + public void initialize(Configuration conf) { + for (PType elementType : elementTypes) { + elementType.initialize(conf); + } + } + + @Override public T deepCopy(T source) { Object[] deepCopyValues = new Object[source.size()]; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index fe4fe1a..b431123 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -31,24 +31,23 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.crunch.impl.mr.run.CrunchRuntimeException; import org.apache.crunch.types.DeepCopier; +import org.apache.hadoop.conf.Configuration; /** * Performs deep copies of Avro-serializable objects. *

- * Warning: Methods in this class are not thread-safe. This shouldn't be - * a problem when running in a map-reduce context where each mapper/reducer is - * running in its own JVM, but it may well be a problem in any other kind of - * multi-threaded context. + * Warning: Methods in this class are not thread-safe. This shouldn't be a problem when + * running in a map-reduce context where each mapper/reducer is running in its own JVM, but it may + * well be a problem in any other kind of multi-threaded context. */ public abstract class AvroDeepCopier implements DeepCopier, Serializable { private String jsonSchema; + private transient Configuration conf; private transient Schema schema; private BinaryEncoder binaryEncoder; private BinaryDecoder binaryDecoder; @@ -67,11 +66,16 @@ public abstract class AvroDeepCopier implements DeepCopier, Serializable { return schema; } + @Override + public void initialize(Configuration conf) { + this.conf = conf; + } + protected abstract T createCopyTarget(); - protected abstract DatumWriter createDatumWriter(); + protected abstract DatumWriter createDatumWriter(Configuration conf); - protected abstract DatumReader createDatumReader(); + protected abstract DatumReader createDatumReader(Configuration conf); /** * Deep copier for Avro specific data objects. 
@@ -91,12 +95,12 @@ public abstract class AvroDeepCopier implements DeepCopier, Serializable { } @Override - protected DatumWriter createDatumWriter() { + protected DatumWriter createDatumWriter(Configuration conf) { return new SpecificDatumWriter(getSchema()); } @Override - protected DatumReader createDatumReader() { + protected DatumReader createDatumReader(Configuration conf) { return new SpecificDatumReader(getSchema()); } @@ -119,12 +123,12 @@ public abstract class AvroDeepCopier implements DeepCopier, Serializable { } @Override - protected DatumReader createDatumReader() { + protected DatumReader createDatumReader(Configuration conf) { return new GenericDatumReader(getSchema()); } @Override - protected DatumWriter createDatumWriter() { + protected DatumWriter createDatumWriter(Configuration conf) { return new GenericDatumWriter(getSchema()); } } @@ -147,34 +151,29 @@ public abstract class AvroDeepCopier implements DeepCopier, Serializable { } @Override - protected DatumReader createDatumReader() { - return new ReflectDatumReader(getSchema()); + protected DatumReader createDatumReader(Configuration conf) { + return Avros.getReflectDataFactory(conf).getReader(getSchema()); } @Override - protected DatumWriter createDatumWriter() { - return new ReflectDatumWriter(getSchema()); + protected DatumWriter createDatumWriter(Configuration conf) { + return Avros.getReflectDataFactory(conf).getWriter(getSchema()); } } - public static class AvroTupleDeepCopier { - - } - /** * Create a deep copy of an Avro value. 
* - * @param source - * The value to be copied + * @param source The value to be copied * @return The deep copy of the value */ @Override public T deepCopy(T source) { if (datumReader == null) { - datumReader = createDatumReader(); + datumReader = createDatumReader(conf); } if (datumWriter == null) { - datumWriter = createDatumWriter(); + datumWriter = createDatumWriter(conf); } ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder); @@ -182,7 +181,8 @@ public abstract class AvroDeepCopier implements DeepCopier, Serializable { try { datumWriter.write(source, binaryEncoder); binaryEncoder.flush(); - binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); + binaryDecoder = DecoderFactory.get() + .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); datumReader.read(target, binaryDecoder); } catch (Exception e) { throw new CrunchRuntimeException("Error while deep copying avro value " + source, e); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java index aa5b5dc..2b4def5 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java @@ -72,8 +72,8 @@ public class AvroGroupedTableType extends PGroupedTableType { } @Override - public void initialize() { - // No initialization needed for Avro PTypes + public void initialize(Configuration conf) { + getTableType().initialize(conf); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java index 2582cc2..98d3f50 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java @@ -49,7 +49,7 @@ public class AvroOutputFormat extends FileOutputFormat, NullWr } ReflectDataFactory factory = Avros.getReflectDataFactory(conf); - final DataFileWriter WRITER = new DataFileWriter(factory. getWriter()); + final DataFileWriter WRITER = new DataFileWriter(factory. getWriter(schema)); JobConf jc = new JobConf(conf); /* copied from org.apache.avro.mapred.AvroOutputFormat */ http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index bd4b14c..285b423 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -66,8 +66,8 @@ public class AvroTableType extends AvroType> implements PTableT public void initialize() { keyMapFn.setContext(getContext()); valueMapFn.setContext(getContext()); - pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(new Schema.Parser().parse(firstJson), - new Schema.Parser().parse(secondJson)).toString(); + pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema( + new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString(); } @Override @@ -120,9 
+120,10 @@ public class AvroTableType extends AvroType> implements PTableT private final AvroType valueType; public AvroTableType(AvroType keyType, AvroType valueType, Class> pairClass) { - super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()), - new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType, - valueType), new TupleDeepCopier(Pair.class, keyType, valueType), keyType, valueType); + super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), + valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(), + valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier( + Pair.class, keyType, valueType), keyType, valueType); this.keyType = keyType; this.valueType = valueType; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java index a127baa..a0e2722 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -31,6 +31,7 @@ import org.apache.crunch.types.Converter; import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import com.google.common.base.Preconditions; @@ -52,13 +53,14 @@ public class AvroType implements PType { private final MapFn baseOutputMapFn; private final List subTypes; private DeepCopier deepCopier; + private boolean initialized = false; public AvroType(Class typeClass, Schema schema, DeepCopier deepCopier, PType... 
ptypes) { this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes); } - public AvroType(Class typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, DeepCopier deepCopier, - PType... ptypes) { + public AvroType(Class typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, + DeepCopier deepCopier, PType... ptypes) { this.typeClass = typeClass; this.schema = Preconditions.checkNotNull(schema); this.schemaString = schema.toString(); @@ -99,7 +101,7 @@ public class AvroType implements PType { if (Avros.isPrimitive(this)) { return false; } - + if (!this.subTypes.isEmpty()) { for (PType subType : this.subTypes) { AvroType atype = (AvroType) subType; @@ -109,7 +111,7 @@ public class AvroType implements PType { } return false; } - + return SpecificRecord.class.isAssignableFrom(typeClass); } @@ -164,11 +166,16 @@ public class AvroType implements PType { } @Override - public void initialize() { - // No initialization needed for Avro PTypes + public void initialize(Configuration conf) { + deepCopier.initialize(conf); + initialized = true; } + @Override public T getDetachedValue(T value) { + if (!initialized) { + throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType"); + } return deepCopier.deepCopy(value); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java index c19a168..e973cca 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/ReflectDataFactory.java @@ -35,7 +35,7 @@ public class ReflectDataFactory { return new ReflectDatumReader(schema); } - public ReflectDatumWriter 
getWriter() { - return new ReflectDatumWriter(); + public ReflectDatumWriter getWriter(Schema schema) { + return new ReflectDatumWriter(schema); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java index 266179e..438976c 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/SafeAvroSerialization.java @@ -111,8 +111,7 @@ public class SafeAvroSerialization extends Configured implements Serializatio .getKeySchema(AvroJob.getMapOutputSchema(conf)) : Pair.getValueSchema(AvroJob.getMapOutputSchema(conf))); ReflectDataFactory factory = Avros.getReflectDataFactory(conf); - ReflectDatumWriter writer = factory.getWriter(); - writer.setSchema(schema); + ReflectDatumWriter writer = factory.getWriter(schema); return new AvroWrapperSerializer(writer); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java index 6469208..ae4614d 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java @@ -25,22 +25,27 @@ import java.io.DataOutputStream; import org.apache.crunch.impl.mr.run.CrunchRuntimeException; import org.apache.crunch.types.DeepCopier; +import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.io.Writable; - /** * Performs deep copies of Writable values. + * * @param The type of Writable that can be copied */ -public class WritableDeepCopier implements DeepCopier{ - +public class WritableDeepCopier implements DeepCopier { + private Class writableClass; - public WritableDeepCopier(Class writableClass){ + public WritableDeepCopier(Class writableClass) { this.writableClass = writableClass; } @Override + public void initialize(Configuration conf) { + } + + @Override public T deepCopy(T source) { ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(byteOutStream); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java index 98afb4d..32c9111 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableGroupedTableType.java @@ -23,6 +23,7 @@ import org.apache.crunch.Pair; import org.apache.crunch.lib.PTables; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PGroupedTableType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; public class WritableGroupedTableType extends PGroupedTableType { @@ -37,7 +38,8 @@ public class WritableGroupedTableType extends PGroupedTableType { WritableType valueType = (WritableType) tableType.getValueType(); this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn()); this.outputFn = tableType.getOutputMapFn(); - this.converter = new WritablePairConverter(keyType.getSerializationClass(), 
valueType.getSerializationClass()); + this.converter = new WritablePairConverter(keyType.getSerializationClass(), + valueType.getSerializationClass()); } @Override @@ -61,8 +63,8 @@ public class WritableGroupedTableType extends PGroupedTableType { } @Override - public void initialize() { - this.tableType.initialize(); + public void initialize(Configuration conf) { + this.tableType.initialize(conf); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java index 6bb6c5d..2f75b94 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableTableType.java @@ -31,6 +31,7 @@ import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -49,7 +50,8 @@ class WritableTableType implements PTableType { this.valueType = valueType; this.inputFn = new PairMapFn(keyType.getInputMapFn(), valueType.getInputMapFn()); this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn()); - this.converter = new WritablePairConverter(keyType.getSerializationClass(), valueType.getSerializationClass()); + this.converter = new WritablePairConverter(keyType.getSerializationClass(), + valueType.getSerializationClass()); } @Override @@ -101,9 +103,9 @@ class WritableTableType implements PTableType { } @Override - public void initialize() { - keyType.initialize(); - valueType.initialize(); + public void 
initialize(Configuration conf) { + keyType.initialize(conf); + valueType.initialize(conf); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java index 71f81f4..93b0518 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java @@ -27,6 +27,7 @@ import org.apache.crunch.types.Converter; import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -43,8 +44,8 @@ public class WritableType implements PType { private final List subTypes; private boolean initialized = false; - WritableType(Class typeClass, Class writableClass, MapFn inputDoFn, MapFn outputDoFn, - PType... subTypes) { + WritableType(Class typeClass, Class writableClass, MapFn inputDoFn, + MapFn outputDoFn, PType... 
subTypes) { this.typeClass = typeClass; this.writableClass = writableClass; this.inputFn = inputDoFn; @@ -99,15 +100,16 @@ public class WritableType implements PType { return false; } WritableType wt = (WritableType) obj; - return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes.equals(wt.subTypes)); + return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes + .equals(wt.subTypes)); } @Override - public void initialize() { + public void initialize(Configuration conf) { this.inputFn.initialize(); this.outputFn.initialize(); for (PType subType : subTypes) { - subType.initialize(); + subType.initialize(conf); } this.initialized = true; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java index 998e654..dd72364 100644 --- a/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java +++ b/crunch/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java @@ -28,6 +28,7 @@ import org.apache.crunch.impl.mr.run.RTNode; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.types.PType; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -48,7 +49,8 @@ public class IntermediateEmitterTest { @Test public void testEmit_SingleChild() { RTNode singleChild = mock(RTNode.class); - IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild)); + IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild), + new Configuration()); 
emitter.emit(stringWrapper); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class); @@ -60,7 +62,8 @@ public class IntermediateEmitterTest { public void testEmit_MultipleChildren() { RTNode childA = mock(RTNode.class); RTNode childB = mock(RTNode.class); - IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB)); + IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB), + new Configuration()); emitter.emit(stringWrapper); ArgumentCaptor argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java index 270d2c7..741899e 100644 --- a/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java +++ b/crunch/src/test/java/org/apache/crunch/lib/join/JoinFnTestBase.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.crunch.Emitter; import org.apache.crunch.Pair; import org.apache.crunch.test.StringWrapper; +import org.apache.hadoop.conf.Configuration; import org.junit.Before; import org.junit.Test; @@ -40,6 +41,7 @@ public abstract class JoinFnTestBase { @Before public void setUp() { joinFn = getJoinFn(); + joinFn.setConfigurationForTest(new Configuration()); joinFn.initialize(); emitter = mock(Emitter.class); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java index 
d3e7dfa..e28d094 100644 --- a/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java @@ -17,12 +17,14 @@ */ package org.apache.crunch.types; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import java.util.Collection; import org.apache.crunch.test.Person; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import com.google.common.collect.Lists; @@ -37,7 +39,9 @@ public class CollectionDeepCopierTest { person.siblingnames = Lists. newArrayList(); Collection personCollection = Lists.newArrayList(person); - CollectionDeepCopier collectionDeepCopier = new CollectionDeepCopier(Avros.records(Person.class)); + CollectionDeepCopier collectionDeepCopier = new CollectionDeepCopier( + Avros.records(Person.class)); + collectionDeepCopier.initialize(new Configuration()); Collection deepCopyCollection = collectionDeepCopier.deepCopy(personCollection); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java index f351691..b61a33f 100644 --- a/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/MapDeepCopierTest.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import com.google.common.collect.Maps; @@ -37,7 +38,9 @@ public class MapDeepCopierTest { Map map = Maps.newHashMap(); map.put(key, stringWrapper); - MapDeepCopier 
deepCopier = new MapDeepCopier(Avros.reflects(StringWrapper.class)); + MapDeepCopier deepCopier = new MapDeepCopier( + Avros.reflects(StringWrapper.class)); + deepCopier.initialize(new Configuration()); Map deepCopy = deepCopier.deepCopy(map); assertEquals(map, deepCopy); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java index c332285..0760c7e 100644 --- a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotSame; import org.apache.crunch.Pair; import org.apache.crunch.test.Person; import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import com.google.common.collect.Lists; @@ -37,9 +38,10 @@ public class TupleDeepCopierTest { person.siblingnames = Lists. 
newArrayList(); Pair inputPair = Pair.of(1, person); - DeepCopier deepCopier = new TupleDeepCopier( - Pair.class, Avros.ints(), Avros.records(Person.class)); + DeepCopier deepCopier = new TupleDeepCopier(Pair.class, Avros.ints(), + Avros.records(Person.class)); + deepCopier.initialize(new Configuration()); Pair deepCopyPair = deepCopier.deepCopy(inputPair); assertEquals(inputPair, deepCopyPair); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java index af6acb8..bb59136 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.avro.generic.GenericData.Record; import org.apache.crunch.test.Person; import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import com.google.common.collect.Lists; @@ -63,17 +64,17 @@ public class AvroDeepCopierTest { String name; int age; List siblingnames; - + @Override public boolean equals(Object other) { if (other == null || !(other instanceof ReflectedPerson)) { return false; } ReflectedPerson that = (ReflectedPerson) other; - return name.equals(that.name)&& age == that.age && siblingnames.equals(that.siblingnames); + return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames); } } - + @Test public void testDeepCopyReflect() { ReflectedPerson person = new ReflectedPerson(); @@ -81,8 +82,11 @@ public class AvroDeepCopierTest { person.age = 42; person.siblingnames = Lists.newArrayList(); - ReflectedPerson deepCopyPerson = new 
AvroDeepCopier.AvroReflectDeepCopier( - ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema()).deepCopy(person); + AvroDeepCopier avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier( + ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema()); + avroDeepCopier.initialize(new Configuration()); + + ReflectedPerson deepCopyPerson = avroDeepCopier.deepCopy(person); assertEquals(person, deepCopyPerson); assertNotSame(person, deepCopyPerson); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java index e5518f7..db9ebdc 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroGroupedTableTypeTest.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.crunch.Pair; import org.apache.crunch.test.Person; import org.apache.crunch.types.PGroupedTableType; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import com.google.common.collect.Lists; @@ -43,8 +44,9 @@ public class AvroGroupedTableTypeTest { Iterable inputPersonIterable = Lists.newArrayList(person); Pair> pair = Pair.of(integerValue, inputPersonIterable); - PGroupedTableType groupedTableType = Avros.tableOf(Avros.ints(), Avros.specifics(Person.class)) - .getGroupedTableType(); + PGroupedTableType groupedTableType = Avros.tableOf(Avros.ints(), + Avros.specifics(Person.class)).getGroupedTableType(); + groupedTableType.initialize(new Configuration()); Pair> detachedPair = groupedTableType.getDetachedValue(pair); 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java index 5e03ff8..35d4e5b 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue; import org.apache.crunch.Pair; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import com.google.common.collect.Lists; @@ -42,7 +43,9 @@ public class AvroTableTypeTest { Pair pair = Pair.of(integerValue, person); - AvroTableType tableType = Avros.tableOf(Avros.ints(), Avros.specifics(Person.class)); + AvroTableType tableType = Avros.tableOf(Avros.ints(), + Avros.specifics(Person.class)); + tableType.initialize(new Configuration()); Pair detachedPair = tableType.getDetachedValue(pair); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java index 170bebf..383ca27 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java @@ -33,6 +33,7 @@ import org.apache.crunch.Pair; import org.apache.crunch.TupleN; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; +import org.apache.hadoop.conf.Configuration; import org.junit.Test; import 
com.google.common.collect.Lists; @@ -150,6 +151,7 @@ public class AvroTypeTest { @Test public void testGetDetachedValue_GenericAvroType() { AvroType genericType = Avros.generics(Person.SCHEMA$); + genericType.initialize(new Configuration()); GenericData.Record record = new GenericData.Record(Person.SCHEMA$); record.put("name", "name value"); record.put("age", 42); @@ -159,8 +161,8 @@ public class AvroTypeTest { assertEquals(record, detachedRecord); assertNotSame(record, detachedRecord); } - - private Person createPerson(){ + + private Person createPerson() { Person person = new Person(); person.name = "name value"; person.age = 42; @@ -171,30 +173,39 @@ public class AvroTypeTest { @Test public void testGetDetachedValue_SpecificAvroType() { AvroType specificType = Avros.specifics(Person.class); + specificType.initialize(new Configuration()); Person person = createPerson(); Person detachedPerson = specificType.getDetachedValue(person); assertEquals(person, detachedPerson); assertNotSame(person, detachedPerson); } + @Test(expected = IllegalStateException.class) + public void testGetDetachedValue_NotInitialized() { + AvroType specificType = Avros.specifics(Person.class); + Person person = createPerson(); + specificType.getDetachedValue(person); + } + static class ReflectedPerson { String name; int age; List siblingnames; - + @Override public boolean equals(Object other) { if (other == null || !(other instanceof ReflectedPerson)) { return false; } ReflectedPerson that = (ReflectedPerson) other; - return name.equals(that.name)&& age == that.age && siblingnames.equals(that.siblingnames); + return name.equals(that.name) && age == that.age && siblingnames.equals(that.siblingnames); } } - + @Test public void testGetDetachedValue_ReflectAvroType() { AvroType reflectType = Avros.reflects(ReflectedPerson.class); + reflectType.initialize(new Configuration()); ReflectedPerson rp = new ReflectedPerson(); rp.name = "josh"; rp.age = 32; @@ -209,6 +220,7 @@ public class AvroTypeTest 
{ Person person = createPerson(); AvroType> pairType = Avros.pairs(Avros.ints(), Avros.records(Person.class)); + pairType.initialize(new Configuration()); Pair inputPair = Pair.of(1, person); Pair detachedPair = pairType.getDetachedValue(inputPair); @@ -216,47 +228,50 @@ public class AvroTypeTest { assertEquals(inputPair, detachedPair); assertNotSame(inputPair.second(), detachedPair.second()); } - + @Test - public void testGetDetachedValue_Collection(){ + public void testGetDetachedValue_Collection() { Person person = createPerson(); List personList = Lists.newArrayList(person); - + AvroType> collectionType = Avros.collections(Avros.records(Person.class)); - + collectionType.initialize(new Configuration()); + Collection detachedCollection = collectionType.getDetachedValue(personList); - + assertEquals(personList, detachedCollection); Person detachedPerson = detachedCollection.iterator().next(); - + assertNotSame(person, detachedPerson); } - + @Test - public void testGetDetachedValue_Map(){ + public void testGetDetachedValue_Map() { String key = "key"; Person value = createPerson(); - - Map stringPersonMap = Maps.newHashMap(); + + Map stringPersonMap = Maps.newHashMap(); stringPersonMap.put(key, value); - + AvroType> mapType = Avros.maps(Avros.records(Person.class)); - + mapType.initialize(new Configuration()); + Map detachedMap = mapType.getDetachedValue(stringPersonMap); - + assertEquals(stringPersonMap, detachedMap); assertNotSame(value, detachedMap.get(key)); } - + @Test - public void testGetDetachedValue_TupleN(){ + public void testGetDetachedValue_TupleN() { Person person = createPerson(); AvroType ptype = Avros.tuples(Avros.records(Person.class)); + ptype.initialize(new Configuration()); TupleN tuple = new TupleN(person); TupleN detachedTuple = ptype.getDetachedValue(tuple); - + assertEquals(tuple, detachedTuple); assertNotSame(person, detachedTuple.get(0)); } - + } 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java index a691df2..eae8218 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java @@ -17,30 +17,29 @@ */ package org.apache.crunch.types.writable; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import org.apache.hadoop.io.Text; import org.junit.Before; import org.junit.Test; - public class WritableDeepCopierTest { private WritableDeepCopier deepCopier; - + @Before - public void setUp(){ + public void setUp() { deepCopier = new WritableDeepCopier(Text.class); } - + @Test - public void testDeepCopy(){ + public void testDeepCopy() { Text text = new Text("value"); Text deepCopy = deepCopier.deepCopy(text); - + assertEquals(text, deepCopy); assertNotSame(text, deepCopy); } - - + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java index 1699f3c..f6c201b 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableGroupedTableTypeTest.java @@ -25,6 +25,7 @@ import java.util.List; import 
org.apache.crunch.Pair; import org.apache.crunch.types.PGroupedTableType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -41,7 +42,7 @@ public class WritableGroupedTableTypeTest { PGroupedTableType<Integer, Text> groupedTableType = Writables.tableOf(Writables.ints(), Writables.writables(Text.class)).getGroupedTableType(); - groupedTableType.initialize(); + groupedTableType.initialize(new Configuration()); Pair<Integer, Iterable<Text>> detachedPair = groupedTableType.getDetachedValue(pair); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java index ae68e7a..697a28c 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTableTypeTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import org.apache.crunch.Pair; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -33,8 +34,9 @@ public class WritableTableTypeTest { Text textValue = new Text("forty-two"); Pair<Integer, Text> pair = Pair.of(integerValue, textValue); - WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(), Writables.writables(Text.class)); - tableType.initialize(); + WritableTableType<Integer, Text> tableType = Writables.tableOf(Writables.ints(), + Writables.writables(Text.class)); + tableType.initialize(new Configuration()); Pair<Integer, Text> detachedPair = tableType.getDetachedValue(pair); assertSame(integerValue, detachedPair.first());
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/7fef772c/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java index bea953d..65e946b 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Map; import org.apache.crunch.Pair; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -46,7 +47,7 @@ public class WritableTypeTest { @Test public void testGetDetachedValue_CustomWritable() { WritableType<Text, Text> textWritableType = Writables.writables(Text.class); - textWritableType.initialize(); + textWritableType.initialize(new Configuration()); Text value = new Text("test"); Text detachedValue = textWritableType.getDetachedValue(value); @@ -57,9 +58,9 @@ public class WritableTypeTest { @Test public void testGetDetachedValue_Collection() { Collection<Text> textCollection = Lists.newArrayList(new Text("value")); - WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables.collections(Writables - .writables(Text.class)); - ptype.initialize(); + WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables + .collections(Writables.writables(Text.class)); + ptype.initialize(new Configuration()); Collection<Text> detachedCollection = ptype.getDetachedValue(textCollection); assertEquals(textCollection, detachedCollection); @@ -69,9 +70,9 @@ public class WritableTypeTest { @Test public void testGetDetachedValue_Tuple() { Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two")); - WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(Writables.writables(Text.class), - 
Writables.writables(Text.class)); - ptype.initialize(); + WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs( + Writables.writables(Text.class), Writables.writables(Text.class)); + ptype.initialize(new Configuration()); Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair); assertEquals(textPair, detachedPair); @@ -84,8 +85,9 @@ public class WritableTypeTest { Map<String, Text> stringTextMap = Maps.newHashMap(); stringTextMap.put("key", new Text("value")); - WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables.writables(Text.class)); - ptype.initialize(); + WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables + .writables(Text.class)); + ptype.initialize(new Configuration()); Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap); assertEquals(stringTextMap, detachedMap);