crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-363: Fix protobufs to work with collection and union wrapper types.
Date Thu, 13 Mar 2014 21:14:59 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 18028aab3 -> eac45b00a


CRUNCH-363: Fix protobufs to work with collection and union wrapper types.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/eac45b00
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/eac45b00
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/eac45b00

Branch: refs/heads/master
Commit: eac45b00a90e1c248a4dedeade44a82b25347fec
Parents: 18028aa
Author: Josh Wills <jwills@apache.org>
Authored: Thu Mar 13 11:52:59 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Mar 13 11:55:55 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/CogroupIT.java   |  69 ++
 .../org/apache/crunch/lib/PersonProtos.java     | 695 +++++++++++++++++++
 crunch-core/src/it/resources/person.proto       |  28 +
 .../apache/crunch/types/writable/Writables.java |  17 +-
 4 files changed, 804 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 191c737..9be5b1e 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -23,12 +23,15 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.crunch.*;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.lib.PersonProtos.Person;
+import org.apache.crunch.lib.PersonProtos.Person.Builder;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.test.Tests;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.After;
@@ -108,6 +111,26 @@ public class CogroupIT {
     runCogroupN(AvroTypeFamily.getInstance());
   }
 
+  @Test
+  public void testCogroupProtosWritables() {
+      runCogroupProtos(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroupProtosAvro() {
+      runCogroupProtos(AvroTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroupProtosPairsWritables() {
+    runCogroupProtosPairs(WritableTypeFamily.getInstance());
+  }
+
+  @Test
+  public void testCogroupProtosPairsAvro() {
+    runCogroupProtosPairs(AvroTypeFamily.getInstance());
+  }
+
   public void runCogroup(PTypeFamily ptf) {
     PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
@@ -133,6 +156,32 @@ public class CogroupIT {
     assertThat(actual, is(expected));
   }
 
+  public void runCogroupProtos(PTypeFamily ptf) {
+    PTableType<String, Person> tt = ptf.tableOf(ptf.strings(), PTypes.protos(Person.class, ptf));
+
+    PTable<String, Person> kv1 = lines1.parallelDo("kv1", new GenerateProto(), tt);
+    PTable<String, Person> kv2 = lines2.parallelDo("kv2", new GenerateProto(), tt);
+
+    PTable<String, Pair<Collection<Person>, Collection<Person>>> cg = Cogroup.cogroup(kv1, kv2);
+
+    Map<String, Pair<Collection<Person>, Collection<Person>>> result = cg.materializeToMap();
+
+    assertThat(result.size(), is(4));
+  }
+
+  public void runCogroupProtosPairs(PTypeFamily ptf) {
+    PTableType<String, Pair<String, Person>> tt = ptf.tableOf(ptf.strings(), ptf.pairs(ptf.strings(), PTypes.protos(Person.class, ptf)));
+
+    PTable<String, Pair<String, Person>> kv1 = lines1.parallelDo("kv1", new GenerateProtoPairs(), tt);
+    PTable<String, Pair<String, Person>> kv2 = lines2.parallelDo("kv2", new GenerateProtoPairs(), tt);
+
+    PTable<String, Pair<Collection<Pair<String, Person>>, Collection<Pair<String, Person>>>> cg = Cogroup.cogroup(kv1, kv2);
+
+    Map<String, Pair<Collection<Pair<String, Person>>, Collection<Pair<String, Person>>>> result = cg.materializeToMap();
+
+    assertThat(result.size(), is(4));
+  }
+
   public void runCogroup3(PTypeFamily ptf) {
     PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings());
 
@@ -229,6 +278,26 @@ public class CogroupIT {
     }
   }
 
+  private static class GenerateProto extends DoFn<String, Pair<String, Person>> {
+      @Override
+      public void process(String input, Emitter<Pair<String, Person>> emitter) {
+          String[] fields = input.split(",");
+          String key = fields[0];
+          Builder b = Person.newBuilder().setFirst("first"+key).setLast("last"+key);
+          emitter.emit(Pair.of(fields[0], b.build()));
+      }
+  }
+
+  private static class GenerateProtoPairs extends DoFn<String, Pair<String, Pair<String, Person>>> {
+      @Override
+      public void process(String input, Emitter<Pair<String, Pair<String, Person>>> emitter) {
+          String[] fields = input.split(",");
+          String key = fields[0];
+          Builder b = Person.newBuilder().setFirst("first"+key).setLast("last"+key);
+          emitter.emit(Pair.of(fields[0], Pair.of(fields[1], b.build())));
+      }
+  }
+
   private static Collection<String> coll(String... values) {
     return ImmutableSet.copyOf(values);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
new file mode 100644
index 0000000..c604861
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/PersonProtos.java
@@ -0,0 +1,695 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: resources/person.proto
+
+package org.apache.crunch.lib;
+
+public final class PersonProtos {
+    private PersonProtos() {}
+    public static void registerAllExtensions(
+            com.google.protobuf.ExtensionRegistry registry) {
+    }
+    public interface PersonOrBuilder
+            extends com.google.protobuf.MessageOrBuilder {
+
+        // optional string first = 1;
+        /**
+         * <code>optional string first = 1;</code>
+         */
+        boolean hasFirst();
+        /**
+         * <code>optional string first = 1;</code>
+         */
+        java.lang.String getFirst();
+        /**
+         * <code>optional string first = 1;</code>
+         */
+        com.google.protobuf.ByteString
+        getFirstBytes();
+
+        // optional string last = 2;
+        /**
+         * <code>optional string last = 2;</code>
+         */
+        boolean hasLast();
+        /**
+         * <code>optional string last = 2;</code>
+         */
+        java.lang.String getLast();
+        /**
+         * <code>optional string last = 2;</code>
+         */
+        com.google.protobuf.ByteString
+        getLastBytes();
+    }
+    /**
+     * Protobuf type {@code crunch.Person}
+     */
+    public static final class Person extends
+            com.google.protobuf.GeneratedMessage
+            implements PersonOrBuilder {
+        // Use Person.newBuilder() to construct.
+        private Person(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+            super(builder);
+            this.unknownFields = builder.getUnknownFields();
+        }
+        private Person(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+        private static final Person defaultInstance;
+        public static Person getDefaultInstance() {
+            return defaultInstance;
+        }
+
+        public Person getDefaultInstanceForType() {
+            return defaultInstance;
+        }
+
+        private final com.google.protobuf.UnknownFieldSet unknownFields;
+        @java.lang.Override
+        public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+            return this.unknownFields;
+        }
+        private Person(
+                com.google.protobuf.CodedInputStream input,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws com.google.protobuf.InvalidProtocolBufferException {
+            initFields();
+            int mutable_bitField0_ = 0;
+            com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+                    com.google.protobuf.UnknownFieldSet.newBuilder();
+            try {
+                boolean done = false;
+                while (!done) {
+                    int tag = input.readTag();
+                    switch (tag) {
+                        case 0:
+                            done = true;
+                            break;
+                        default: {
+                            if (!parseUnknownField(input, unknownFields,
+                                    extensionRegistry, tag)) {
+                                done = true;
+                            }
+                            break;
+                        }
+                        case 10: {
+                            bitField0_ |= 0x00000001;
+                            first_ = input.readBytes();
+                            break;
+                        }
+                        case 18: {
+                            bitField0_ |= 0x00000002;
+                            last_ = input.readBytes();
+                            break;
+                        }
+                    }
+                }
+            } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+                throw e.setUnfinishedMessage(this);
+            } catch (java.io.IOException e) {
+                throw new com.google.protobuf.InvalidProtocolBufferException(
+                        e.getMessage()).setUnfinishedMessage(this);
+            } finally {
+                this.unknownFields = unknownFields.build();
+                makeExtensionsImmutable();
+            }
+        }
+        public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+            return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor;
+        }
+
+        protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+            return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable
+                    .ensureFieldAccessorsInitialized(
+                            org.apache.crunch.lib.PersonProtos.Person.class, org.apache.crunch.lib.PersonProtos.Person.Builder.class);
+        }
+
+        public static com.google.protobuf.Parser<Person> PARSER =
+                new com.google.protobuf.AbstractParser<Person>() {
+                    public Person parsePartialFrom(
+                            com.google.protobuf.CodedInputStream input,
+                            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                            throws com.google.protobuf.InvalidProtocolBufferException {
+                        return new Person(input, extensionRegistry);
+                    }
+                };
+
+        @java.lang.Override
+        public com.google.protobuf.Parser<Person> getParserForType() {
+            return PARSER;
+        }
+
+        private int bitField0_;
+        // optional string first = 1;
+        public static final int FIRST_FIELD_NUMBER = 1;
+        private java.lang.Object first_;
+        /**
+         * <code>optional string first = 1;</code>
+         */
+        public boolean hasFirst() {
+            return ((bitField0_ & 0x00000001) == 0x00000001);
+        }
+        /**
+         * <code>optional string first = 1;</code>
+         */
+        public java.lang.String getFirst() {
+            java.lang.Object ref = first_;
+            if (ref instanceof java.lang.String) {
+                return (java.lang.String) ref;
+            } else {
+                com.google.protobuf.ByteString bs =
+                        (com.google.protobuf.ByteString) ref;
+                java.lang.String s = bs.toStringUtf8();
+                if (bs.isValidUtf8()) {
+                    first_ = s;
+                }
+                return s;
+            }
+        }
+        /**
+         * <code>optional string first = 1;</code>
+         */
+        public com.google.protobuf.ByteString
+        getFirstBytes() {
+            java.lang.Object ref = first_;
+            if (ref instanceof java.lang.String) {
+                com.google.protobuf.ByteString b =
+                        com.google.protobuf.ByteString.copyFromUtf8(
+                                (java.lang.String) ref);
+                first_ = b;
+                return b;
+            } else {
+                return (com.google.protobuf.ByteString) ref;
+            }
+        }
+
+        // optional string last = 2;
+        public static final int LAST_FIELD_NUMBER = 2;
+        private java.lang.Object last_;
+        /**
+         * <code>optional string last = 2;</code>
+         */
+        public boolean hasLast() {
+            return ((bitField0_ & 0x00000002) == 0x00000002);
+        }
+        /**
+         * <code>optional string last = 2;</code>
+         */
+        public java.lang.String getLast() {
+            java.lang.Object ref = last_;
+            if (ref instanceof java.lang.String) {
+                return (java.lang.String) ref;
+            } else {
+                com.google.protobuf.ByteString bs =
+                        (com.google.protobuf.ByteString) ref;
+                java.lang.String s = bs.toStringUtf8();
+                if (bs.isValidUtf8()) {
+                    last_ = s;
+                }
+                return s;
+            }
+        }
+        /**
+         * <code>optional string last = 2;</code>
+         */
+        public com.google.protobuf.ByteString
+        getLastBytes() {
+            java.lang.Object ref = last_;
+            if (ref instanceof java.lang.String) {
+                com.google.protobuf.ByteString b =
+                        com.google.protobuf.ByteString.copyFromUtf8(
+                                (java.lang.String) ref);
+                last_ = b;
+                return b;
+            } else {
+                return (com.google.protobuf.ByteString) ref;
+            }
+        }
+
+        private void initFields() {
+            first_ = "";
+            last_ = "";
+        }
+        private byte memoizedIsInitialized = -1;
+        public final boolean isInitialized() {
+            byte isInitialized = memoizedIsInitialized;
+            if (isInitialized != -1) return isInitialized == 1;
+
+            memoizedIsInitialized = 1;
+            return true;
+        }
+
+        public void writeTo(com.google.protobuf.CodedOutputStream output)
+                throws java.io.IOException {
+            getSerializedSize();
+            if (((bitField0_ & 0x00000001) == 0x00000001)) {
+                output.writeBytes(1, getFirstBytes());
+            }
+            if (((bitField0_ & 0x00000002) == 0x00000002)) {
+                output.writeBytes(2, getLastBytes());
+            }
+            getUnknownFields().writeTo(output);
+        }
+
+        private int memoizedSerializedSize = -1;
+        public int getSerializedSize() {
+            int size = memoizedSerializedSize;
+            if (size != -1) return size;
+
+            size = 0;
+            if (((bitField0_ & 0x00000001) == 0x00000001)) {
+                size += com.google.protobuf.CodedOutputStream
+                        .computeBytesSize(1, getFirstBytes());
+            }
+            if (((bitField0_ & 0x00000002) == 0x00000002)) {
+                size += com.google.protobuf.CodedOutputStream
+                        .computeBytesSize(2, getLastBytes());
+            }
+            size += getUnknownFields().getSerializedSize();
+            memoizedSerializedSize = size;
+            return size;
+        }
+
+        private static final long serialVersionUID = 0L;
+        @java.lang.Override
+        protected java.lang.Object writeReplace()
+                throws java.io.ObjectStreamException {
+            return super.writeReplace();
+        }
+
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+                com.google.protobuf.ByteString data)
+                throws com.google.protobuf.InvalidProtocolBufferException {
+            return PARSER.parseFrom(data);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+                com.google.protobuf.ByteString data,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws com.google.protobuf.InvalidProtocolBufferException {
+            return PARSER.parseFrom(data, extensionRegistry);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(byte[] data)
+                throws com.google.protobuf.InvalidProtocolBufferException {
+            return PARSER.parseFrom(data);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+                byte[] data,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws com.google.protobuf.InvalidProtocolBufferException {
+            return PARSER.parseFrom(data, extensionRegistry);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(java.io.InputStream input)
+                throws java.io.IOException {
+            return PARSER.parseFrom(input);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+                java.io.InputStream input,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws java.io.IOException {
+            return PARSER.parseFrom(input, extensionRegistry);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom(java.io.InputStream input)
+                throws java.io.IOException {
+            return PARSER.parseDelimitedFrom(input);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseDelimitedFrom(
+                java.io.InputStream input,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws java.io.IOException {
+            return PARSER.parseDelimitedFrom(input, extensionRegistry);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+                com.google.protobuf.CodedInputStream input)
+                throws java.io.IOException {
+            return PARSER.parseFrom(input);
+        }
+        public static org.apache.crunch.lib.PersonProtos.Person parseFrom(
+                com.google.protobuf.CodedInputStream input,
+                com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                throws java.io.IOException {
+            return PARSER.parseFrom(input, extensionRegistry);
+        }
+
+        public static Builder newBuilder() { return Builder.create(); }
+        public Builder newBuilderForType() { return newBuilder(); }
+        public static Builder newBuilder(org.apache.crunch.lib.PersonProtos.Person prototype) {
+            return newBuilder().mergeFrom(prototype);
+        }
+        public Builder toBuilder() { return newBuilder(this); }
+
+        @java.lang.Override
+        protected Builder newBuilderForType(
+                com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+            Builder builder = new Builder(parent);
+            return builder;
+        }
+        /**
+         * Protobuf type {@code crunch.Person}
+         */
+        public static final class Builder extends
+                com.google.protobuf.GeneratedMessage.Builder<Builder>
+                implements org.apache.crunch.lib.PersonProtos.PersonOrBuilder {
+            public static final com.google.protobuf.Descriptors.Descriptor
+            getDescriptor() {
+                return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor;
+            }
+
+            protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+            internalGetFieldAccessorTable() {
+                return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_fieldAccessorTable
+                        .ensureFieldAccessorsInitialized(
+                                org.apache.crunch.lib.PersonProtos.Person.class, org.apache.crunch.lib.PersonProtos.Person.Builder.class);
+            }
+
+            // Construct using org.apache.crunch.lib.PersonProtos.Person.newBuilder()
+            private Builder() {
+                maybeForceBuilderInitialization();
+            }
+
+            private Builder(
+                    com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+                super(parent);
+                maybeForceBuilderInitialization();
+            }
+            private void maybeForceBuilderInitialization() {
+                if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+                }
+            }
+            private static Builder create() {
+                return new Builder();
+            }
+
+            public Builder clear() {
+                super.clear();
+                first_ = "";
+                bitField0_ = (bitField0_ & ~0x00000001);
+                last_ = "";
+                bitField0_ = (bitField0_ & ~0x00000002);
+                return this;
+            }
+
+            public Builder clone() {
+                return create().mergeFrom(buildPartial());
+            }
+
+            public com.google.protobuf.Descriptors.Descriptor
+            getDescriptorForType() {
+                return org.apache.crunch.lib.PersonProtos.internal_static_crunch_Person_descriptor;
+            }
+
+            public org.apache.crunch.lib.PersonProtos.Person getDefaultInstanceForType() {
+                return org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance();
+            }
+
+            public org.apache.crunch.lib.PersonProtos.Person build() {
+                org.apache.crunch.lib.PersonProtos.Person result = buildPartial();
+                if (!result.isInitialized()) {
+                    throw newUninitializedMessageException(result);
+                }
+                return result;
+            }
+
+            public org.apache.crunch.lib.PersonProtos.Person buildPartial() {
+                org.apache.crunch.lib.PersonProtos.Person result = new org.apache.crunch.lib.PersonProtos.Person(this);
+                int from_bitField0_ = bitField0_;
+                int to_bitField0_ = 0;
+                if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+                    to_bitField0_ |= 0x00000001;
+                }
+                result.first_ = first_;
+                if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+                    to_bitField0_ |= 0x00000002;
+                }
+                result.last_ = last_;
+                result.bitField0_ = to_bitField0_;
+                onBuilt();
+                return result;
+            }
+
+            public Builder mergeFrom(com.google.protobuf.Message other) {
+                if (other instanceof org.apache.crunch.lib.PersonProtos.Person) {
+                    return mergeFrom((org.apache.crunch.lib.PersonProtos.Person)other);
+                } else {
+                    super.mergeFrom(other);
+                    return this;
+                }
+            }
+
+            public Builder mergeFrom(org.apache.crunch.lib.PersonProtos.Person other) {
+                if (other == org.apache.crunch.lib.PersonProtos.Person.getDefaultInstance()) return this;
+                if (other.hasFirst()) {
+                    bitField0_ |= 0x00000001;
+                    first_ = other.first_;
+                    onChanged();
+                }
+                if (other.hasLast()) {
+                    bitField0_ |= 0x00000002;
+                    last_ = other.last_;
+                    onChanged();
+                }
+                this.mergeUnknownFields(other.getUnknownFields());
+                return this;
+            }
+
+            public final boolean isInitialized() {
+                return true;
+            }
+
+            public Builder mergeFrom(
+                    com.google.protobuf.CodedInputStream input,
+                    com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                    throws java.io.IOException {
+                org.apache.crunch.lib.PersonProtos.Person parsedMessage = null;
+                try {
+                    parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+                } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+                    parsedMessage = (org.apache.crunch.lib.PersonProtos.Person) e.getUnfinishedMessage();
+                    throw e;
+                } finally {
+                    if (parsedMessage != null) {
+                        mergeFrom(parsedMessage);
+                    }
+                }
+                return this;
+            }
+            private int bitField0_;
+
+            // optional string first = 1;
+            private java.lang.Object first_ = "";
+            /**
+             * <code>optional string first = 1;</code>
+             */
+            public boolean hasFirst() {
+                return ((bitField0_ & 0x00000001) == 0x00000001);
+            }
+            /**
+             * <code>optional string first = 1;</code>
+             */
+            public java.lang.String getFirst() {
+                java.lang.Object ref = first_;
+                if (!(ref instanceof java.lang.String)) {
+                    java.lang.String s = ((com.google.protobuf.ByteString) ref)
+                            .toStringUtf8();
+                    first_ = s;
+                    return s;
+                } else {
+                    return (java.lang.String) ref;
+                }
+            }
+            /**
+             * <code>optional string first = 1;</code>
+             */
+            public com.google.protobuf.ByteString
+            getFirstBytes() {
+                java.lang.Object ref = first_;
+                if (ref instanceof String) {
+                    com.google.protobuf.ByteString b =
+                            com.google.protobuf.ByteString.copyFromUtf8(
+                                    (java.lang.String) ref);
+                    first_ = b;
+                    return b;
+                } else {
+                    return (com.google.protobuf.ByteString) ref;
+                }
+            }
+            /**
+             * <code>optional string first = 1;</code>
+             */
+            public Builder setFirst(
+                    java.lang.String value) {
+                if (value == null) {
+                    throw new NullPointerException();
+                }
+                bitField0_ |= 0x00000001;
+                first_ = value;
+                onChanged();
+                return this;
+            }
+            /**
+             * <code>optional string first = 1;</code>
+             */
+            public Builder clearFirst() {
+                bitField0_ = (bitField0_ & ~0x00000001);
+                first_ = getDefaultInstance().getFirst();
+                onChanged();
+                return this;
+            }
+            /**
+             * <code>optional string first = 1;</code>
+             */
+            public Builder setFirstBytes(
+                    com.google.protobuf.ByteString value) {
+                if (value == null) {
+                    throw new NullPointerException();
+                }
+                bitField0_ |= 0x00000001;
+                first_ = value;
+                onChanged();
+                return this;
+            }
+
+            // optional string last = 2;
+            private java.lang.Object last_ = "";
+            /**
+             * <code>optional string last = 2;</code>
+             */
+            public boolean hasLast() {
+                return ((bitField0_ & 0x00000002) == 0x00000002);
+            }
+            /**
+             * <code>optional string last = 2;</code>
+             */
+            public java.lang.String getLast() {
+                java.lang.Object ref = last_;
+                if (!(ref instanceof java.lang.String)) {
+                    java.lang.String s = ((com.google.protobuf.ByteString) ref)
+                            .toStringUtf8();
+                    last_ = s;
+                    return s;
+                } else {
+                    return (java.lang.String) ref;
+                }
+            }
+            /**
+             * <code>optional string last = 2;</code>
+             */
+            public com.google.protobuf.ByteString
+            getLastBytes() {
+                java.lang.Object ref = last_;
+                if (ref instanceof String) {
+                    com.google.protobuf.ByteString b =
+                            com.google.protobuf.ByteString.copyFromUtf8(
+                                    (java.lang.String) ref);
+                    last_ = b;
+                    return b;
+                } else {
+                    return (com.google.protobuf.ByteString) ref;
+                }
+            }
+            /**
+             * <code>optional string last = 2;</code>
+             */
+            public Builder setLast(
+                    java.lang.String value) {
+                if (value == null) {
+                    throw new NullPointerException();
+                }
+                bitField0_ |= 0x00000002;
+                last_ = value;
+                onChanged();
+                return this;
+            }
+            /**
+             * <code>optional string last = 2;</code>
+             */
+            public Builder clearLast() {
+                bitField0_ = (bitField0_ & ~0x00000002);
+                last_ = getDefaultInstance().getLast();
+                onChanged();
+                return this;
+            }
+            /**
+             * <code>optional string last = 2;</code>
+             */
+            public Builder setLastBytes(
+                    com.google.protobuf.ByteString value) {
+                if (value == null) {
+                    throw new NullPointerException();
+                }
+                bitField0_ |= 0x00000002;
+                last_ = value;
+                onChanged();
+                return this;
+            }
+
+            // @@protoc_insertion_point(builder_scope:crunch.Person)
+        }
+
+        static {
+            defaultInstance = new Person(true);
+            defaultInstance.initFields();
+        }
+
+        // @@protoc_insertion_point(class_scope:crunch.Person)
+    }
+
+    private static com.google.protobuf.Descriptors.Descriptor
+            internal_static_crunch_Person_descriptor;
+    private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+            internal_static_crunch_Person_fieldAccessorTable;
+
+    public static com.google.protobuf.Descriptors.FileDescriptor
+    getDescriptor() {
+        return descriptor;
+    }
+    private static com.google.protobuf.Descriptors.FileDescriptor
+            descriptor;
+    static {
+        java.lang.String[] descriptorData = {
+                "\n\ngist.proto\022\006crunch\"%\n\006Person\022\r\n\005first\030" +
+                        "\001 \001(\t\022\014\n\004last\030\002 \001(\tB\'\n\025org.apache.crunch"
+
+                        ".libB\014PersonProtosH\001"
+        };
+        com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
+                new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
+                    public com.google.protobuf.ExtensionRegistry assignDescriptors(
+                            com.google.protobuf.Descriptors.FileDescriptor root) {
+                        descriptor = root;
+                        internal_static_crunch_Person_descriptor =
+                                getDescriptor().getMessageTypes().get(0);
+                        internal_static_crunch_Person_fieldAccessorTable = new
+                                com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+                                internal_static_crunch_Person_descriptor,
+                                new java.lang.String[] { "First", "Last", });
+                        return null;
+                    }
+                };
+        com.google.protobuf.Descriptors.FileDescriptor
+                .internalBuildGeneratedFileFrom(descriptorData,
+                        new com.google.protobuf.Descriptors.FileDescriptor[] {
+                        }, assigner);
+    }
+
+    // @@protoc_insertion_point(outer_class_scope)
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/it/resources/person.proto
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/resources/person.proto b/crunch-core/src/it/resources/person.proto
new file mode 100644
index 0000000..b973234
--- /dev/null
+++ b/crunch-core/src/it/resources/person.proto
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package crunch;
+
+option java_package = "org.apache.crunch.lib";
+option java_outer_classname = "PersonProtos";
+
+option optimize_for = SPEED;
+
+message Person {
+   optional string first = 1;
+   optional string last = 2;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/eac45b00/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
index d087ca3..89464ac 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -359,6 +359,14 @@ public class Writables {
     return new WritableTableType((WritableType) key, (WritableType) value);
   }
 
+  private static BytesWritable asBytesWritable(Writable w) {
+    if (w instanceof BytesWritable) {
+      return (BytesWritable) w;
+    } else {
+      return new BytesWritable(WritableUtils.toByteArray(w));
+    }
+  }
+
   private static <W extends Writable> W create(Class<W> clazz, Writable writable) {
     if (clazz.equals(writable.getClass())) {
       return (W) writable;
@@ -512,7 +520,7 @@ public class Writables {
             values[i] = w;
             written[i] = WRITABLE_CODES.inverse().get(w.getClass());
           } else {
-            values[i] = new BytesWritable(WritableUtils.toByteArray(w));
+            values[i] = asBytesWritable(w);
             written[i] = 1; // code for BytesWritable
           }
         }
@@ -652,7 +660,7 @@ public class Writables {
     public UnionWritable map(Union input) {
       int index = input.getIndex();
       Writable w = (Writable) fns.get(index).map(input.getValue());
-      return new UnionWritable(index, new BytesWritable(WritableUtils.toByteArray(w)));
+      return new UnionWritable(index, asBytesWritable(w));
     }
   }
 
@@ -744,8 +752,7 @@ public class Writables {
       BytesWritable[] w = new BytesWritable[input.size()];
       int index = 0;
       for (T in : input) {
-        Writable v = (Writable) mapFn.map(in);
-        w[index++] = new BytesWritable(WritableUtils.toByteArray(v));
+        w[index++] = asBytesWritable((Writable) mapFn.map(in));
       }
       arrayWritable.set(w);
       return arrayWritable;
@@ -822,7 +829,7 @@ public class Writables {
       TextMapWritable tmw = new TextMapWritable();
       for (Map.Entry<String, T> e : input.entrySet()) {
         Writable w = mapFn.map(e.getValue());
-        tmw.put(new Text(e.getKey()), new BytesWritable(WritableUtils.toByteArray(w)));
+        tmw.put(new Text(e.getKey()), asBytesWritable(w));
       }
       return tmw;
     }


Mime
View raw message