Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7D11108FF for ; Tue, 23 Apr 2013 20:41:05 +0000 (UTC) Received: (qmail 43750 invoked by uid 500); 23 Apr 2013 20:41:05 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 43615 invoked by uid 500); 23 Apr 2013 20:41:05 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 43197 invoked by uid 99); 23 Apr 2013 20:41:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Apr 2013 20:41:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 29A838231A5; Tue, 23 Apr 2013 20:41:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Date: Tue, 23 Apr 2013 20:41:08 -0000 Message-Id: <5dfa3fea766b4c7d87bc1b439eab437a@git.apache.org> In-Reply-To: <5c5792aee18145bb82d063830978ad2a@git.apache.org> References: <5c5792aee18145bb82d063830978ad2a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java b/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java deleted file mode 100644 index e61b98b..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/PTypeUtils.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types; - -import java.util.Collection; -import java.util.List; - -import org.apache.crunch.Pair; -import org.apache.crunch.Tuple; -import org.apache.crunch.Tuple3; -import org.apache.crunch.Tuple4; -import org.apache.crunch.TupleN; - -/** - * Utilities for converting between {@code PType}s from different - * {@code PTypeFamily} implementations. - * - */ -public class PTypeUtils { - - public static PType convert(PType ptype, PTypeFamily tf) { - if (ptype instanceof PTableType) { - PTableType ptt = (PTableType) ptype; - return tf.tableOf(tf.as(ptt.getKeyType()), tf.as(ptt.getValueType())); - } - Class typeClass = ptype.getTypeClass(); - if (Tuple.class.isAssignableFrom(typeClass)) { - List subTypes = ptype.getSubTypes(); - if (Pair.class.equals(typeClass)) { - return tf.pairs(tf.as(subTypes.get(0)), tf.as(subTypes.get(1))); - } else if (Tuple3.class.equals(typeClass)) { - return tf.triples(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2))); - } else if (Tuple4.class.equals(typeClass)) { - return tf.quads(tf.as(subTypes.get(0)), tf.as(subTypes.get(1)), tf.as(subTypes.get(2)), tf.as(subTypes.get(3))); - } else if (TupleN.class.equals(typeClass)) { - PType[] newPTypes = subTypes.toArray(new PType[0]); - for (int i = 0; i < newPTypes.length; i++) { - newPTypes[i] = tf.as(subTypes.get(i)); - } - return (PType) tf.tuples(newPTypes); - } - } - if (Collection.class.isAssignableFrom(typeClass)) { - return tf.collections(tf.as(ptype.getSubTypes().get(0))); - } - return tf.records(typeClass); - } - - private PTypeUtils() { - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/PTypes.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/PTypes.java b/crunch/src/main/java/org/apache/crunch/types/PTypes.java deleted file mode 100644 index 546719c..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/PTypes.java +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types; - -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.util.UUID; - -import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.MapFn; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.thrift.TBase; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.codehaus.jackson.map.ObjectMapper; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; - -/** - * Utility functions for creating common types of derived PTypes, e.g., for JSON - * data, protocol buffers, and Thrift records. - * - */ -public class PTypes { - - public static PType bigInt(PTypeFamily typeFamily) { - return typeFamily.derived(BigInteger.class, BYTE_TO_BIGINT, BIGINT_TO_BYTE, typeFamily.bytes()); - } - - public static PType uuid(PTypeFamily ptf) { - return ptf.derived(UUID.class, BYTE_TO_UUID, UUID_TO_BYTE, ptf.bytes()); - } - - public static PType jsonString(Class clazz, PTypeFamily typeFamily) { - return typeFamily - .derived(clazz, new JacksonInputMapFn(clazz), new JacksonOutputMapFn(), typeFamily.strings()); - } - - public static PType protos(Class clazz, PTypeFamily typeFamily) { - return typeFamily.derived(clazz, new ProtoInputMapFn(clazz), new ProtoOutputMapFn(), typeFamily.bytes()); - } - - public static PType thrifts(Class clazz, PTypeFamily typeFamily) { - return typeFamily.derived(clazz, new ThriftInputMapFn(clazz), new ThriftOutputMapFn(), typeFamily.bytes()); - } - - public static final PType enums(final Class type, PTypeFamily typeFamily) { - return typeFamily.derived(type, new EnumInputMapper(type), new EnumOutputMapper(), typeFamily.strings()); - } - - public static MapFn BYTE_TO_BIGINT = new MapFn() { - public BigInteger map(ByteBuffer input) { - return input == null ? null : new BigInteger(input.array()); - } - }; - - public static MapFn BIGINT_TO_BYTE = new MapFn() { - public ByteBuffer map(BigInteger input) { - return input == null ? null : ByteBuffer.wrap(input.toByteArray()); - } - }; - - private static class JacksonInputMapFn extends MapFn { - - private final Class clazz; - private transient ObjectMapper mapper; - - public JacksonInputMapFn(Class clazz) { - this.clazz = clazz; - } - - @Override - public void initialize() { - this.mapper = new ObjectMapper(); - } - - @Override - public T map(String input) { - try { - return mapper.readValue(input, clazz); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - } - - private static class JacksonOutputMapFn extends MapFn { - - private transient ObjectMapper mapper; - - @Override - public void initialize() { - this.mapper = new ObjectMapper(); - } - - @Override - public String map(T input) { - try { - return mapper.writeValueAsString(input); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - } - - private static class ProtoInputMapFn extends MapFn { - - private final Class clazz; - private transient T instance; - - public ProtoInputMapFn(Class clazz) { - this.clazz = clazz; - } - - @Override - public void initialize() { - this.instance = Protos.getDefaultInstance(clazz); - } - - @Override - public T map(ByteBuffer bb) { - try { - return (T) instance.newBuilderForType().mergeFrom(bb.array(), bb.position(), bb.limit()).build(); - } catch (InvalidProtocolBufferException e) { - throw new CrunchRuntimeException(e); - } - } - } - - private static class ProtoOutputMapFn extends MapFn { - - public ProtoOutputMapFn() { - } - - @Override - public ByteBuffer map(T proto) { - return ByteBuffer.wrap(proto.toByteArray()); - } - } - - private static class ThriftInputMapFn extends MapFn { - - private final Class clazz; - private transient T instance; - private transient TDeserializer deserializer; - private transient byte[] bytes; - - public ThriftInputMapFn(Class clazz) { - this.clazz = clazz; - } - - @Override - public void initialize() { - this.instance = ReflectionUtils.newInstance(clazz, null); - this.deserializer = new TDeserializer(new TBinaryProtocol.Factory()); - this.bytes = new byte[0]; - } - - @Override - public T map(ByteBuffer bb) { - T next = (T) instance.deepCopy(); - int len = bb.limit() - bb.position(); - if (len != bytes.length) { - bytes = new byte[len]; - } - System.arraycopy(bb.array(), bb.position(), bytes, 0, len); - try { - deserializer.deserialize(next, bytes); - } catch (TException e) { - throw new CrunchRuntimeException(e); - } - return next; - } - } - - private static class ThriftOutputMapFn extends MapFn { - - private transient TSerializer serializer; - - public ThriftOutputMapFn() { - } - - @Override - public void initialize() { - this.serializer = new TSerializer(new TBinaryProtocol.Factory()); - } - - @Override - public ByteBuffer map(T t) { - try { - return ByteBuffer.wrap(serializer.serialize(t)); - } catch (TException e) { - throw new CrunchRuntimeException(e); - } - } - } - - private static class EnumInputMapper extends MapFn { - private final Class type; - - public EnumInputMapper(Class type) { - this.type = type; - } - - @Override - public T map(String input) { - return (T) Enum.valueOf(type, input); - } - }; - - private static class EnumOutputMapper extends MapFn { - - @Override - public String map(T input) { - return input.name(); - } - }; - - private static MapFn BYTE_TO_UUID = new MapFn() { - @Override - public UUID map(ByteBuffer input) { - return new UUID(input.getLong(), input.getLong()); - } - }; - - private static MapFn UUID_TO_BYTE = new MapFn() { - @Override - public ByteBuffer map(UUID input) { - ByteBuffer bb = ByteBuffer.wrap(new byte[16]); - bb.asLongBuffer().put(input.getMostSignificantBits()).put(input.getLeastSignificantBits()); - return bb; - } - }; -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/Protos.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/Protos.java b/crunch/src/main/java/org/apache/crunch/types/Protos.java deleted file mode 100644 index 4cd5068..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/Protos.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types; - -import java.util.Iterator; -import java.util.List; - -import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.DoFn; -import org.apache.crunch.Emitter; -import org.apache.crunch.MapFn; -import org.apache.hadoop.util.ReflectionUtils; - -import com.google.common.base.Splitter; -import com.google.protobuf.Descriptors.FieldDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Message.Builder; - -/** - * Utility functions for working with protocol buffers in Crunch. - */ -public class Protos { - - /** - * Utility function for creating a default PB Messgae from a Class object that - * works with both protoc 2.3.0 and 2.4.x. - * @param clazz The class of the protocol buffer to create - * @return An instance of a protocol buffer - */ - public static M getDefaultInstance(Class clazz) { - if (clazz.getConstructors().length > 0) { - // Protobuf 2.3.0 - return ReflectionUtils.newInstance(clazz, null); - } else { - // Protobuf 2.4.x - try { - Message.Builder mb = (Message.Builder) clazz.getDeclaredMethod("newBuilder").invoke(null); - return (M) mb.getDefaultInstanceForType(); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - } - - public static MapFn extractKey(String fieldName) { - return new ExtractKeyFn(fieldName); - } - - public static DoFn lineParser(String sep, Class msgClass) { - return new TextToProtoFn(sep, msgClass); - } - - private static class ExtractKeyFn extends MapFn { - - private final String fieldName; - - private transient FieldDescriptor fd; - - public ExtractKeyFn(String fieldName) { - this.fieldName = fieldName; - } - - @Override - public K map(M input) { - if (input == null) { - throw new IllegalArgumentException("Null inputs not supported by Protos.ExtractKeyFn"); - } else if (fd == null) { - fd = input.getDescriptorForType().findFieldByName(fieldName); - if (fd == null) { - throw new IllegalStateException("Could not find field: " + fieldName + " in message: " + input); - } - } - return (K) input.getField(fd); - } - - } - - private static class TextToProtoFn extends DoFn { - - private final String sep; - private final Class msgClass; - - private transient M msgInstance; - private transient List fields; - private transient Splitter splitter; - - enum ParseErrors { - TOTAL, - NUMBER_FORMAT - }; - - public TextToProtoFn(String sep, Class msgClass) { - this.sep = sep; - this.msgClass = msgClass; - } - - @Override - public void initialize() { - this.msgInstance = getDefaultInstance(msgClass); - this.fields = msgInstance.getDescriptorForType().getFields(); - this.splitter = Splitter.on(sep); - } - - @Override - public void process(String input, Emitter emitter) { - if (input != null && !input.isEmpty()) { - Builder b = msgInstance.newBuilderForType(); - Iterator iter = splitter.split(input).iterator(); - boolean parseError = false; - for (FieldDescriptor fd : fields) { - if (iter.hasNext()) { - String value = iter.next(); - if (value != null && !value.isEmpty()) { - Object parsedValue = null; - try { - switch (fd.getJavaType()) { - case STRING: - parsedValue = value; - break; - case INT: - parsedValue = Integer.valueOf(value); - break; - case LONG: - parsedValue = Long.valueOf(value); - break; - case FLOAT: - parsedValue = Float.valueOf(value); - break; - case DOUBLE: - parsedValue = Double.valueOf(value); - break; - case BOOLEAN: - parsedValue = Boolean.valueOf(value); - break; - case ENUM: - parsedValue = fd.getEnumType().findValueByName(value); - break; - } - b.setField(fd, parsedValue); - } catch (NumberFormatException nfe) { - increment(ParseErrors.NUMBER_FORMAT); - parseError = true; - break; - } - } - } - } - - if (parseError) { - increment(ParseErrors.TOTAL); - } else { - emitter.emit((M) b.build()); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java deleted file mode 100644 index a2ffae3..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types; - -import java.util.List; - -import org.apache.crunch.Tuple; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Lists; - -/** - * Performs deep copies (based on underlying PType deep copying) of Tuple-based objects. - * - * @param The type of Tuple implementation being copied - */ -public class TupleDeepCopier implements DeepCopier { - - private final TupleFactory tupleFactory; - private final List elementTypes; - - public TupleDeepCopier(Class tupleClass, PType... elementTypes) { - tupleFactory = TupleFactory.getTupleFactory(tupleClass); - this.elementTypes = Lists.newArrayList(elementTypes); - } - - @Override - public void initialize(Configuration conf) { - for (PType elementType : elementTypes) { - elementType.initialize(conf); - } - } - - @Override - public T deepCopy(T source) { - - if (source == null) { - return null; - } - - Object[] deepCopyValues = new Object[source.size()]; - - for (int valueIndex = 0; valueIndex < elementTypes.size(); valueIndex++) { - PType elementType = elementTypes.get(valueIndex); - deepCopyValues[valueIndex] = elementType.getDetachedValue(source.get(valueIndex)); - } - - return tupleFactory.makeTuple(deepCopyValues); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java deleted file mode 100644 index 73b47de..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types; - -import java.io.Serializable; -import java.lang.reflect.Constructor; -import java.util.Map; - -import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.Pair; -import org.apache.crunch.Tuple; -import org.apache.crunch.Tuple3; -import org.apache.crunch.Tuple4; -import org.apache.crunch.TupleN; - -import com.google.common.collect.Maps; - -public abstract class TupleFactory implements Serializable { - - public void initialize() { - } - - public abstract T makeTuple(Object... values); - - - private static final Map customTupleFactories = Maps.newHashMap(); - - /** - * Get the {@link TupleFactory} for a given Tuple implementation. - * - * @param tupleClass - * The class for which the factory is to be retrieved - * @return The appropriate TupleFactory - */ - public static TupleFactory getTupleFactory(Class tupleClass) { - if (tupleClass == Pair.class) { - return (TupleFactory) PAIR; - } else if (tupleClass == Tuple3.class) { - return (TupleFactory) TUPLE3; - } else if (tupleClass == Tuple4.class) { - return (TupleFactory) TUPLE4; - } else if (tupleClass == TupleN.class) { - return (TupleFactory) TUPLEN; - } else if (customTupleFactories.containsKey(tupleClass)) { - return (TupleFactory) customTupleFactories.get(tupleClass); - } else { - throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass); - } - } - - public static final TupleFactory PAIR = new TupleFactory() { - @Override - public Pair makeTuple(Object... values) { - return Pair.of(values[0], values[1]); - } - }; - - public static final TupleFactory TUPLE3 = new TupleFactory() { - @Override - public Tuple3 makeTuple(Object... values) { - return Tuple3.of(values[0], values[1], values[2]); - } - }; - - public static final TupleFactory TUPLE4 = new TupleFactory() { - @Override - public Tuple4 makeTuple(Object... values) { - return Tuple4.of(values[0], values[1], values[2], values[3]); - } - }; - - public static final TupleFactory TUPLEN = new TupleFactory() { - @Override - public TupleN makeTuple(Object... values) { - return new TupleN(values); - } - }; - - public static TupleFactory create(Class clazz, Class... typeArgs) { - if (customTupleFactories.containsKey(clazz)) { - return (TupleFactory) customTupleFactories.get(clazz); - } - TupleFactory custom = new CustomTupleFactory(clazz, typeArgs); - customTupleFactories.put(clazz, custom); - return custom; - } - - private static class CustomTupleFactory extends TupleFactory { - - private final Class clazz; - private final Class[] typeArgs; - - private transient Constructor constructor; - - public CustomTupleFactory(Class clazz, Class[] typeArgs) { - this.clazz = clazz; - this.typeArgs = typeArgs; - } - - @Override - public void initialize() { - try { - constructor = clazz.getConstructor(typeArgs); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - - @Override - public T makeTuple(Object... values) { - try { - return constructor.newInstance(values); - } catch (Exception e) { - throw new CrunchRuntimeException(e); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java deleted file mode 100644 index cc1636c..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroCapabilities.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; - -import com.google.common.collect.Lists; - -/** - * Determines the capabilities of the Avro version that is currently being used. - */ -class AvroCapabilities { - - public static class Record extends org.apache.avro.specific.SpecificRecordBase implements - org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser() - .parse("{\"type\":\"record\",\"name\":\"Record\",\"namespace\":\"org.apache.crunch.types.avro\",\"fields\":[{\"name\":\"subrecords\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}"); - @Deprecated - public java.util.List subrecords; - - public java.lang.Object get(int field$) { - switch (field$) { - case 0: - return subrecords; - default: - throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - // Used by DatumReader. Applications should not call. - @SuppressWarnings(value = "unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: - subrecords = (java.util.List) value$; - break; - default: - throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - @Override - public Schema getSchema() { - return SCHEMA$; - } - } - - /** - * Determine if the current Avro version can use the ReflectDatumReader to - * read SpecificData that includes an array. The inability to do this was a - * bug that was fixed in Avro 1.7.0. - * - * @return true if SpecificData can be properly read using a - * ReflectDatumReader - */ - static boolean canDecodeSpecificSchemaWithReflectDatumReader() { - ReflectDatumReader datumReader = new ReflectDatumReader(Record.SCHEMA$); - ReflectDatumWriter datumWriter = new ReflectDatumWriter(Record.SCHEMA$); - - Record record = new Record(); - record.subrecords = Lists. newArrayList("a", "b"); - - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); - - try { - datumWriter.write(record, encoder); - encoder.flush(); - BinaryDecoder decoder = DecoderFactory.get().binaryDecoder( - byteArrayOutputStream.toByteArray(), null); - datumReader.read(record, decoder); - } catch (IOException ioe) { - throw new RuntimeException("Error performing specific schema test", ioe); - } catch (ClassCastException cce) { - // This indicates that we're using a pre-1.7.0 version of Avro, as the - // ReflectDatumReader in those versions could not correctly handle an - // array in a SpecificData value - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java deleted file mode 100644 index 0fe9288..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.io.ByteArrayOutputStream; -import java.io.Serializable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.crunch.CrunchRuntimeException; -import org.apache.crunch.types.DeepCopier; -import org.apache.hadoop.conf.Configuration; - -/** - * Performs deep copies of Avro-serializable objects. - *

- * Warning: Methods in this class are not thread-safe. This shouldn't be a problem when - * running in a map-reduce context where each mapper/reducer is running in its own JVM, but it may - * well be a problem in any other kind of multi-threaded context. - */ -abstract class AvroDeepCopier implements DeepCopier, Serializable { - - private String jsonSchema; - private transient Configuration conf; - private transient Schema schema; - private BinaryEncoder binaryEncoder; - private BinaryDecoder binaryDecoder; - - private transient DatumWriter datumWriter; - private transient DatumReader datumReader; - - public AvroDeepCopier(Schema schema) { - this.jsonSchema = schema.toString(); - } - - protected Schema getSchema() { - if (schema == null) { - schema = new Schema.Parser().parse(jsonSchema); - } - return schema; - } - - @Override - public void initialize(Configuration conf) { - this.conf = conf; - } - - protected abstract T createCopyTarget(); - - protected abstract DatumWriter createDatumWriter(Configuration conf); - - protected abstract DatumReader createDatumReader(Configuration conf); - - /** - * Deep copier for Avro specific data objects. - */ - public static class AvroSpecificDeepCopier extends AvroDeepCopier { - - private Class valueClass; - - public AvroSpecificDeepCopier(Class valueClass, Schema schema) { - super(schema); - this.valueClass = valueClass; - } - - @Override - protected T createCopyTarget() { - return createNewInstance(valueClass); - } - - @Override - protected DatumWriter createDatumWriter(Configuration conf) { - return new SpecificDatumWriter(getSchema()); - } - - @Override - protected DatumReader createDatumReader(Configuration conf) { - return new SpecificDatumReader(getSchema()); - } - - } - - /** - * Deep copier for Avro generic data objects. - */ - public static class AvroGenericDeepCopier extends AvroDeepCopier { - - private transient Schema schema; - - public AvroGenericDeepCopier(Schema schema) { - super(schema); - } - - @Override - protected Record createCopyTarget() { - return new GenericData.Record(getSchema()); - } - - @Override - protected DatumReader createDatumReader(Configuration conf) { - return new GenericDatumReader(getSchema()); - } - - @Override - protected DatumWriter createDatumWriter(Configuration conf) { - return new GenericDatumWriter(getSchema()); - } - } - - /** - * Deep copier for Avro reflect data objects. - */ - public static class AvroReflectDeepCopier extends AvroDeepCopier { - - private Class valueClass; - - public AvroReflectDeepCopier(Class valueClass, Schema schema) { - super(schema); - this.valueClass = valueClass; - } - - @Override - protected T createCopyTarget() { - return createNewInstance(valueClass); - } - - @Override - protected DatumReader createDatumReader(Configuration conf) { - return Avros.getReflectDataFactory(conf).getReader(getSchema()); - } - - @Override - protected DatumWriter createDatumWriter(Configuration conf) { - return Avros.getReflectDataFactory(conf).getWriter(getSchema()); - } - } - - /** - * Create a deep copy of an Avro value. - * - * @param source The value to be copied - * @return The deep copy of the value - */ - @Override - public T deepCopy(T source) { - - if (source == null) { - return null; - } - - if (datumReader == null) { - datumReader = createDatumReader(conf); - } - if (datumWriter == null) { - datumWriter = createDatumWriter(conf); - } - ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); - binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder); - T target = createCopyTarget(); - try { - datumWriter.write(source, binaryEncoder); - binaryEncoder.flush(); - binaryDecoder = DecoderFactory.get() - .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); - datumReader.read(target, binaryDecoder); - } catch (Exception e) { - throw new CrunchRuntimeException("Error while deep copying avro value " + source, e); - } - - return target; - } - - protected T createNewInstance(Class targetClass) { - try { - return targetClass.newInstance(); - } catch (InstantiationException e) { - throw new CrunchRuntimeException(e); - } catch (IllegalAccessException e) { - throw new CrunchRuntimeException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java deleted file mode 100644 index 598868f..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroGroupedTableType.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.util.Collection; - -import org.apache.avro.mapred.AvroJob; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapred.AvroKeyComparator; -import org.apache.avro.mapred.AvroValue; -import org.apache.crunch.GroupingOptions; -import org.apache.crunch.MapFn; -import org.apache.crunch.Pair; -import org.apache.crunch.fn.PairMapFn; -import org.apache.crunch.lib.PTables; -import org.apache.crunch.types.Converter; -import org.apache.crunch.types.PGroupedTableType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Job; - -/** - * - * - */ -class AvroGroupedTableType extends PGroupedTableType { - - private static final AvroPairConverter CONVERTER = new AvroPairConverter(); - private final MapFn inputFn; - private final MapFn outputFn; - - public AvroGroupedTableType(AvroTableType tableType) { - super(tableType); - AvroType keyType = (AvroType) tableType.getKeyType(); - AvroType valueType = (AvroType) tableType.getValueType(); - this.inputFn = new PairIterableMapFn(keyType.getInputMapFn(), valueType.getInputMapFn()); - this.outputFn = new PairMapFn(keyType.getOutputMapFn(), valueType.getOutputMapFn()); - } - - @Override - public Class>> getTypeClass() { - return (Class>>) Pair.of(null, null).getClass(); - } - - @Override - public Converter getGroupingConverter() { - return CONVERTER; - } - - @Override - public MapFn getInputMapFn() { - return inputFn; - } - - @Override - public MapFn getOutputMapFn() { - return outputFn; - } - - @Override - public void initialize(Configuration conf) { - getTableType().initialize(conf); - } - - @Override - public Pair> getDetachedValue(Pair> value) { - return PTables.getGroupedDetachedValue(this, value); - } - - @Override - public void configureShuffle(Job job, GroupingOptions options) { - AvroTableType att = (AvroTableType) tableType; - String schemaJson = att.getSchema().toString(); - Configuration conf = job.getConfiguration(); - - if (att.hasReflect()) { - if (att.hasSpecific()) { - Avros.checkCombiningSpecificAndReflectionSchemas(); - } - conf.setBoolean(AvroJob.MAP_OUTPUT_IS_REFLECT, true); - } - conf.set(AvroJob.MAP_OUTPUT_SCHEMA, schemaJson); - job.setSortComparatorClass(AvroKeyComparator.class); - job.setMapOutputKeyClass(AvroKey.class); - job.setMapOutputValueClass(AvroValue.class); - if (options != null) { - options.configure(job); - } - - Avros.configureReflectDataFactory(conf); - - Collection serializations = job.getConfiguration().getStringCollection( - "io.serializations"); - if (!serializations.contains(SafeAvroSerialization.class.getName())) { - serializations.add(SafeAvroSerialization.class.getName()); - job.getConfiguration().setStrings("io.serializations", serializations.toArray(new String[0])); - } - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java deleted file mode 100644 index b8bbebd..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroInputFormat.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.mapred.AvroJob; -import org.apache.avro.mapred.AvroWrapper; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; - -/** An {@link org.apache.hadoop.mapreduce.InputFormat} for Avro data files. */ -public class AvroInputFormat extends FileInputFormat, NullWritable> { - @Override - public RecordReader, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - context.setStatus(split.toString()); - String jsonSchema = context.getConfiguration().get(AvroJob.INPUT_SCHEMA); - Schema schema = new Schema.Parser().parse(jsonSchema); - return new AvroRecordReader(schema); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java deleted file mode 100644 index 68b717d..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import org.apache.avro.mapred.AvroWrapper; -import org.apache.crunch.types.Converter; -import org.apache.hadoop.io.NullWritable; - -class AvroKeyConverter implements Converter, NullWritable, K, Iterable> { - - private transient AvroWrapper wrapper = null; - - @Override - public K convertInput(AvroWrapper key, NullWritable value) { - return key.datum(); - } - - @Override - public AvroWrapper outputKey(K value) { - getWrapper().datum(value); - return wrapper; - } - - @Override - public NullWritable outputValue(K value) { - return NullWritable.get(); - } - - @Override - public Class> getKeyClass() { - return (Class>) getWrapper().getClass(); - } - - @Override - public Class getValueClass() { - return NullWritable.class; - } - - private AvroWrapper getWrapper() { - if (wrapper == null) { - wrapper = new AvroWrapper(); - } - return wrapper; - } - - @Override - public Iterable convertIterableInput(AvroWrapper key, Iterable value) { - throw new UnsupportedOperationException("Should not be possible"); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java deleted file mode 100644 index 98d3f50..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroOutputFormat.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.mapred.AvroJob; -import org.apache.avro.mapred.AvroWrapper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -/** An {@link org.apache.hadoop.mapreduce.OutputFormat} for Avro data files. */ -public class AvroOutputFormat extends FileOutputFormat, NullWritable> { - - @Override - public RecordWriter, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, - InterruptedException { - - Configuration conf = context.getConfiguration(); - Schema schema = null; - String outputName = conf.get("crunch.namedoutput"); - if (outputName != null && !outputName.isEmpty()) { - schema = (new Schema.Parser()).parse(conf.get("avro.output.schema." + outputName)); - } else { - schema = AvroJob.getOutputSchema(context.getConfiguration()); - } - - ReflectDataFactory factory = Avros.getReflectDataFactory(conf); - final DataFileWriter WRITER = new DataFileWriter(factory. getWriter(schema)); - - JobConf jc = new JobConf(conf); - /* copied from org.apache.avro.mapred.AvroOutputFormat */ - - if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jc)) { - int level = conf.getInt(org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY, - org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL); - String codecName = conf.get(AvroJob.OUTPUT_CODEC, - org.apache.avro.file.DataFileConstants.DEFLATE_CODEC); - CodecFactory codec = codecName.equals(org.apache.avro.file.DataFileConstants.DEFLATE_CODEC) - ? CodecFactory.deflateCodec(level) - : CodecFactory.fromString(codecName); - WRITER.setCodec(codec); - } - - WRITER.setSyncInterval(jc.getInt(org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY, - org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL)); - - Path path = getDefaultWorkFile(context, org.apache.avro.mapred.AvroOutputFormat.EXT); - WRITER.create(schema, path.getFileSystem(context.getConfiguration()).create(path)); - - return new RecordWriter, NullWritable>() { - @Override - public void write(AvroWrapper wrapper, NullWritable ignore) throws IOException { - WRITER.append(wrapper.datum()); - } - - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException { - WRITER.close(); - } - }; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java deleted file mode 100644 index d1d2627..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroPairConverter.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.util.Iterator; - -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapred.AvroValue; -import org.apache.crunch.Pair; -import org.apache.crunch.types.Converter; - -class AvroPairConverter implements Converter, AvroValue, Pair, Pair>> { - - private transient AvroKey keyWrapper = null; - private transient AvroValue valueWrapper = null; - - @Override - public Pair convertInput(AvroKey key, AvroValue value) { - return Pair.of(key.datum(), value.datum()); - } - - public Pair> convertIterableInput(AvroKey key, Iterable> iter) { - Iterable it = new AvroWrappedIterable(iter); - return Pair.of(key.datum(), it); - } - - @Override - public AvroKey outputKey(Pair value) { - getKeyWrapper().datum(value.first()); - return keyWrapper; - } - - @Override - public AvroValue outputValue(Pair value) { - getValueWrapper().datum(value.second()); - return valueWrapper; - } - - @Override - public Class> getKeyClass() { - return (Class>) getKeyWrapper().getClass(); - } - - @Override - public Class> getValueClass() { - return (Class>) getValueWrapper().getClass(); - } - - private AvroKey getKeyWrapper() { - if (keyWrapper == null) { - keyWrapper = new AvroKey(); - } - return keyWrapper; - } - - private AvroValue getValueWrapper() { - if (valueWrapper == null) { - valueWrapper = new AvroValue(); - } - return valueWrapper; - } - - private static class AvroWrappedIterable implements Iterable { - - private final Iterable> iters; - - public AvroWrappedIterable(Iterable> iters) { - this.iters = iters; - } - - @Override - public Iterator iterator() { - return new Iterator() { - private final Iterator> it = iters.iterator(); - - @Override - public boolean hasNext() { - return it.hasNext(); - } - - @Override - public V next() { - return it.next().datum(); - } - - @Override - public void remove() { - it.remove(); - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java deleted file mode 100644 index 9c7578c..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.FileReader; -import org.apache.avro.file.SeekableInput; -import org.apache.avro.io.DatumReader; -import org.apache.avro.mapred.AvroJob; -import org.apache.avro.mapred.AvroWrapper; -import org.apache.avro.mapred.FsInput; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -/** An {@link RecordReader} for Avro data files. */ -class AvroRecordReader extends RecordReader, NullWritable> { - - private FileReader reader; - private long start; - private long end; - private AvroWrapper key; - private NullWritable value; - private Schema schema; - - public AvroRecordReader(Schema schema) { - this.schema = schema; - } - - @Override - public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException { - FileSplit split = (FileSplit) genericSplit; - Configuration conf = context.getConfiguration(); - SeekableInput in = new FsInput(split.getPath(), conf); - DatumReader datumReader = null; - if (context.getConfiguration().getBoolean(AvroJob.INPUT_IS_REFLECT, true)) { - ReflectDataFactory factory = Avros.getReflectDataFactory(conf); - datumReader = factory.getReader(schema); - } else { - datumReader = new SpecificDatumReader(schema); - } - this.reader = DataFileReader.openReader(in, datumReader); - reader.sync(split.getStart()); // sync to start - this.start = reader.tell(); - this.end = split.getStart() + split.getLength(); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (!reader.hasNext() || reader.pastSync(end)) { - key = null; - value = null; - return false; - } - if (key == null) { - key = new AvroWrapper(); - } - if (value == null) { - value = NullWritable.get(); - } - key.datum(reader.next(key.datum())); - return true; - } - - @Override - public AvroWrapper getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public NullWritable getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException { - if (end == start) { - return 0.0f; - } else { - return Math.min(1.0f, (getPos() - start) / (float) (end - start)); - } - } - - public long getPos() throws IOException { - return reader.tell(); - } - - @Override - public void close() throws IOException { - reader.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java deleted file mode 100644 index 86613df..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.crunch.MapFn; -import org.apache.crunch.Pair; -import org.apache.crunch.lib.PTables; -import org.apache.crunch.types.PGroupedTableType; -import org.apache.crunch.types.PTableType; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.TupleDeepCopier; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; - -/** - * The implementation of the PTableType interface for Avro-based serialization. - * - */ -class AvroTableType extends AvroType> implements PTableType { - - private static class PairToAvroPair extends MapFn { - private final MapFn keyMapFn; - private final MapFn valueMapFn; - private final String firstJson; - private final String secondJson; - - private String pairSchemaJson; - private transient Schema pairSchema; - - public PairToAvroPair(AvroType keyType, AvroType valueType) { - this.keyMapFn = keyType.getOutputMapFn(); - this.firstJson = keyType.getSchema().toString(); - this.valueMapFn = valueType.getOutputMapFn(); - this.secondJson = valueType.getSchema().toString(); - } - - @Override - public void configure(Configuration conf) { - keyMapFn.configure(conf); - valueMapFn.configure(conf); - } - - @Override - public void setContext(TaskInputOutputContext context) { - keyMapFn.setContext(context); - valueMapFn.setContext(context); - } - - @Override - public void initialize() { - keyMapFn.initialize(); - valueMapFn.initialize(); - pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema( - new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString(); - } - - @Override - public org.apache.avro.mapred.Pair map(Pair input) { - if (pairSchema == null) { - pairSchema = new Schema.Parser().parse(pairSchemaJson); - } - org.apache.avro.mapred.Pair avroPair = new org.apache.avro.mapred.Pair(pairSchema); - avroPair.key(keyMapFn.map(input.first())); - avroPair.value(valueMapFn.map(input.second())); - return avroPair; - } - } - - private static class IndexedRecordToPair extends MapFn { - - private final MapFn firstMapFn; - private final MapFn secondMapFn; - - public IndexedRecordToPair(MapFn firstMapFn, MapFn secondMapFn) { - this.firstMapFn = firstMapFn; - this.secondMapFn = secondMapFn; - } - - @Override - public void configure(Configuration conf) { - firstMapFn.configure(conf); - secondMapFn.configure(conf); - } - - @Override - public void setContext(TaskInputOutputContext context) { - firstMapFn.setContext(context); - secondMapFn.setContext(context); - } - - @Override - public void initialize() { - firstMapFn.initialize(); - secondMapFn.initialize(); - } - - @Override - public Pair map(IndexedRecord input) { - return Pair.of(firstMapFn.map(input.get(0)), secondMapFn.map(input.get(1))); - } - } - - private final AvroType keyType; - private final AvroType valueType; - - public AvroTableType(AvroType keyType, AvroType valueType, Class> pairClass) { - super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), - valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(), - valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier( - Pair.class, keyType, valueType), keyType, valueType); - this.keyType = keyType; - this.valueType = valueType; - } - - @Override - public PType getKeyType() { - return keyType; - } - - @Override - public PType getValueType() { - return valueType; - } - - @Override - public PGroupedTableType getGroupedTableType() { - return new AvroGroupedTableType(this); - } - - @Override - public Pair getDetachedValue(Pair value) { - return PTables.getDetachedValue(this, value); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java deleted file mode 100644 index 4930235..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTextOutputFormat.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.io.IOException; - -import org.apache.avro.mapred.AvroWrapper; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; - -public class AvroTextOutputFormat extends TextOutputFormat { - class DatumRecordTextWriter extends RecordWriter { - private RecordWriter lineRecordWriter; - - public DatumRecordTextWriter(RecordWriter recordWriter) { - this.lineRecordWriter = recordWriter; - } - - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException { - lineRecordWriter.close(context); - } - - @Override - public void write(K arg0, V arg1) throws IOException, InterruptedException { - lineRecordWriter.write(getData(arg0), getData(arg1)); - } - - private Object getData(Object o) { - Object data = o; - if (o instanceof AvroWrapper) { - data = ((AvroWrapper) o).datum(); - } - return data; - } - } - - @Override - public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - RecordWriter recordWriter = super.getRecordWriter(context); - return new DatumRecordTextWriter(recordWriter); - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java deleted file mode 100644 index a92b0d0..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.util.List; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.specific.SpecificRecord; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.crunch.MapFn; -import org.apache.crunch.fn.IdentityFn; -import org.apache.crunch.io.ReadableSourceTarget; -import org.apache.crunch.io.avro.AvroFileSourceTarget; -import org.apache.crunch.types.Converter; -import org.apache.crunch.types.DeepCopier; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.PTypeFamily; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -/** - * The implementation of the PType interface for Avro-based serialization. - * - */ -public class AvroType implements PType { - - private static final Converter AVRO_CONVERTER = new AvroKeyConverter(); - - private final Class typeClass; - private final String schemaString; - private transient Schema schema; - private final MapFn baseInputMapFn; - private final MapFn baseOutputMapFn; - private final List subTypes; - private DeepCopier deepCopier; - private boolean initialized = false; - - public AvroType(Class typeClass, Schema schema, DeepCopier deepCopier, PType... ptypes) { - this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes); - } - - public AvroType(Class typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, - DeepCopier deepCopier, PType... ptypes) { - this.typeClass = typeClass; - this.schema = Preconditions.checkNotNull(schema); - this.schemaString = schema.toString(); - this.baseInputMapFn = inputMapFn; - this.baseOutputMapFn = outputMapFn; - this.deepCopier = deepCopier; - this.subTypes = ImmutableList. builder().add(ptypes).build(); - } - - @Override - public Class getTypeClass() { - return typeClass; - } - - @Override - public PTypeFamily getFamily() { - return AvroTypeFamily.getInstance(); - } - - @Override - public List getSubTypes() { - return Lists. newArrayList(subTypes); - } - - public Schema getSchema() { - if (schema == null) { - schema = new Schema.Parser().parse(schemaString); - } - return schema; - } - - /** - * Determine if the wrapped type is a specific data avro type or wraps one. - * - * @return true if the wrapped type is a specific data type or wraps one - */ - public boolean hasSpecific() { - if (Avros.isPrimitive(this)) { - return false; - } - - if (!this.subTypes.isEmpty()) { - for (PType subType : this.subTypes) { - AvroType atype = (AvroType) subType; - if (atype.hasSpecific()) { - return true; - } - } - return false; - } - - return SpecificRecord.class.isAssignableFrom(typeClass); - } - - /** - * Determine if the wrapped type is a generic data avro type. - * - * @return true if the wrapped type is a generic type - */ - public boolean isGeneric() { - return GenericData.Record.class.equals(typeClass); - } - - /** - * Determine if the wrapped type is a reflection-based avro type or wraps one. - * - * @return true if the wrapped type is a reflection-based type or wraps one. - */ - public boolean hasReflect() { - if (Avros.isPrimitive(this)) { - return false; - } - - if (!this.subTypes.isEmpty()) { - for (PType subType : this.subTypes) { - if (((AvroType) subType).hasReflect()) { - return true; - } - } - return false; - } - - return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class - .isAssignableFrom(typeClass)); - } - - public MapFn getInputMapFn() { - return baseInputMapFn; - } - - public MapFn getOutputMapFn() { - return baseOutputMapFn; - } - - @Override - public Converter getConverter() { - return AVRO_CONVERTER; - } - - @Override - public ReadableSourceTarget getDefaultFileSource(Path path) { - return new AvroFileSourceTarget(path, this); - } - - @Override - public void initialize(Configuration conf) { - deepCopier.initialize(conf); - initialized = true; - } - - @Override - public T getDetachedValue(T value) { - if (!initialized) { - throw new IllegalStateException("Cannot call getDetachedValue on an uninitialized PType"); - } - return deepCopier.deepCopy(value); - } - - @Override - public boolean equals(Object other) { - if (other == null || !(other instanceof AvroType)) { - return false; - } - AvroType at = (AvroType) other; - return (typeClass.equals(at.typeClass) && subTypes.equals(at.subTypes)); - - } - - @Override - public int hashCode() { - HashCodeBuilder hcb = new HashCodeBuilder(); - hcb.append(typeClass).append(subTypes); - return hcb.toHashCode(); - } - -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java deleted file mode 100644 index e09e173..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTypeFamily.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Map; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.crunch.MapFn; -import org.apache.crunch.Pair; -import org.apache.crunch.Tuple; -import org.apache.crunch.Tuple3; -import org.apache.crunch.Tuple4; -import org.apache.crunch.TupleN; -import org.apache.crunch.types.PGroupedTableType; -import org.apache.crunch.types.PTableType; -import org.apache.crunch.types.PType; -import org.apache.crunch.types.PTypeFamily; -import org.apache.crunch.types.PTypeUtils; - -public class AvroTypeFamily implements PTypeFamily { - - private static final AvroTypeFamily INSTANCE = new AvroTypeFamily(); - - public static AvroTypeFamily getInstance() { - return INSTANCE; - } - - // There can only be one instance. - private AvroTypeFamily() { - } - - @Override - public PType nulls() { - return Avros.nulls(); - } - - @Override - public PType strings() { - return Avros.strings(); - } - - @Override - public PType longs() { - return Avros.longs(); - } - - @Override - public PType ints() { - return Avros.ints(); - } - - @Override - public PType floats() { - return Avros.floats(); - } - - @Override - public PType doubles() { - return Avros.doubles(); - } - - @Override - public PType booleans() { - return Avros.booleans(); - } - - @Override - public PType bytes() { - return Avros.bytes(); - } - - @Override - public PType records(Class clazz) { - return Avros.records(clazz); - } - - public PType generics(Schema schema) { - return Avros.generics(schema); - } - - public PType containers(Class clazz) { - return Avros.containers(clazz); - } - - @Override - public PType> collections(PType ptype) { - return Avros.collections(ptype); - } - - @Override - public PType> maps(PType ptype) { - return Avros.maps(ptype); - } - - @Override - public PType> pairs(PType p1, PType p2) { - return Avros.pairs(p1, p2); - } - - @Override - public PType> triples(PType p1, PType p2, PType p3) { - return Avros.triples(p1, p2, p3); - } - - @Override - public PType> quads(PType p1, PType p2, PType p3, PType p4) { - return Avros.quads(p1, p2, p3, p4); - } - - @Override - public PType tuples(PType... ptypes) { - return Avros.tuples(ptypes); - } - - @Override - public PTableType tableOf(PType key, PType value) { - return Avros.tableOf(key, value); - } - - @Override - public PType as(PType ptype) { - if (ptype instanceof AvroType || ptype instanceof AvroGroupedTableType) { - return ptype; - } - if (ptype instanceof PGroupedTableType) { - PTableType ptt = ((PGroupedTableType) ptype).getTableType(); - return new AvroGroupedTableType((AvroTableType) as(ptt)); - } - Class typeClass = ptype.getTypeClass(); - PType prim = Avros.getPrimitiveType(typeClass); - if (prim != null) { - return prim; - } - return PTypeUtils.convert(ptype, this); - } - - @Override - public PType tuples(Class clazz, PType... ptypes) { - return Avros.tuples(clazz, ptypes); - } - - @Override - public PType derived(Class clazz, MapFn inputFn, MapFn outputFn, PType base) { - return Avros.derived(clazz, inputFn, outputFn, base); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java deleted file mode 100644 index 9460fa5..0000000 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroUtf8InputFormat.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.types.avro; - -import java.io.IOException; - -import org.apache.avro.mapred.AvroWrapper; -import org.apache.avro.util.Utf8; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; - -/** - * An {@link org.apache.hadoop.mapred.InputFormat} for text files. Each line is - * a {@link Utf8} key; values are null. - */ -public class AvroUtf8InputFormat extends FileInputFormat, NullWritable> { - - static class Utf8LineRecordReader extends RecordReader, NullWritable> { - - private LineRecordReader lineRecordReader; - - private AvroWrapper currentKey = new AvroWrapper(); - - public Utf8LineRecordReader() throws IOException { - this.lineRecordReader = new LineRecordReader(); - } - - public void close() throws IOException { - lineRecordReader.close(); - } - - public float getProgress() throws IOException { - return lineRecordReader.getProgress(); - } - - @Override - public AvroWrapper getCurrentKey() throws IOException, InterruptedException { - Text txt = lineRecordReader.getCurrentValue(); - currentKey.datum(new Utf8(txt.toString())); - return currentKey; - } - - @Override - public NullWritable getCurrentValue() throws IOException, InterruptedException { - return NullWritable.get(); - } - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - lineRecordReader.initialize(split, context); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return lineRecordReader.nextKeyValue(); - } - } - - private CompressionCodecFactory compressionCodecs = null; - - public void configure(Configuration conf) { - compressionCodecs = new CompressionCodecFactory(conf); - } - - protected boolean isSplitable(FileSystem fs, Path file) { - return compressionCodecs.getCodec(file) == null; - } - - @Override - public RecordReader, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - return new Utf8LineRecordReader(); - } -}