Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 95DAE10769 for ; Thu, 9 Jan 2014 15:40:49 +0000 (UTC) Received: (qmail 12807 invoked by uid 500); 9 Jan 2014 15:40:28 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 12719 invoked by uid 500); 9 Jan 2014 15:40:23 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 12625 invoked by uid 99); 9 Jan 2014 15:40:09 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jan 2014 15:40:09 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B2E3E8B369E; Thu, 9 Jan 2014 15:40:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <7ff51fde9e7b4332a29039b9dfc71522@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-239: Add a Union PType. Date: Thu, 9 Jan 2014 15:40:09 +0000 (UTC) Updated Branches: refs/heads/master 64c20ad9c -> 52da56301 CRUNCH-239: Add a Union PType. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/52da5630 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/52da5630 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/52da5630 Branch: refs/heads/master Commit: 52da56301fac9cdc0baac0ee9b8fd421a5147baa Parents: 64c20ad Author: Josh Wills Authored: Tue Jan 7 17:28:48 2014 -0800 Committer: Josh Wills Committed: Wed Jan 8 09:03:54 2014 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/Union.java | 65 +++++++++ .../java/org/apache/crunch/lib/Cogroup.java | 45 +++--- .../org/apache/crunch/types/PTypeFamily.java | 3 + .../apache/crunch/types/UnionDeepCopier.java | 49 +++++++ .../crunch/types/avro/AvroTypeFamily.java | 6 + .../org/apache/crunch/types/avro/Avros.java | 138 +++++++++++++++++++ .../crunch/types/writable/UnionWritable.java | 72 ++++++++++ .../types/writable/WritableTypeFamily.java | 6 + .../apache/crunch/types/writable/Writables.java | 103 +++++++++++++- 9 files changed, 459 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/Union.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Union.java b/crunch-core/src/main/java/org/apache/crunch/Union.java new file mode 100644 index 0000000..6db1657 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/Union.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +/** + * Allows us to represent the combination of multiple data sources that may contain different types of data + * as a single type with an index to indicate which of the original sources the current record was from. + */ +public class Union { + + private final int index; + private final Object value; + + public Union(int index, Object value) { + this.index = index; + this.value = value; + } + + /** + * Returns the index of the original data source for this union type. + */ + public int getIndex() { + return index; + } + + /** + * Returns the underlying object value of the record. + */ + public Object getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Union that = (Union) o; + + if (index != that.index) return false; + if (value != null ? !value.equals(that.value) : that.value != null) return false; + + return true; + } + + @Override + public int hashCode() { + return 31 * index + (value != null ? value.hashCode() : 0); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java index 9efcb5e..8743a29 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java @@ -27,6 +27,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.TupleFactory; @@ -211,20 +212,18 @@ public class Cogroup { for (int i = 0; i < rest.length; i++) { ptypes[i + 1] = rest[i].getValueType(); } - PType itype = ptf.tuples(ptypes); + PType itype = ptf.unionOf(ptypes); - PTable firstInter = first.mapValues("coGroupTag1", - new CogroupFn(0, 1 + rest.length), - itype); - PTable[] inter = new PTable[rest.length]; + PTable firstInter = first.mapValues("coGroupTag1", + new CogroupFn(0), itype); + PTable[] inter = new PTable[rest.length]; for (int i = 0; i < rest.length; i++) { inter[i] = rest[i].mapValues("coGroupTag" + (i + 2), - new CogroupFn(i + 1, 1 + rest.length), - itype); + new CogroupFn(i + 1), itype); } - PTable union = firstInter.union(inter); - PGroupedTable grouped; + PTable union = firstInter.union(inter); + PGroupedTable grouped; if (numReducers > 0) { grouped = union.groupByKey(numReducers); } else { @@ -236,25 +235,21 @@ public class Cogroup { outputType); } - private static class CogroupFn extends MapFn { + private static class CogroupFn extends MapFn { private final int index; - private final int size; - - CogroupFn(int index, int size) { + + CogroupFn(int index) { this.index = index; - this.size = size; } @Override - public TupleN map(T input) { - Object[] v = new Object[size]; - v[index] = input; - return TupleN.of(v); + public Union map(T input) { + return new Union(index, input); } } private static class PostGroupFn extends - MapFn, T> { + MapFn, T> { private final TupleFactory factory; private final PType[] ptypes; @@ -273,18 +268,14 @@ public class Cogroup { } @Override - public T map(Iterable input) { + public T map(Iterable input) { Collection[] collections = new Collection[ptypes.length]; for (int i = 0; i < ptypes.length; i++) { collections[i] = Lists.newArrayList(); } - for (TupleN t : input) { - for (int i = 0; i < ptypes.length; i++) { - if (t.get(i) != null) { - collections[i].add(ptypes[i].getDetachedValue(t.get(i))); - break; - } - } + for (Union t : input) { + int index = t.getIndex(); + collections[index].add(ptypes[index].getDetachedValue(t.getValue())); } return (T) factory.makeTuple(collections); } http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java index 9458f14..0ad324a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/PTypeFamily.java @@ -27,6 +27,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; /** * An abstract factory for creating {@code PType} instances that have the same @@ -68,6 +69,8 @@ public interface PTypeFamily { PType derived(Class clazz, MapFn inputFn, MapFn outputFn, PType base); + PType unionOf(PType... ptypes); + PTableType tableOf(PType key, PType value); /** http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java new file mode 100644 index 0000000..ba712e0 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/types/UnionDeepCopier.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import com.google.common.collect.Lists; +import org.apache.crunch.Union; +import org.apache.hadoop.conf.Configuration; + +import java.util.List; + +public class UnionDeepCopier implements DeepCopier { + private final List elementTypes; + + public UnionDeepCopier(PType... elementTypes) { + this.elementTypes = Lists.newArrayList(elementTypes); + } + + @Override + public void initialize(Configuration conf) { + for (PType elementType : elementTypes) { + elementType.initialize(conf); + } + } + + @Override + public Union deepCopy(Union source) { + if (source == null) { + return null; + } + int index = source.getIndex(); + Object copy = elementTypes.get(index).getDetachedValue(source.getValue()); + return new Union(index, copy); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java index e09e173..ba8add6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java @@ -29,6 +29,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -161,4 +162,9 @@ public class AvroTypeFamily implements PTypeFamily { public PType derived(Class clazz, MapFn inputFn, MapFn outputFn, PType base) { return Avros.derived(clazz, inputFn, outputFn, base); } + + @Override + public PType unionOf(PType... ptypes) { + return Avros.unionOf(ptypes); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 2cf63e8..8f1dae0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -48,6 +48,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.fn.CompositeMapFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.types.CollectionDeepCopier; @@ -58,6 +59,7 @@ import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypes; import org.apache.crunch.types.TupleDeepCopier; import org.apache.crunch.types.TupleFactory; +import org.apache.crunch.types.UnionDeepCopier; import org.apache.crunch.types.writable.WritableDeepCopier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; @@ -649,6 +651,142 @@ public class Avros { ptypes), new TupleDeepCopier(clazz, ptypes), null, ptypes); } + private static class UnionRecordToTuple extends MapFn { + private final List fns; + + public UnionRecordToTuple(PType... ptypes) { + this.fns = Lists.newArrayList(); + for (PType ptype : ptypes) { + AvroType atype = (AvroType) ptype; + fns.add(atype.getInputMapFn()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override + public void initialize() { + for (MapFn fn : fns) { + fn.initialize(); + } + } + + @Override + public Union map(GenericRecord input) { + int index = (Integer) input.get(0); + return new Union(index, fns.get(index).map(input.get(1))); + } + } + + private static class TupleToUnionRecord extends MapFn { + private final List fns; + private final List avroTypes; + private final String jsonSchema; + private final boolean isReflect; + private transient Schema schema; + + public TupleToUnionRecord(Schema schema, PType... ptypes) { + this.fns = Lists.newArrayList(); + this.avroTypes = Lists.newArrayList(); + this.jsonSchema = schema.toString(); + boolean reflectFound = false; + boolean specificFound = false; + for (PType ptype : ptypes) { + AvroType atype = (AvroType) ptype; + fns.add(atype.getOutputMapFn()); + avroTypes.add(atype); + if (atype.hasReflect()) { + reflectFound = true; + } + if (atype.hasSpecific()) { + specificFound = true; + } + } + if (specificFound && reflectFound) { + checkCombiningSpecificAndReflectionSchemas(); + } + this.isReflect = reflectFound; + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext context) { + for (MapFn fn : fns) { + fn.setContext(getContext()); + } + } + + @Override + public void initialize() { + this.schema = new Schema.Parser().parse(jsonSchema); + for (MapFn fn : fns) { + fn.initialize(); + } + } + + private GenericRecord createRecord() { + if (isReflect) { + return new ReflectGenericRecord(schema); + } else { + return new GenericData.Record(schema); + } + } + + @Override + public GenericRecord map(Union input) { + GenericRecord record = createRecord(); + int index = input.getIndex(); + record.put(0, index); + record.put(1, fns.get(index).map(input.getValue())); + return record; + } + } + + public static PType unionOf(PType... ptypes) { + List schemas = Lists.newArrayList(); + MessageDigest md; + try { + md = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + for (int i = 0; i < ptypes.length; i++) { + AvroType atype = (AvroType) ptypes[i]; + Schema schema = atype.getSchema(); + if (!schemas.contains(schema)) { + schemas.add(schema); + md.update(schema.toString().getBytes(Charsets.UTF_8)); + } + } + List fields = Lists.newArrayList( + new Schema.Field("index", Schema.create(Type.INT), "", null), + new Schema.Field("value", Schema.createUnion(schemas), "", null)); + + String schemaName = "union" + Base64.encodeBase64URLSafeString(md.digest()).replace('-', 'x'); + Schema schema = Schema.createRecord(schemaName, "", "crunch", false); + schema.setFields(fields); + return new AvroType(Union.class, schema, new UnionRecordToTuple(ptypes), + new TupleToUnionRecord(schema, ptypes), new UnionDeepCopier(ptypes), null, ptypes); + } + private static Schema createTupleSchema(PType... ptypes) throws RuntimeException { // Guarantee each tuple schema has a globally unique name List fields = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java new file mode 100644 index 0000000..b88632a --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/UnionWritable.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types.writable; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class UnionWritable implements WritableComparable { + + private int index; + private BytesWritable value; + + public UnionWritable() { + // no-arg constructor for writables + } + + public UnionWritable(int index, BytesWritable value) { + this.index = index; + this.value = value; + } + + public int getIndex() { + return index; + } + + public BytesWritable getValue() { + return value; + } + + @Override + public int compareTo(UnionWritable other) { + if (index == other.getIndex()) { + return value.compareTo(other.getValue()); + } + return index - other.getIndex(); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, index); + value.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.index = WritableUtils.readVInt(in); + if (value == null) { + value = new BytesWritable(); + } + value.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java index a94db96..5754b4d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/WritableTypeFamily.java @@ -27,6 +27,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; @@ -144,4 +145,9 @@ public class WritableTypeFamily implements PTypeFamily { public PType derived(Class clazz, MapFn inputFn, MapFn outputFn, PType base) { return Writables.derived(clazz, inputFn, outputFn, base); } + + @Override + public PType unionOf(PType... ptypes) { + return Writables.unionOf(ptypes); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/52da5630/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java index 0273e5e..d8ad6ca 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -32,6 +32,7 @@ import org.apache.crunch.Tuple; import org.apache.crunch.Tuple3; import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; +import org.apache.crunch.Union; import org.apache.crunch.fn.CompositeMapFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.types.PType; @@ -288,7 +289,7 @@ public class Writables { } return instance; } - + /** * For mapping from {@link TupleWritable} instances to {@link Tuple}s. * @@ -455,6 +456,106 @@ public class Writables { return new WritableType(clazz, TupleWritable.class, input, output, ptypes); } + /** + * For mapping from {@link TupleWritable} instances to {@link Tuple}s. + * + */ + private static class UWInputFn extends MapFn { + private final List fns; + private final List> writableClasses; + + public UWInputFn(WritableType... ptypes) { + this.fns = Lists.newArrayList(); + this.writableClasses = Lists.newArrayList(); + for (WritableType ptype : ptypes) { + fns.add(ptype.getInputMapFn()); + writableClasses.add(ptype.getSerializationClass()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override + public void initialize() { + for (MapFn fn : fns) { + fn.initialize(); + } + } + + @Override + public Union map(UnionWritable in) { + int index = in.getIndex(); + Writable w = create(writableClasses.get(index), in.getValue()); + return new Union(index, fns.get(index).map(w)); + } + } + + /** + * For mapping from {@code Tuple}s to {@code TupleWritable}s. + * + */ + private static class UWOutputFn extends MapFn { + + private final List fns; + + public UWOutputFn(PType... ptypes) { + this.fns = Lists.newArrayList(); + for (PType ptype : ptypes) { + fns.add(ptype.getOutputMapFn()); + } + } + + @Override + public void configure(Configuration conf) { + for (MapFn fn : fns) { + fn.configure(conf); + } + } + + @Override + public void setContext(TaskInputOutputContext context) { + for (MapFn fn : fns) { + fn.setContext(context); + } + } + + @Override + public void initialize() { + for (MapFn fn : fns) { + fn.initialize(); + } + } + + @Override + public UnionWritable map(Union input) { + int index = input.getIndex(); + Writable w = (Writable) fns.get(index).map(input.getValue()); + return new UnionWritable(index, new BytesWritable(WritableUtils.toByteArray(w))); + } + } + + public static PType unionOf(PType... ptypes) { + WritableType[] wt = new WritableType[ptypes.length]; + for (int i = 0; i < wt.length; i++) { + wt[i] = (WritableType) ptypes[i]; + } + UWInputFn input= new UWInputFn(wt); + UWOutputFn output = new UWOutputFn(ptypes); + return new WritableType(Union.class, UnionWritable.class, input, output, ptypes); + } + public static PType derived(Class clazz, MapFn inputFn, MapFn outputFn, PType base) { WritableType wt = (WritableType) base; MapFn input = new CompositeMapFn(wt.getInputMapFn(), inputFn);