beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4453) Provide automatic schema registration for POJOs
Date Mon, 09 Jul 2018 18:22:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4453?focusedWorklogId=120951&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120951 ]

ASF GitHub Bot logged work on BEAM-4453:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jul/18 18:21
            Start Date: 09/Jul/18 18:21
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #5873: [BEAM-4453] Add schema support for Java POJOs and Java Beans
URL: https://github.com/apache/beam/pull/5873
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 879eb01b6f9..28fc0275c53 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -38,6 +38,7 @@
   static final ImmutableMap<TypeName, Coder> CODER_MAP =
       ImmutableMap.<TypeName, Coder>builder()
           .put(TypeName.BYTE, ByteCoder.of())
+          .put(TypeName.BYTES, ByteArrayCoder.of())
           .put(TypeName.INT16, BigEndianShortCoder.of())
           .put(TypeName.INT32, BigEndianIntegerCoder.of())
           .put(TypeName.INT64, BigEndianLongCoder.of())
@@ -132,6 +133,9 @@ private static long estimatedSizeBytes(FieldType typeDescriptor, Object value) {
           listSizeBytes += estimatedSizeBytes(typeDescriptor.getCollectionElementType(), elem);
         }
         return 4 + listSizeBytes;
+      case BYTES:
+        byte[] bytes = (byte[]) value;
+        return 4L + bytes.length;
       case MAP:
         Map<Object, Object> map = (Map<Object, Object>) value;
         long mapSizeBytes = 0;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java
index c4e3269ab3b..b13b5cd0baa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/DefaultSchema.java
@@ -67,7 +67,7 @@
    * {@link SchemaProvider} for default schemas. Looks up the provider annotated for a type, and
    * delegates to that provider.
    */
-  class DefaultSchemaProvider extends SchemaProvider {
+  class DefaultSchemaProvider implements SchemaProvider {
     final Map<TypeDescriptor, SchemaProvider> cachedProviders = Maps.newConcurrentMap();
 
     @Nullable
@@ -83,10 +83,7 @@ private SchemaProvider getSchemaProvider(TypeDescriptor<?> typeDescriptor) {
             Class<? extends SchemaProvider> providerClass = annotation.value();
             checkArgument(
                 providerClass != null,
-                "Type "
-                    + type
-                    + " has a @DefaultSchemaProvider annotation "
-                    + " with a null argument.");
+                "Type " + type + " has a @DefaultSchema annotation with a null argument.");
 
             try {
               return providerClass.getDeclaredConstructor().newInstance();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
new file mode 100644
index 00000000000..108c74601ce
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowWithGetters;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.reflect.FieldValueGetterFactory;
+import org.apache.beam.sdk.values.reflect.FieldValueSetter;
+import org.apache.beam.sdk.values.reflect.FieldValueSetterFactory;
+
+/**
+ * A {@link SchemaProvider} base class that vends schemas and rows based on {@link
+ * org.apache.beam.sdk.values.reflect.FieldValueGetter}s.
+ */
+@Experimental(Kind.SCHEMAS)
+public abstract class GetterBasedSchemaProvider implements SchemaProvider {
+  /** Implementing class should override to return a getter factory. */
+  abstract FieldValueGetterFactory fieldValueGetterFactory();
+
+  /** Implementing class should override to return a setter factory. */
+  abstract FieldValueSetterFactory fieldValueSetterFactory();
+
+  @Override
+  public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {
+    return o ->
+        Row.withSchema(schemaFor(typeDescriptor))
+            .withFieldValueGetters(fieldValueGetterFactory(), o)
+            .build();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
+    return r -> {
+      if (r instanceof RowWithGetters) {
+        // Efficient path: simply extract the underlying POJO instead of creating a new one.
+        return (T) ((RowWithGetters) r).getGetterTarget();
+      } else {
+        // Use the setters to copy values from the Row to a new instance of the class.
+        return fromRow(r, (Class<T>) typeDescriptor.getType());
+      }
+    };
+  }
+
+  private <T> T fromRow(Row row, Class<T> clazz) {
+    T object;
+    try {
+      object = clazz.getDeclaredConstructor().newInstance();
+    } catch (NoSuchMethodException
+        | IllegalAccessException
+        | InvocationTargetException
+        | InstantiationException e) {
+      throw new RuntimeException("Failed to instantiate object ", e);
+    }
+
+    Schema schema = row.getSchema();
+    List<FieldValueSetter> setters = fieldValueSetterFactory().createSetters(clazz, schema);
+    checkState(
+        setters.size() == row.getFieldCount(),
+        "Did not have a matching number of setters and fields.");
+
+    // Iterate over the row, and set (possibly recursively) each field in the underlying object
+    // using the setter.
+    for (int i = 0; i < row.getFieldCount(); ++i) {
+      FieldType type = schema.getField(i).getType();
+      FieldValueSetter setter = setters.get(i);
+      setter.set(
+          object,
+          fromValue(
+              type,
+              row.getValue(i),
+              setter.type(),
+              setter.elementType(),
+              setter.mapKeyType(),
+              setter.mapValueType()));
+    }
+    return object;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> T fromValue(
+      FieldType type, T value, Type fieldType, Type elemenentType, Type keyType, Type valueType) {
+    if (TypeName.ROW.equals(type.getTypeName())) {
+      return (T) fromRow((Row) value, (Class) fieldType);
+    } else if (TypeName.ARRAY.equals(type.getTypeName())) {
+      return (T) fromListValue(type.getCollectionElementType(), (List) value, elemenentType);
+    } else if (TypeName.MAP.equals(type.getTypeName())) {
+      return (T)
+          fromMapValue(
+              type.getMapKeyType(), type.getMapValueType(), (Map) value, keyType, valueType);
+    } else {
+      return value;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> List fromListValue(FieldType elementType, List<T> rowList, Type elementClass) {
+    List list = Lists.newArrayList();
+    for (T element : rowList) {
+      list.add(fromValue(elementType, element, elementClass, null, null, null));
+    }
+    return list;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<?, ?> fromMapValue(
+      FieldType keyType, FieldType valueType, Map<?, ?> map, Type keyClass, Type valueClass) {
+    Map newMap = Maps.newHashMap();
+    for (Map.Entry<?, ?> entry : map.entrySet()) {
+      Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, null);
+      Object value = fromValue(valueType, entry.getValue(), valueClass, null, null, null);
+      newMap.put(key, value);
+    }
+    return newMap;
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
new file mode 100644
index 00000000000..ed16592b3a3
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.reflect.FieldValueGetterFactory;
+import org.apache.beam.sdk.values.reflect.FieldValueSetterFactory;
+import org.apache.beam.sdk.values.reflect.JavaBeanGetterFactory;
+import org.apache.beam.sdk.values.reflect.JavaBeanSetterFactory;
+
+/**
+ * A {@link SchemaProvider} for Java Bean objects.
+ *
+ * <p>This provider finds (recursively) all public getters and setters in a Java object, and creates
+ * schemas and rows that bind to those fields. The field order in the schema is not guaranteed to
+ * match the method order in the class.
+ */
+@Experimental(Kind.SCHEMAS)
+public class JavaBeanSchema extends GetterBasedSchemaProvider {
+  @Override
+  public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
+    return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType());
+  }
+
+  @Override
+  public FieldValueGetterFactory fieldValueGetterFactory() {
+    return new JavaBeanGetterFactory();
+  }
+
+  @Override
+  public FieldValueSetterFactory fieldValueSetterFactory() {
+    return new JavaBeanSetterFactory();
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
new file mode 100644
index 00000000000..c2a9c5df9fa
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.utils.POJOUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.reflect.FieldValueGetterFactory;
+import org.apache.beam.sdk.values.reflect.FieldValueSetterFactory;
+import org.apache.beam.sdk.values.reflect.PojoValueGetterFactory;
+import org.apache.beam.sdk.values.reflect.PojoValueSetterFactory;
+
+/**
+ * A {@link SchemaProvider} for Java POJO objects.
+ *
+ * <p>This provider finds all public fields (recursively) in a Java object, and creates schemas and
+ * rows that bind to those fields. The field order in the schema is not guaranteed to match the
+ * field order in the class.
+ */
+@Experimental(Kind.SCHEMAS)
+public class JavaFieldSchema extends GetterBasedSchemaProvider {
+  @Override
+  public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
+    return POJOUtils.schemaFromPojoClass(typeDescriptor.getRawType());
+  }
+
+  @Override
+  public FieldValueGetterFactory fieldValueGetterFactory() {
+    return new PojoValueGetterFactory();
+  }
+
+  @Override
+  public FieldValueSetterFactory fieldValueSetterFactory() {
+    return new PojoValueSetterFactory();
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 7707f6f63ef..c6d2bbd5041 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -42,8 +42,11 @@
 @Experimental(Kind.SCHEMAS)
 public class Schema implements Serializable {
   // A mapping between field names an indices.
-  private BiMap<String, Integer> fieldIndices = HashBiMap.create();
-  private List<Field> fields;
+  private final BiMap<String, Integer> fieldIndices = HashBiMap.create();
+  private final List<Field> fields;
+  // Cache the hashCode, so it doesn't have to be recomputed. Schema objects are immutable, so this
+  // is correct.
+  private final int hashCode;
 
   /** Builder class for building {@link Schema} objects. */
   public static class Builder {
@@ -82,6 +85,11 @@ public Builder addByteField(String name) {
       return this;
     }
 
+    public Builder addByteArrayField(String name) {
+      fields.add(Field.of(name, FieldType.BYTES));
+      return this;
+    }
+
     public Builder addInt16Field(String name) {
       fields.add(Field.of(name, FieldType.INT16));
       return this;
@@ -157,6 +165,7 @@ public Schema(List<Field> fields) {
     for (Field field : fields) {
       fieldIndices.put(field.getName(), index++);
     }
+    this.hashCode = Objects.hash(fieldIndices, fields);
   }
 
   public static Schema of(Field... fields) {
@@ -186,7 +195,7 @@ public String toString() {
 
   @Override
   public int hashCode() {
-    return Objects.hash(fieldIndices, getFields());
+    return hashCode;
   }
 
   public List<Field> getFields() {
@@ -214,6 +223,7 @@ public int hashCode() {
     STRING, // String.
     DATETIME, // Date and time.
     BOOLEAN, // Boolean.
+    BYTES, // Byte array.
     ARRAY,
     MAP,
     ROW; // The field is itself a nested row.
@@ -320,6 +330,9 @@ public static FieldType of(TypeName typeName) {
     /** The type of byte fields. */
     public static final FieldType BYTE = FieldType.of(TypeName.BYTE);
 
+    /** The type of bytes fields. */
+    public static final FieldType BYTES = FieldType.of(TypeName.BYTES);
+
     /** The type of int16 fields. */
     public static final FieldType INT16 = FieldType.of(TypeName.INT16);
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java
index 7a0ca817918..167e39ad213 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaProvider.java
@@ -31,24 +31,23 @@
  * contacts an external schema-registry service to determine the schema for a type.
  */
 @Experimental(Kind.SCHEMAS)
-public abstract class SchemaProvider {
+public interface SchemaProvider {
 
   /** Lookup a schema for the given type. If no schema exists, returns null. */
   @Nullable
-  public abstract <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor);
+  <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor);
 
   /**
    * Given a type, return a function that converts that type to a {@link Row} object If no schema
    * exists, returns null.
    */
   @Nullable
-  public abstract <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor);
+  <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor);
 
   /**
    * Given a type, returns a function that converts from a {@link Row} object to that type. If no
    * schema exists, returns null.
    */
   @Nullable
-  public abstract <T> SerializableFunction<Row, T> fromRowFunction(
-      TypeDescriptor<T> typeDescriptor);
+  <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor);
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
new file mode 100644
index 00000000000..1593fbb5fab
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.implementation.FixedValue;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.Duplication;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
+import net.bytebuddy.implementation.bytecode.TypeCreation;
+import net.bytebuddy.implementation.bytecode.assign.Assigner;
+import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.collection.ArrayFactory;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+import org.apache.beam.sdk.values.reflect.FieldValueSetter;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.ClassUtils;
+import org.joda.time.DateTime;
+import org.joda.time.ReadableDateTime;
+import org.joda.time.ReadableInstant;
+
+class ByteBuddyUtils {
+  private static final ForLoadedType ARRAYS_TYPE = new ForLoadedType(Arrays.class);
+  private static final ForLoadedType ARRAY_UTILS_TYPE = new ForLoadedType(ArrayUtils.class);
+  private static final ForLoadedType BYTE_ARRAY_TYPE = new ForLoadedType(byte[].class);
+  private static final ForLoadedType BYTE_BUFFER_TYPE = new ForLoadedType(ByteBuffer.class);
+  private static final ForLoadedType CHAR_SEQUENCE_TYPE = new ForLoadedType(CharSequence.class);
+  private static final ForLoadedType DATE_TIME_TYPE = new ForLoadedType(DateTime.class);
+  private static final ForLoadedType LIST_TYPE = new ForLoadedType(List.class);
+  private static final ForLoadedType READABLE_INSTANT_TYPE =
+      new ForLoadedType(ReadableInstant.class);
+
+  // Create a new FieldValueGetter subclass.
+  @SuppressWarnings("unchecked")
+  static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
+      ByteBuddy byteBuddy, Type objectType, Type fieldType) {
+    TypeDescription.Generic getterGenericType =
+        TypeDescription.Generic.Builder.parameterizedType(
+                FieldValueGetter.class, objectType, fieldType)
+            .build();
+    return (DynamicType.Builder<FieldValueGetter>) byteBuddy.subclass(getterGenericType);
+  }
+
+  // Create a new FieldValueSetter subclass.
+  @SuppressWarnings("unchecked")
+  static DynamicType.Builder<FieldValueSetter> subclassSetterInterface(
+      ByteBuddy byteBuddy, Type objectType, Type fieldType) {
+    TypeDescription.Generic setterGenericType =
+        TypeDescription.Generic.Builder.parameterizedType(
+                FieldValueSetter.class, objectType, fieldType)
+            .build();
+    return (DynamicType.Builder<FieldValueSetter>) byteBuddy.subclass(setterGenericType);
+  }
+
+  // Base class used below to convert types.
+  @SuppressWarnings("unchecked")
+  abstract static class TypeConversion<T> {
+    public T convert(TypeDescriptor typeDescriptor) {
+      if (typeDescriptor.isArray()
+          && !typeDescriptor.getComponentType().getRawType().equals(byte.class)) {
+        // Byte arrays are special, so leave those alone.
+        return convertArray(typeDescriptor);
+      } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+        return convertCollection(typeDescriptor);
+      } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+        return convertMap(typeDescriptor);
+      } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) {
+        return convertDateTime(typeDescriptor);
+      } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
+        return convertByteBuffer(typeDescriptor);
+      } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) {
+        return convertCharSequence(typeDescriptor);
+      } else if (typeDescriptor.getRawType().isPrimitive()) {
+        return convertPrimitive(typeDescriptor);
+      } else {
+        return convertDefault(typeDescriptor);
+      }
+    }
+
+    protected abstract T convertArray(TypeDescriptor<?> type);
+
+    protected abstract T convertCollection(TypeDescriptor<?> type);
+
+    protected abstract T convertMap(TypeDescriptor<?> type);
+
+    protected abstract T convertDateTime(TypeDescriptor<?> type);
+
+    protected abstract T convertByteBuffer(TypeDescriptor<?> type);
+
+    protected abstract T convertCharSequence(TypeDescriptor<?> type);
+
+    protected abstract T convertPrimitive(TypeDescriptor<?> type);
+
+    protected abstract T convertDefault(TypeDescriptor<?> type);
+  }
+
+  /**
+   * Give a Java type, returns the Java type expected for use with Row. For example, both {@link
+   * StringBuffer} and {@link String} are represented as a {@link String} in Row. This determines
+   * what the return type of the getter will be. For instance, the following POJO class:
+   *
+   * <pre><code>
+   * class POJO {
+   *   StringBuffer str;
+   *   int[] array;
+   * }
+   * </code></pre>
+   *
+   * Generates the following getters:
+   *
+   * <pre><code>{@literal FieldValueGetter<POJO, String>}</code></pre>
+   *
+   * <pre><code>{@literal FieldValueGetter<POJO, List<Integer>>}</code></pre>
+   */
+  static class ConvertType extends TypeConversion<Type> {
+    @Override
+    protected Type convertArray(TypeDescriptor<?> type) {
+      return createListType(type).getType();
+    }
+
+    @Override
+    protected Type convertCollection(TypeDescriptor<?> type) {
+      return Collection.class;
+    }
+
+    @Override
+    protected Type convertMap(TypeDescriptor<?> type) {
+      return Map.class;
+    }
+
+    @Override
+    protected Type convertDateTime(TypeDescriptor<?> type) {
+      return ReadableInstant.class;
+    }
+
+    @Override
+    protected Type convertByteBuffer(TypeDescriptor<?> type) {
+      return byte[].class;
+    }
+
+    @Override
+    protected Type convertCharSequence(TypeDescriptor<?> type) {
+      return String.class;
+    }
+
+    @Override
+    protected Type convertPrimitive(TypeDescriptor<?> type) {
+      return ClassUtils.primitiveToWrapper(type.getRawType());
+    }
+
+    @Override
+    protected Type convertDefault(TypeDescriptor<?> type) {
+      return type.getType();
+    }
+
+    @SuppressWarnings("unchecked")
+    private <ElementT> TypeDescriptor<List<ElementT>> createListType(TypeDescriptor<?> type) {
+      TypeDescriptor componentType =
+          TypeDescriptor.of(ClassUtils.primitiveToWrapper(type.getComponentType().getRawType()));
+      return new TypeDescriptor<List<ElementT>>() {}.where(
+          new TypeParameter<ElementT>() {}, componentType);
+    }
+  }
+
+  /**
+   * Takes a {@link StackManipulation} that returns a value. Prepares this value to be returned by a
+   * getter. {@link org.apache.beam.sdk.values.Row} needs getters to return specific types, but we
+   * allow user objects to contain different but equivalent types. Therefore we must convert some of
+   * these types before returning. These conversions correspond to the ones defined in {@link
+   * ConvertType}. This class generates the code to do these conversion.
+   */
+  static class ConvertValueForGetter extends TypeConversion<StackManipulation> {
+    // The code that reads the value.
+    private final StackManipulation readValue;
+
+    ConvertValueForGetter(StackManipulation readValue) {
+      this.readValue = readValue;
+    }
+
+    @Override
+    protected StackManipulation convertArray(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // return isComponentTypePrimitive ? Arrays.asList(ArrayUtils.toObject(value))
+      //     : Arrays.asList(value);
+
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      StackManipulation stackManipulation = readValue;
+      // Row always expects to get an Iterable back for array types. Wrap this array into a
+      // List using Arrays.asList before returning.
+      if (loadedType.getComponentType().isPrimitive()) {
+        // Arrays.asList doesn't take primitive arrays, so convert first using ArrayUtils.toObject.
+        stackManipulation =
+            new Compound(
+                stackManipulation,
+                MethodInvocation.invoke(
+                    ARRAY_UTILS_TYPE
+                        .getDeclaredMethods()
+                        .filter(
+                            ElementMatchers.isStatic()
+                                .and(ElementMatchers.named("toObject"))
+                                .and(ElementMatchers.takesArguments(loadedType)))
+                        .getOnly()));
+      }
+      return new Compound(
+          stackManipulation,
+          MethodInvocation.invoke(
+              ARRAYS_TYPE
+                  .getDeclaredMethods()
+                  .filter(ElementMatchers.isStatic().and(ElementMatchers.named("asList")))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertCollection(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertMap(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertDateTime(TypeDescriptor<?> type) {
+      // If the class type is a ReadableDateTime, then return it.
+      if (ReadableDateTime.class.isAssignableFrom(type.getRawType())) {
+        return readValue;
+      }
+      // Otherwise, generate the following code:
+      //   return new DateTime(value.getMillis());
+
+      return new StackManipulation.Compound(
+          // Create a new instance of the target type.
+          TypeCreation.of(DATE_TIME_TYPE),
+          Duplication.SINGLE,
+          readValue,
+          TypeCasting.to(READABLE_INSTANT_TYPE),
+          // Call ReadableInstant.getMillis to extract the millis since the epoch.
+          MethodInvocation.invoke(
+              READABLE_INSTANT_TYPE
+                  .getDeclaredMethods()
+                  .filter(ElementMatchers.named("getMillis"))
+                  .getOnly()),
+          // Construct a DateTime object containing the millis.
+          MethodInvocation.invoke(
+              DATE_TIME_TYPE
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.isConstructor()
+                          .and(ElementMatchers.takesArguments(ForLoadedType.of(long.class))))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // return value.array();
+
+      // We must extract the array from the ByteBuffer before returning.
+      // NOTE: we only support array-backed byte buffers in these POJOs. Others (e.g. mmaped
+      // files) are not supported.
+      return new Compound(
+          readValue,
+          MethodInvocation.invoke(
+              BYTE_BUFFER_TYPE
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.named("array").and(ElementMatchers.returns(BYTE_ARRAY_TYPE)))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertCharSequence(TypeDescriptor<?> type) {
+      // If the member is a String, then return it.
+      if (type.isSubtypeOf(TypeDescriptor.of(String.class))) {
+        return readValue;
+      }
+
+      // Otherwise, generate the following code:
+      // return value.toString();
+      return new Compound(
+          readValue,
+          MethodInvocation.invoke(
+              CHAR_SEQUENCE_TYPE
+                  .getDeclaredMethods()
+                  .filter(ElementMatchers.named("toString"))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertPrimitive(TypeDescriptor<?> type) {
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      // Box the primitive type.
+      return new Compound(
+          readValue,
+          Assigner.DEFAULT.assign(
+              loadedType.asGenericType(), loadedType.asBoxed().asGenericType(), Typing.STATIC));
+    }
+
+    @Override
+    protected StackManipulation convertDefault(TypeDescriptor<?> type) {
+      return readValue;
+    }
+  }
+
+  /**
+   * Row is going to call the setter with its internal Java type, however the user object being set
+   * might have a different type internally. For example, Row will be calling set with a {@link
+   * String} type (for string fields), but the user type might have a {@link StringBuffer} member
+   * there. This class generates code to convert between these types.
+   */
+  static class ConvertValueForSetter extends TypeConversion<StackManipulation> {
+    StackManipulation readValue;
+
+    ConvertValueForSetter(StackManipulation readValue) {
+      this.readValue = readValue;
+    }
+
+    @Override
+    protected StackManipulation convertArray(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // T[] toArray = (T[]) value.toArray(new T[0]);
+      // return isPrimitive ? toArray : ArrayUtils.toPrimitive(toArray);
+
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      // The type of the array containing the (possibly) boxed values.
+      TypeDescription arrayType =
+          TypeDescription.Generic.Builder.rawType(loadedType.getComponentType().asBoxed())
+              .asArray()
+              .build()
+              .asErasure();
+
+      // Extract an array from the collection.
+      StackManipulation stackManipulation =
+          new Compound(
+              readValue,
+              TypeCasting.to(LIST_TYPE),
+              // Call Collection.toArray(T[[]) to extract the array. Push new T[0] on the stack
+              // before
+              // calling toArray.
+              ArrayFactory.forType(loadedType.getComponentType().asBoxed().asGenericType())
+                  .withValues(Collections.emptyList()),
+              MethodInvocation.invoke(
+                  LIST_TYPE
+                      .getDeclaredMethods()
+                      .filter(
+                          ElementMatchers.named("toArray").and(ElementMatchers.takesArguments(1)))
+                      .getOnly()),
+              // Cast the result to T[].
+              TypeCasting.to(arrayType));
+
+      if (loadedType.getComponentType().isPrimitive()) {
+        // The array we extract will be an array of objects. If the pojo field is an array of
+        // primitive types, we need to then convert to an array of unboxed objects.
+        stackManipulation =
+            new StackManipulation.Compound(
+                stackManipulation,
+                MethodInvocation.invoke(
+                    ARRAY_UTILS_TYPE
+                        .getDeclaredMethods()
+                        .filter(
+                            ElementMatchers.named("toPrimitive")
+                                .and(ElementMatchers.takesArguments(arrayType)))
+                        .getOnly()));
+      }
+      return stackManipulation;
+    }
+
+    @Override
+    protected StackManipulation convertCollection(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertMap(TypeDescriptor<?> type) {
+      return readValue;
+    }
+
+    @Override
+    protected StackManipulation convertDateTime(TypeDescriptor<?> type) {
+      // The setter might be called with a different subclass of ReadableInstant than the one stored
+      // in this POJO. We must extract the value passed into the setter and copy it into an instance
+      // that the POJO can accept.
+
+      // Generate the following code:
+      // return new T(value.getMillis());
+
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      return new Compound(
+          // Create a new instance of the target type.
+          TypeCreation.of(loadedType),
+          Duplication.SINGLE,
+          // Load the parameter and cast it to a ReadableInstant.
+          readValue,
+          TypeCasting.to(READABLE_INSTANT_TYPE),
+          // Call ReadableInstant.getMillis to extract the millis since the epoch.
+          MethodInvocation.invoke(
+              READABLE_INSTANT_TYPE
+                  .getDeclaredMethods()
+                  .filter(ElementMatchers.named("getMillis"))
+                  .getOnly()),
+          // All subclasses of ReadableInstant contain a ()(long) constructor that takes in a millis
+          // argument. Call that constructor of the field to initialize it.
+          MethodInvocation.invoke(
+              loadedType
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.isConstructor()
+                          .and(ElementMatchers.takesArguments(ForLoadedType.of(long.class))))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // return ByteBuffer.wrap((byte[]) value);
+
+      // We currently assume that a byte[] setter will always accept a parameter of type byte[].
+      return new Compound(
+          readValue,
+          TypeCasting.to(BYTE_ARRAY_TYPE),
+          // Create a new ByteBuffer that wraps this byte[].
+          MethodInvocation.invoke(
+              BYTE_BUFFER_TYPE
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.named("wrap")
+                          .and(ElementMatchers.takesArguments(BYTE_ARRAY_TYPE)))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertCharSequence(TypeDescriptor<?> type) {
+      // If the type is a String, just return it.
+      if (type.getRawType().isAssignableFrom(String.class)) {
+        return readValue;
+      }
+
+      // Otherwise, generate the following code:
+      // return new T((CharacterSequence) value).
+
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      return new StackManipulation.Compound(
+          TypeCreation.of(loadedType),
+          Duplication.SINGLE,
+          // Load the parameter and cast it to a CharSequence.
+          readValue,
+          TypeCasting.to(CHAR_SEQUENCE_TYPE),
+          // Create an element of the field type that wraps this one.
+          MethodInvocation.invoke(
+              loadedType
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.isConstructor()
+                          .and(ElementMatchers.takesArguments(CHAR_SEQUENCE_TYPE)))
+                  .getOnly()));
+    }
+
+    @Override
+    protected StackManipulation convertPrimitive(TypeDescriptor<?> type) {
+      ForLoadedType valueType = new ForLoadedType(type.getRawType());
+      // Unbox the type.
+      return new StackManipulation.Compound(
+          readValue,
+          Assigner.DEFAULT.assign(
+              valueType.asBoxed().asGenericType(),
+              valueType.asUnboxed().asGenericType(),
+              Typing.STATIC));
+    }
+
+    @Override
+    protected StackManipulation convertDefault(TypeDescriptor<?> type) {
+      return readValue;
+    }
+  }
+
+  // If the Field is a container type, returns the element type. Otherwise returns a null reference.
+  @SuppressWarnings("unchecked")
+  static Implementation getArrayComponentType(TypeDescriptor valueType) {
+    if (valueType.isArray()) {
+      Type component = valueType.getComponentType().getType();
+      if (!component.equals(byte.class)) {
+        return FixedValue.reference(component);
+      }
+    } else if (valueType.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+      TypeDescriptor<Collection<?>> collection = valueType.getSupertype(Collection.class);
+      if (collection.getType() instanceof ParameterizedType) {
+        ParameterizedType ptype = (ParameterizedType) collection.getType();
+        java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
+        checkArgument(params.length == 1);
+        return FixedValue.reference(params[0]);
+      } else {
+        throw new RuntimeException("Collection parameter is not parameterized!");
+      }
+    }
+    return FixedValue.nullValue();
+  }
+
+  // If the Field is a map type, returns the key type, otherwise returns a null reference.
+  @Nullable
+  static Implementation getMapKeyType(TypeDescriptor valueType) {
+    return getMapType(valueType, 0);
+  }
+
+  // If the Field is a map type, returns the value type, otherwise returns a null reference.
+  @Nullable
+  static Implementation getMapValueType(TypeDescriptor valueType) {
+    return getMapType(valueType, 1);
+  }
+
+  // If the Field is a map type, returns the key or value type (0 is key type, 1 is value).
+  // Otherwise returns a null reference.
+  @SuppressWarnings("unchecked")
+  private static Implementation getMapType(TypeDescriptor valueType, int index) {
+    if (valueType.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+      TypeDescriptor<Collection<?>> map = valueType.getSupertype(Map.class);
+      if (map.getType() instanceof ParameterizedType) {
+        ParameterizedType ptype = (ParameterizedType) map.getType();
+        java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
+        return FixedValue.reference(params[index]);
+      } else {
+        throw new RuntimeException("Map type is not parameterized! " + map);
+      }
+    }
+    return FixedValue.nullValue();
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
new file mode 100644
index 00000000000..f3dc5706f0a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.method.MethodDescription.ForLoadedMethod;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.implementation.FixedValue;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
+import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+import org.apache.beam.sdk.values.reflect.FieldValueSetter;
+
+/** A set of utilities to generate getter and setter classes for JavaBean objects. */
+@Experimental(Kind.SCHEMAS)
+public class JavaBeanUtils {
+  /** Create a {@link Schema} for a Java Bean class. */
+  public static Schema schemaFromJavaBeanClass(Class<?> clazz) {
+    return StaticSchemaInference.schemaFromClass(clazz, JavaBeanUtils::typeInformationFromClass);
+  }
+
+  private static List<TypeInformation> typeInformationFromClass(Class<?> clazz) {
+    try {
+      List<TypeInformation> getterTypes =
+          ReflectUtils.getMethods(clazz)
+              .stream()
+              .filter(ReflectUtils::isGetter)
+              .map(m -> TypeInformation.forGetter(m))
+              .collect(Collectors.toList());
+
+      Map<String, TypeInformation> setterTypes =
+          ReflectUtils.getMethods(clazz)
+              .stream()
+              .filter(ReflectUtils::isSetter)
+              .map(m -> TypeInformation.forSetter(m))
+              .collect(Collectors.toMap(TypeInformation::getName, Function.identity()));
+      validateJavaBean(getterTypes, setterTypes);
+      return getterTypes;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // Make sure that there are matching setters and getters.
+  private static void validateJavaBean(
+      List<TypeInformation> getters, Map<String, TypeInformation> setters) {
+    for (TypeInformation type : getters) {
+      TypeInformation setterType = setters.get(type.getName());
+      if (setterType == null) {
+        throw new RuntimeException(
+            "JavaBean contained a getter for field "
+                + type.getName()
+                + "but did not contain a matching setter.");
+      }
+      if (!type.equals(setterType)) {
+        throw new RuntimeException(
+            "JavaBean contained mismatching setter for field" + type.getName());
+      }
+    }
+  }
+
+  // Static ByteBuddy instance used by all helpers.
+  private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+
+  // The list of getters for a class is cached, so we only create the classes the first time
+  // getSetters is called.
+  private static final Map<ClassWithSchema, List<FieldValueGetter>> CACHED_GETTERS =
+      Maps.newConcurrentMap();
+
+  /**
+   * Return the list of {@link FieldValueGetter}s for a Java Bean class
+   *
+   * <p>The returned list is ordered by the order of fields in the schema.
+   */
+  public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) {
+    return CACHED_GETTERS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          try {
+            Map<String, FieldValueGetter> getterMap =
+                ReflectUtils.getMethods(clazz)
+                    .stream()
+                    .filter(ReflectUtils::isGetter)
+                    .map(JavaBeanUtils::createGetter)
+                    .collect(Collectors.toMap(FieldValueGetter::name, Function.identity()));
+            return schema
+                .getFields()
+                .stream()
+                .map(f -> getterMap.get(f.getName()))
+                .collect(Collectors.toList());
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  private static <T> FieldValueGetter createGetter(Method getterMethod) {
+    TypeInformation typeInformation = TypeInformation.forGetter(getterMethod);
+    DynamicType.Builder<FieldValueGetter> builder =
+        ByteBuddyUtils.subclassGetterInterface(
+            BYTE_BUDDY,
+            getterMethod.getDeclaringClass(),
+            new ConvertType().convert(typeInformation.getType()));
+    builder = implementGetterMethods(builder, getterMethod);
+    try {
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException("Unable to generate a getter for getter '" + getterMethod + "'");
+    }
+  }
+
+  private static DynamicType.Builder<FieldValueGetter> implementGetterMethods(
+      DynamicType.Builder<FieldValueGetter> builder, Method method) {
+    TypeInformation typeInformation = TypeInformation.forGetter(method);
+    return builder
+        .method(ElementMatchers.named("name"))
+        .intercept(FixedValue.reference(typeInformation.getName()))
+        .method(ElementMatchers.named("type"))
+        .intercept(FixedValue.reference(typeInformation.getType().getRawType()))
+        .method(ElementMatchers.named("get"))
+        .intercept(new InvokeGetterInstruction(method));
+  }
+
+  // The list of setters for a class is cached, so we only create the classes the first time
+  // getSetters is called.
+  private static final Map<ClassWithSchema, List<FieldValueSetter>> CACHED_SETTERS =
+      Maps.newConcurrentMap();
+
+  /**
+   * Return the list of {@link FieldValueSetter}s for a Java Bean class
+   *
+   * <p>The returned list is ordered by the order of fields in the schema.
+   */
+  public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema schema) {
+    return CACHED_SETTERS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          try {
+            Map<String, FieldValueSetter> setterMap =
+                ReflectUtils.getMethods(clazz)
+                    .stream()
+                    .filter(ReflectUtils::isSetter)
+                    .map(JavaBeanUtils::createSetter)
+                    .collect(Collectors.toMap(FieldValueSetter::name, Function.identity()));
+            return schema
+                .getFields()
+                .stream()
+                .map(f -> setterMap.get(f.getName()))
+                .collect(Collectors.toList());
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  private static <T> FieldValueSetter createSetter(Method setterMethod) {
+    TypeInformation typeInformation = TypeInformation.forSetter(setterMethod);
+    DynamicType.Builder<FieldValueSetter> builder =
+        ByteBuddyUtils.subclassSetterInterface(
+            BYTE_BUDDY,
+            setterMethod.getDeclaringClass(),
+            new ConvertType().convert(typeInformation.getType()));
+    builder = implementSetterMethods(builder, setterMethod);
+    try {
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException("Unable to generate a setter for setter '" + setterMethod + "'");
+    }
+  }
+
+  private static DynamicType.Builder<FieldValueSetter> implementSetterMethods(
+      DynamicType.Builder<FieldValueSetter> builder, Method method) {
+    TypeInformation typeInformation = TypeInformation.forSetter(method);
+    return builder
+        .method(ElementMatchers.named("name"))
+        .intercept(FixedValue.reference(typeInformation.getName()))
+        .method(ElementMatchers.named("type"))
+        .intercept(FixedValue.reference(typeInformation.getType().getRawType()))
+        .method(ElementMatchers.named("elementType"))
+        .intercept(ByteBuddyUtils.getArrayComponentType(typeInformation.getType()))
+        .method(ElementMatchers.named("mapKeyType"))
+        .intercept(ByteBuddyUtils.getMapKeyType(typeInformation.getType()))
+        .method(ElementMatchers.named("mapValueType"))
+        .intercept(ByteBuddyUtils.getMapValueType(typeInformation.getType()))
+        .method(ElementMatchers.named("set"))
+        .intercept(new InvokeSetterInstruction(method));
+  }
+
+  // Implements a method to read a public getter out of an object.
+  private static class InvokeGetterInstruction implements Implementation {
+    // Getter method that wil be invoked
+    private Method method;
+
+    InvokeGetterInstruction(Method method) {
+      this.method = method;
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        TypeInformation typeInformation = TypeInformation.forGetter(method);
+        // this + method parameters.
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+        // StackManipulation that will read the value from the class field.
+        StackManipulation readValue =
+            new StackManipulation.Compound(
+                // Method param is offset 1 (offset 0 is the this parameter).
+                MethodVariableAccess.REFERENCE.loadFrom(1),
+                // Invoke the getter
+                MethodInvocation.invoke(new ForLoadedMethod(method)));
+
+        StackManipulation stackManipulation =
+            new StackManipulation.Compound(
+                new ConvertValueForGetter(readValue).convert(typeInformation.getType()),
+                MethodReturn.REFERENCE);
+
+        StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+        return new Size(size.getMaximalSize(), numLocals);
+      };
+    }
+  }
+
+  // Implements a method to write a public set out on an object.
+  private static class InvokeSetterInstruction implements Implementation {
+    // Setter method that wil be invoked
+    private Method method;
+
+    InvokeSetterInstruction(Method method) {
+      this.method = method;
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        TypeInformation typeInformation = TypeInformation.forSetter(method);
+        // this + method parameters.
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+        // The instruction to read the field.
+        StackManipulation readField = MethodVariableAccess.REFERENCE.loadFrom(2);
+
+        // Read the object onto the stack.
+        StackManipulation stackManipulation =
+            new StackManipulation.Compound(
+                // Object param is offset 1.
+                MethodVariableAccess.REFERENCE.loadFrom(1),
+                // Do any conversions necessary.
+                new ByteBuddyUtils.ConvertValueForSetter(readField)
+                    .convert(typeInformation.getType()),
+                // Now update the field and return void.
+                MethodInvocation.invoke(new ForLoadedMethod(method)),
+                MethodReturn.VOID);
+
+        StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+        return new Size(size.getMaximalSize(), numLocals);
+      };
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
new file mode 100644
index 00000000000..3c45e9d14d2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.field.FieldDescription.ForLoadedField;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.implementation.FixedValue;
+import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
+import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.member.FieldAccess;
+import net.bytebuddy.implementation.bytecode.member.MethodReturn;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
+import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+import org.apache.beam.sdk.values.reflect.FieldValueSetter;
+
+/** A set of utilities yo generate getter and setter classes for POJOs. */
+@Experimental(Kind.SCHEMAS)
+public class POJOUtils {
+  public static Schema schemaFromPojoClass(Class<?> clazz) {
+    // We should cache the field order.
+    Function<Class, List<TypeInformation>> getTypesForClass =
+        c ->
+            ReflectUtils.getFields(c)
+                .stream()
+                .map(TypeInformation::forField)
+                .collect(Collectors.toList());
+    return StaticSchemaInference.schemaFromClass(clazz, getTypesForClass);
+  }
+
+  // Static ByteBuddy instance used by all helpers.
+  private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+
+  // The list of getters for a class is cached, so we only create the classes the first time
+  // getSetters is called.
+  public static final Map<ClassWithSchema, List<FieldValueGetter>> CACHED_GETTERS =
+      Maps.newConcurrentMap();
+
+  public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) {
+    // Return the getters ordered by their position in the schema.
+    return CACHED_GETTERS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          Map<String, FieldValueGetter> getterMap =
+              ReflectUtils.getFields(clazz)
+                  .stream()
+                  .map(POJOUtils::createGetter)
+                  .collect(Collectors.toMap(FieldValueGetter::name, Function.identity()));
+          return schema
+              .getFields()
+              .stream()
+              .map(f -> getterMap.get(f.getName()))
+              .collect(Collectors.toList());
+        });
+  }
+
+  /**
+   * Generate the following {@link FieldValueSetter} class for the {@link Field}.
+   *
+   * <pre><code>
+   *   class Getter implements {@literal FieldValueGetter<POJO, FieldType>} {
+   *     {@literal @}Override public String name() { return field.getName(); }
+   *     {@literal @}Override public Class type() { return field.getType(); }
+   *      {@literal @}Override public FieldType get(POJO pojo) {
+   *        return convert(pojo.field);
+   *      }
+   *   }
+   * </code></pre>
+   */
+  @SuppressWarnings("unchecked")
+  static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter(Field field) {
+    DynamicType.Builder<FieldValueGetter> builder =
+        ByteBuddyUtils.subclassGetterInterface(
+            BYTE_BUDDY,
+            field.getDeclaringClass(),
+            new ConvertType().convert(TypeDescriptor.of(field.getType())));
+    builder = implementGetterMethods(builder, field);
+    try {
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException("Unable to generate a getter for field '" + field + "'.", e);
+    }
+  }
+
+  private static DynamicType.Builder<FieldValueGetter> implementGetterMethods(
+      DynamicType.Builder<FieldValueGetter> builder, Field field) {
+    return builder
+        .method(ElementMatchers.named("name"))
+        .intercept(FixedValue.reference(field.getName()))
+        .method(ElementMatchers.named("type"))
+        .intercept(FixedValue.reference(field.getType()))
+        .method(ElementMatchers.named("get"))
+        .intercept(new ReadFieldInstruction(field));
+  }
+
+  // The list of setters for a class is cached, so we only create the classes the first time
+  // getSetters is called.
+  private static final Map<ClassWithSchema, List<FieldValueSetter>> CACHED_SETTERS =
+      Maps.newConcurrentMap();
+
+  public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema schema) {
+    // Return the setters, ordered by their position in the schema.
+    return CACHED_SETTERS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          Map<String, FieldValueSetter> setterMap =
+              ReflectUtils.getFields(clazz)
+                  .stream()
+                  .map(POJOUtils::createSetter)
+                  .collect(Collectors.toMap(FieldValueSetter::name, Function.identity()));
+          return schema
+              .getFields()
+              .stream()
+              .map(f -> setterMap.get(f.getName()))
+              .collect(Collectors.toList());
+        });
+  }
+
+  /**
+   * Generate the following {@link FieldValueSetter} class for the {@link Field}.
+   *
+   * <pre><code>
+   *   class Setter implements {@literal FieldValueSetter<POJO, FieldType>} {
+   *     {@literal @}Override public String name() { return field.getName(); }
+   *     {@literal @}Override public Class type() { return field.getType(); }
+   *     {@literal @}Override public Type elementType() { return elementType; }
+   *     {@literal @}Override public Type mapKeyType() { return mapKeyType; }
+   *     {@literal @}Override public Type mapValueType() { return mapValueType; }
+   *     {@literal @}Override public void set(POJO pojo, FieldType value) {
+   *        pojo.field = convert(value);
+   *      }
+   *   }
+   * </code></pre>
+   */
+  @SuppressWarnings("unchecked")
+  private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> createSetter(Field field) {
+    DynamicType.Builder<FieldValueSetter> builder =
+        ByteBuddyUtils.subclassSetterInterface(
+            BYTE_BUDDY,
+            field.getDeclaringClass(),
+            new ConvertType().convert(TypeDescriptor.of(field.getType())));
+    builder = implementSetterMethods(builder, field);
+    try {
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException("Unable to generate a getter for field '" + field + "'.", e);
+    }
+  }
+
+  private static DynamicType.Builder<FieldValueSetter> implementSetterMethods(
+      DynamicType.Builder<FieldValueSetter> builder, Field field) {
+    return builder
+        .method(ElementMatchers.named("name"))
+        .intercept(FixedValue.reference(field.getName()))
+        .method(ElementMatchers.named("type"))
+        .intercept(FixedValue.reference(field.getType()))
+        .method(ElementMatchers.named("elementType"))
+        .intercept(ByteBuddyUtils.getArrayComponentType(TypeDescriptor.of(field.getGenericType())))
+        .method(ElementMatchers.named("mapKeyType"))
+        .intercept(ByteBuddyUtils.getMapKeyType(TypeDescriptor.of(field.getGenericType())))
+        .method(ElementMatchers.named("mapValueType"))
+        .intercept(ByteBuddyUtils.getMapValueType(TypeDescriptor.of(field.getGenericType())))
+        .method(ElementMatchers.named("set"))
+        .intercept(new SetFieldInstruction(field));
+  }
+
+  // Implements a method to read a public field out of an object.
+  static class ReadFieldInstruction implements Implementation {
+    // Field that will be read.
+    private Field field;
+
+    ReadFieldInstruction(Field field) {
+      this.field = field;
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        // this + method parameters.
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+        // StackManipulation that will read the value from the class field.
+        StackManipulation readValue =
+            new StackManipulation.Compound(
+                // Method param is offset 1 (offset 0 is the this parameter).
+                MethodVariableAccess.REFERENCE.loadFrom(1),
+                // Read the field from the object.
+                FieldAccess.forField(new ForLoadedField(field)).read());
+
+        StackManipulation stackManipulation =
+            new StackManipulation.Compound(
+                new ConvertValueForGetter(readValue).convert(TypeDescriptor.of(field.getType())),
+                MethodReturn.REFERENCE);
+
+        StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+        return new Size(size.getMaximalSize(), numLocals);
+      };
+    }
+  }
+
+  // Implements a method to set a public field in an object.
+  static class SetFieldInstruction implements Implementation {
+    // Field that will be read.
+    private Field field;
+
+    SetFieldInstruction(Field field) {
+      this.field = field;
+    }
+
+    @Override
+    public InstrumentedType prepare(InstrumentedType instrumentedType) {
+      return instrumentedType;
+    }
+
+    @Override
+    public ByteCodeAppender appender(final Target implementationTarget) {
+      return (methodVisitor, implementationContext, instrumentedMethod) -> {
+        // this + method parameters.
+        int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+        // The instruction to read the field.
+        StackManipulation readField = MethodVariableAccess.REFERENCE.loadFrom(2);
+
+        // Read the object onto the stack.
+        StackManipulation stackManipulation =
+            new StackManipulation.Compound(
+                // Object param is offset 1.
+                MethodVariableAccess.REFERENCE.loadFrom(1),
+                // Do any conversions necessary.
+                new ByteBuddyUtils.ConvertValueForSetter(readField)
+                    .convert(TypeDescriptor.of(field.getType())),
+                // Now update the field and return void.
+                FieldAccess.forField(new ForLoadedField(field)).write(),
+                MethodReturn.VOID);
+
+        StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
+        return new Size(size.getMaximalSize(), numLocals);
+      };
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
new file mode 100644
index 00000000000..054df6c0129
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** A set of reflection helper methods. */
+public class ReflectUtils {
+  static class ClassWithSchema {
+    private final Class clazz;
+    private final Schema schema;
+
+    public ClassWithSchema(Class clazz, Schema schema) {
+      this.clazz = clazz;
+      this.schema = schema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ClassWithSchema that = (ClassWithSchema) o;
+      return Objects.equals(clazz, that.clazz) && Objects.equals(schema, that.schema);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(clazz, schema);
+    }
+  }
+
+  private static final Map<Class, List<Method>> DECLARED_METHODS = Maps.newHashMap();
+  private static final Map<Class, List<Field>> DECLARED_FIELDS = Maps.newHashMap();
+
+  /** Returns the list of public, non-static methods in the class, caching the results. */
+  static List<Method> getMethods(Class clazz) throws IOException {
+    return DECLARED_METHODS.computeIfAbsent(
+        clazz,
+        c -> {
+          return Arrays.stream(c.getDeclaredMethods())
+              .filter(m -> Modifier.isPublic(m.getModifiers()))
+              .filter(m -> !Modifier.isStatic(m.getModifiers()))
+              .collect(Collectors.toList());
+        });
+  }
+
+  // Get all public, non-static, non-transient fields.
+  static List<Field> getFields(Class<?> clazz) {
+    return DECLARED_FIELDS.computeIfAbsent(
+        clazz,
+        c -> {
+          Map<String, Field> types = new LinkedHashMap<>();
+          do {
+            if (c.getPackage() != null && c.getPackage().getName().startsWith("java.")) {
+              break; // skip java built-in classes
+            }
+            for (java.lang.reflect.Field field : c.getDeclaredFields()) {
+              if ((field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) == 0) {
+                if ((field.getModifiers() & Modifier.PUBLIC) != 0) {
+                  boolean nullable = field.getAnnotation(Nullable.class) != null;
+                  checkArgument(
+                      types.put(field.getName(), field) == null,
+                      c.getSimpleName() + " contains two fields named: " + field);
+                }
+              }
+            }
+            c = c.getSuperclass();
+          } while (c != null);
+          return Lists.newArrayList(types.values());
+        });
+  }
+
+  static boolean isGetter(Method method) {
+    if (Void.TYPE.equals(method.getReturnType())) {
+      return false;
+    }
+    if (method.getName().startsWith("get") && method.getName().length() > 3) {
+      return true;
+    }
+    return (method.getName().startsWith("is")
+        && method.getName().length() > 2
+        && method.getParameterCount() == 0
+        && (Boolean.TYPE.equals(method.getReturnType())
+            || Boolean.class.equals(method.getReturnType())));
+  }
+
+  static boolean isSetter(Method method) {
+    return Void.TYPE.equals(method.getReturnType())
+        && method.getParameterCount() == 1
+        && method.getName().startsWith("set");
+  }
+
+  static String stripPrefix(String methodName, String prefix) {
+    String firstLetter = methodName.substring(prefix.length(), prefix.length() + 1).toLowerCase();
+
+    return (methodName.length() == prefix.length() + 1)
+        ? firstLetter
+        : (firstLetter + methodName.substring(prefix.length() + 1, methodName.length()));
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
new file mode 100644
index 00000000000..d0b315f0706
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.ReadableInstant;
+
+/** A set of utilities for inferring a Beam {@link Schema} from static Java types. */
+public class StaticSchemaInference {
+  enum MethodType {
+    GETTER,
+    SETTER
+  }
+
+  private static final Map<Class, FieldType> PRIMITIVE_TYPES =
+      ImmutableMap.<Class, FieldType>builder()
+          .put(Byte.class, FieldType.BYTE)
+          .put(byte.class, FieldType.BYTE)
+          .put(Short.class, FieldType.INT16)
+          .put(short.class, FieldType.INT16)
+          .put(Integer.class, FieldType.INT32)
+          .put(int.class, FieldType.INT32)
+          .put(Long.class, FieldType.INT64)
+          .put(long.class, FieldType.INT64)
+          .put(Float.class, FieldType.FLOAT)
+          .put(float.class, FieldType.FLOAT)
+          .put(Double.class, FieldType.DOUBLE)
+          .put(double.class, FieldType.DOUBLE)
+          .put(Boolean.class, FieldType.BOOLEAN)
+          .put(boolean.class, FieldType.BOOLEAN)
+          .put(BigDecimal.class, FieldType.DECIMAL)
+          .build();
+
+  /** Relevant information about a Java type. */
+  public static class TypeInformation {
+    private final String name;
+    private final TypeDescriptor type;
+    private final boolean nullable;
+
+    /** Construct a {@link TypeInformation}. */
+    private TypeInformation(String name, TypeDescriptor type, boolean nullable) {
+      this.name = name;
+      this.type = type;
+      this.nullable = nullable;
+    }
+
+    /** Construct a {@link TypeInformation} from a class member variable. */
+    public static TypeInformation forField(Field field) {
+      return new TypeInformation(
+          field.getName(),
+          TypeDescriptor.of(field.getGenericType()),
+          field.getAnnotation(Nullable.class) != null);
+    }
+
+    /** Construct a {@link TypeInformation} from a class getter. */
+    public static TypeInformation forGetter(Method method) {
+      String name;
+      if (method.getName().startsWith("get")) {
+        name = ReflectUtils.stripPrefix(method.getName(), "get");
+      } else if (method.getName().startsWith("is")) {
+        name = ReflectUtils.stripPrefix(method.getName(), "is");
+      } else {
+        throw new RuntimeException("Getter has wrong prefix " + method.getName());
+      }
+      TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType());
+      boolean nullable = method.getAnnotation(Nullable.class) != null;
+      return new TypeInformation(name, type, nullable);
+    }
+
+    /** Construct a {@link TypeInformation} from a class setter. */
+    public static TypeInformation forSetter(Method method) {
+      String name;
+      if (method.getName().startsWith("set")) {
+        name = ReflectUtils.stripPrefix(method.getName(), "set");
+      } else {
+        throw new RuntimeException("Setter has wrong prefix " + method.getName());
+      }
+      if (method.getParameterCount() != 1) {
+        throw new RuntimeException("Setter methods should take a single argument.");
+      }
+      TypeDescriptor type = TypeDescriptor.of(method.getGenericParameterTypes()[0]);
+      boolean nullable =
+          Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance);
+      return new TypeInformation(name, type, nullable);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public TypeDescriptor getType() {
+      return type;
+    }
+
+    public boolean isNullable() {
+      return nullable;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TypeInformation that = (TypeInformation) o;
+      return nullable == that.nullable
+          && Objects.equals(name, that.name)
+          && Objects.equals(type, that.type);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(name, type, nullable);
+    }
+  }
+
+  /**
+   * Infer a schema from a Java class.
+   *
+   * <p>Takes in a function to extract a list of field types from a class. Different callers may
+   * have different strategies for extracting this list: e.g. introspecting public member variables,
+   * public getter methods, or special annotations on the class.
+   */
+  public static Schema schemaFromClass(
+      Class<?> clazz, Function<Class, List<TypeInformation>> getTypesForClass) {
+    Schema.Builder builder = Schema.builder();
+    for (TypeInformation type : getTypesForClass.apply(clazz)) {
+      // TODO: look for nullable annotation.
+      builder.addField(type.getName(), fieldFromType(type.getType(), getTypesForClass));
+    }
+    return builder.build();
+  }
+
+  // Map a Java field type to a Beam Schema FieldType.
+  private static Schema.FieldType fieldFromType(
+      TypeDescriptor type, Function<Class, List<TypeInformation>> getTypesForClass) {
+    FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType());
+    if (primitiveType != null) {
+      return primitiveType;
+    }
+
+    if (type.isArray()) {
+      // If the type is T[] where T is byte, this is a BYTES type.
+      TypeDescriptor component = type.getComponentType();
+      if (component.getRawType().equals(byte.class)) {
+        return FieldType.BYTES;
+      } else {
+        // Otherwise this is an array type.
+        return FieldType.array(fieldFromType(component, getTypesForClass));
+      }
+    } else if (type.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+      TypeDescriptor<Collection<?>> collection = type.getSupertype(Collection.class);
+      if (collection.getType() instanceof ParameterizedType) {
+        ParameterizedType ptype = (ParameterizedType) collection.getType();
+        java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
+        checkArgument(params.length == 1);
+        return FieldType.array(fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass));
+      } else {
+        throw new RuntimeException("Cannot infer schema from unparameterized collection.");
+      }
+    } else if (type.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+      TypeDescriptor<Collection<?>> map = type.getSupertype(Map.class);
+      if (map.getType() instanceof ParameterizedType) {
+        ParameterizedType ptype = (ParameterizedType) map.getType();
+        java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
+        checkArgument(params.length == 2);
+        FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass);
+        FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), getTypesForClass);
+        checkArgument(
+            keyType.getTypeName().isPrimitiveType(), "Only primitive types can be map keys");
+        return FieldType.map(keyType, valueType);
+      } else {
+        throw new RuntimeException("Cannot infer schema from unparameterized map.");
+      }
+    } else if (type.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) {
+      return FieldType.STRING;
+    } else if (type.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) {
+      return FieldType.DATETIME;
+    } else if (type.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
+      return FieldType.BYTES;
+    } else {
+      return FieldType.row(schemaFromClass(type.getRawType(), getTypesForClass));
+    }
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/package-info.java
new file mode 100644
index 00000000000..3938e80ecf3
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines utilities for deailing with schemas.
+ *
+ * <p>For further details, see the documentation for each class in this package.
+ */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.schemas.utils;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 48d6485f08b..d06525909e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -18,12 +18,14 @@
 package org.apache.beam.sdk.values;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
-import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,6 +39,7 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.reflect.FieldValueGetterFactory;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDateTime;
@@ -51,38 +54,29 @@
  * {see @link Schema#getRowCoder()}.
  */
 @Experimental
-@AutoValue
 public abstract class Row implements Serializable {
-  /** Creates a {@link Row} from the list of values and {@link #getSchema()}. */
-  public static <T> Collector<T, List<Object>, Row> toRow(Schema schema) {
-    return Collector.of(
-        () -> new ArrayList<>(schema.getFieldCount()),
-        List::add,
-        (left, right) -> {
-          left.addAll(right);
-          return left;
-        },
-        values -> Row.withSchema(schema).addValues(values).build());
-  }
+  private final Schema schema;
 
-  /** Creates a new record filled with nulls. */
-  public static Row nullRow(Schema schema) {
-    return Row.withSchema(schema)
-        .addValues(Collections.nCopies(schema.getFieldCount(), null))
-        .build();
+  Row(Schema schema) {
+    this.schema = schema;
   }
 
-  /** Get value by field name, {@link ClassCastException} is thrown if type doesn't match. */
-  @SuppressWarnings("TypeParameterUnusedInFormals")
-  public <T> T getValue(String fieldName) {
-    return getValue(getSchema().indexOf(fieldName));
-  }
+  // Abstract methods to be implemented by subclasses that handle object access.
 
   /** Get value by field index, {@link ClassCastException} is thrown if schema doesn't match. */
   @Nullable
   @SuppressWarnings("TypeParameterUnusedInFormals")
-  public <T> T getValue(int fieldIdx) {
-    return (T) getValues().get(fieldIdx);
+  public abstract <T> T getValue(int fieldIdx);
+
+  /** Return the size of data fields. */
+  public abstract int getFieldCount();
+  /** Return the list of data values. */
+  public abstract List<Object> getValues();
+
+  /** Get value by field name, {@link ClassCastException} is thrown if type doesn't match. */
+  @SuppressWarnings("TypeParameterUnusedInFormals")
+  public <T> T getValue(String fieldName) {
+    return getValue(getSchema().indexOf(fieldName));
   }
 
   /**
@@ -93,6 +87,14 @@ public byte getByte(String fieldName) {
     return getByte(getSchema().indexOf(fieldName));
   }
 
+  /**
+   * Get a {@link TypeName#BYTES} value by field name, {@link IllegalStateException} is thrown if
+   * schema doesn't match.
+   */
+  public byte[] getBytes(String fieldName) {
+    return getBytes(getSchema().indexOf(fieldName));
+  }
+
   /**
    * Get a {@link TypeName#INT16} value by field name, {@link IllegalStateException} is thrown if
    * schema doesn't match.
@@ -196,6 +198,14 @@ public Byte getByte(int idx) {
     return getValue(idx);
   }
 
+  /**
+   * Get a {@link TypeName#BYTES} value by field index, {@link ClassCastException} is thrown if
+   * schema doesn't match.
+   */
+  public byte[] getBytes(int idx) {
+    return getValue(idx);
+  }
+
   /**
    * Get a {@link TypeName#INT16} value by field index, {@link ClassCastException} is thrown if
    * schema doesn't match.
@@ -293,16 +303,10 @@ public Row getRow(int idx) {
     return getValue(idx);
   }
 
-  /** Return the size of data fields. */
-  public int getFieldCount() {
-    return getValues().size();
-  }
-
-  /** Return the list of data values. */
-  public abstract List<Object> getValues();
-
   /** Return {@link Schema} which describes the fields. */
-  public abstract Schema getSchema();
+  public Schema getSchema() {
+    return schema;
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -319,19 +323,26 @@ public int hashCode() {
     return Objects.hash(getSchema(), getValues());
   }
 
+  @Override
+  public String toString() {
+    return Arrays.deepToString(Iterables.toArray(getValues(), Object.class));
+  }
+
   /**
    * Creates a record builder with specified {@link #getSchema()}. {@link Builder#build()} will
    * throw an {@link IllegalArgumentException} if number of fields in {@link #getSchema()} does not
    * match the number of fields specified.
    */
   public static Builder withSchema(Schema schema) {
-    return new AutoValue_Row.Builder(schema);
+    return new Builder(schema);
   }
 
   /** Builder for {@link Row}. */
   public static class Builder {
-    private List<Object> values = new ArrayList<>();
+    private List<Object> values = Lists.newArrayList();
     private boolean attached = false;
+    @Nullable private FieldValueGetterFactory fieldValueGetterFactory;
+    @Nullable private Object getterTarget;
     private Schema schema;
 
     Builder(Schema schema) {
@@ -367,6 +378,13 @@ public Builder attachValues(List<Object> values) {
       return addValues(values);
     }
 
+    public Builder withFieldValueGetters(
+        FieldValueGetterFactory fieldValueGetterFactory, Object getterTarget) {
+      this.fieldValueGetterFactory = fieldValueGetterFactory;
+      this.getterTarget = getterTarget;
+      return this;
+    }
+
     private List<Object> verify(Schema schema, List<Object> values) {
       List<Object> verifiedValues = Lists.newArrayListWithCapacity(values.size());
       if (schema.getFieldCount() != values.size()) {
@@ -464,6 +482,13 @@ private Object verifyPrimitiveType(Object value, TypeName type, String fieldName
               return value;
             }
             break;
+          case BYTES:
+            if (value instanceof ByteBuffer) {
+              return ((ByteBuffer) value).array();
+            } else if (value instanceof byte[]) {
+              return (byte[]) value;
+            }
+            break;
           case INT16:
             if (value instanceof Short) {
               return value;
@@ -530,8 +555,38 @@ private Instant verifyDateTime(Object value, String fieldName) {
 
     public Row build() {
       checkNotNull(schema);
-      List<Object> values = attached ? this.values : verify(schema, this.values);
-      return new AutoValue_Row(values, schema);
+      if (!this.values.isEmpty() && fieldValueGetterFactory != null) {
+        throw new IllegalArgumentException(("Cannot specify both values and getters."));
+      }
+      if (!this.values.isEmpty()) {
+        List<Object> storageValues = attached ? this.values : verify(schema, this.values);
+        checkState(getterTarget == null, "withGetterTarget requires getters.");
+        return new RowWithStorage(schema, verify(schema, storageValues));
+      } else if (fieldValueGetterFactory != null) {
+        checkState(getterTarget != null, "getters require withGetterTarget.");
+        return new RowWithGetters(schema, fieldValueGetterFactory, getterTarget);
+      } else {
+        return new RowWithStorage(schema, Collections.emptyList());
+      }
     }
   }
+
+  /** Creates a {@link Row} from the list of values and {@link #getSchema()}. */
+  public static <T> Collector<T, List<Object>, Row> toRow(Schema schema) {
+    return Collector.of(
+        () -> new ArrayList<>(schema.getFieldCount()),
+        List::add,
+        (left, right) -> {
+          left.addAll(right);
+          return left;
+        },
+        values -> Row.withSchema(schema).addValues(values).build());
+  }
+
+  /** Creates a new record filled with nulls. */
+  public static Row nullRow(Schema schema) {
+    return Row.withSchema(schema)
+        .addValues(Collections.nCopies(schema.getFieldCount(), null))
+        .build();
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
new file mode 100644
index 00000000000..f1d737037ac
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+import org.apache.beam.sdk.values.reflect.FieldValueGetterFactory;
+
+/**
+ * A Concrete subclass of {@link Row} that delegates to a set of provided {@link FieldValueGetter}s.
+ *
+ * <p>This allows us to have {@link Row} objects for which the actual storage is in another object.
+ * For example, the user's type may be a POJO, in which case the provided getters will simple read
+ * the appropriate fields from the POJO.
+ */
+public class RowWithGetters extends Row {
+  private final FieldValueGetterFactory fieldValueGetterFactory;
+  private final Object getterTarget;
+  private final List<FieldValueGetter> getters;
+
+  private final Map<Integer, List> cachedLists = Maps.newHashMap();
+  private final Map<Integer, Map> cachedMaps = Maps.newHashMap();
+
+  RowWithGetters(Schema schema, FieldValueGetterFactory getterFactory, Object getterTarget) {
+    super(schema);
+    this.fieldValueGetterFactory = getterFactory;
+    this.getterTarget = getterTarget;
+    this.getters = fieldValueGetterFactory.createGetters(getterTarget.getClass(), schema);
+  }
+
+  @Nullable
+  @Override
+  @SuppressWarnings({"TypeParameterUnusedInFormals", "unchecked"})
+  public <T> T getValue(int fieldIdx) {
+    FieldType type = getSchema().getField(fieldIdx).getType();
+    Object fieldValue = getters.get(fieldIdx).get(getterTarget);
+
+    return getValue(type, fieldValue, fieldIdx);
+  }
+
+  private List getListValue(FieldType elementType, Object fieldValue) {
+    Iterable iterable = (Iterable) fieldValue;
+    List<Object> list = Lists.newArrayList();
+    for (Object o : iterable) {
+      list.add(getValue(elementType, o, null));
+    }
+    return list;
+  }
+
+  private Map<?, ?> getMapValue(FieldType keyType, FieldType valueType, Map<?, ?> fieldValue) {
+    Map returnMap = Maps.newHashMap();
+    for (Map.Entry<?, ?> entry : fieldValue.entrySet()) {
+      returnMap.put(
+          getValue(keyType, entry.getKey(), null), getValue(valueType, entry.getValue(), null));
+    }
+    return returnMap;
+  }
+
+  @SuppressWarnings({"TypeParameterUnusedInFormals", "unchecked"})
+  private <T> T getValue(FieldType type, Object fieldValue, @Nullable Integer cacheKey) {
+    if (type.getTypeName().equals(TypeName.ROW)) {
+      return (T) new RowWithGetters(type.getRowSchema(), fieldValueGetterFactory, fieldValue);
+    } else if (type.getTypeName().equals(TypeName.ARRAY)) {
+      return cacheKey != null
+          ? (T)
+              cachedLists.computeIfAbsent(
+                  cacheKey, i -> getListValue(type.getCollectionElementType(), fieldValue))
+          : (T) getListValue(type.getCollectionElementType(), fieldValue);
+    } else if (type.getTypeName().equals(TypeName.MAP)) {
+      Map map = (Map) fieldValue;
+      return cacheKey != null
+          ? (T)
+              cachedMaps.computeIfAbsent(
+                  cacheKey, i -> getMapValue(type.getMapKeyType(), type.getMapValueType(), map))
+          : (T) getMapValue(type.getMapKeyType(), type.getMapValueType(), map);
+    } else {
+      return (T) fieldValue;
+    }
+  }
+
+  @Override
+  public int getFieldCount() {
+    return getters.size();
+  }
+
+  @Override
+  public List<Object> getValues() {
+    return getters.stream().map(g -> g.get(getterTarget)).collect(Collectors.toList());
+  }
+
+  public List<FieldValueGetter> getGetters() {
+    return getters;
+  }
+
+  public Object getGetterTarget() {
+    return getterTarget;
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithStorage.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithStorage.java
new file mode 100644
index 00000000000..1a096f5167a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithStorage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** Concrete subclass of {@link Row} that explicitly stores all fields of the row. */
+public class RowWithStorage extends Row {
+  private final List<Object> values;
+
+  RowWithStorage(Schema schema, List<Object> values) {
+    super(schema);
+    this.values = values;
+  }
+
+  @Override
+  @Nullable
+  @SuppressWarnings("TypeParameterUnusedInFormals")
+  public <T> T getValue(int fieldIdx) {
+    if (values.size() > fieldIdx) {
+      return (T) values.get(fieldIdx);
+    } else {
+      throw new IllegalArgumentException("No field at index " + fieldIdx);
+    }
+  }
+
+  @Override
+  public List<Object> getValues() {
+    return values;
+  }
+
+  @Override
+  public int getFieldCount() {
+    return values.size();
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ByteBuddyUtils.java
index 418cbda3af5..31a0fd2e65e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/ByteBuddyUtils.java
@@ -21,6 +21,7 @@
 import static net.bytebuddy.matcher.ElementMatchers.named;
 
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.dynamic.DynamicType;
@@ -36,10 +37,11 @@
    * FieldValueGetter}.
    */
   static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
-      ByteBuddy byteBuddy, Class clazz) {
+      ByteBuddy byteBuddy, Class clazz, Type fieldType) {
 
     TypeDescription.Generic getterGenericType =
-        TypeDescription.Generic.Builder.parameterizedType(FieldValueGetter.class, clazz).build();
+        TypeDescription.Generic.Builder.parameterizedType(FieldValueGetter.class, clazz, fieldType)
+            .build();
 
     return (DynamicType.Builder<FieldValueGetter>) byteBuddy.subclass(getterGenericType);
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java
index 10473d8a738..e47978b4e59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetter.java
@@ -25,12 +25,11 @@
  *
  * <p>An interface to access a field of a class.
  *
- * <p>Implementations of this interface are generated at runtime by {@link RowFactory} to map pojo
- * fields to BeamRecord fields.
+ * <p>Implementations of this interface are generated at runtime to map object fields to Row fields.
  */
 @Internal
-public interface FieldValueGetter<T> {
-  Object get(T object);
+public interface FieldValueGetter<ObjectT, ValueT> {
+  ValueT get(ObjectT object);
 
   String name();
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetterFactory.java
new file mode 100644
index 00000000000..1c8dbfd46e0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueGetterFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values.reflect;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** A factory interface for creating {@link FieldValueGetter} objects corresponding to a class. */
+public interface FieldValueGetterFactory {
+  /**
+   * Returns a list of {@link FieldValueGetter}s for the target class.
+   *
+   * <p>The returned list is ordered by the order of matching fields in the schema.
+   */
+  List<FieldValueGetter> createGetters(Class<?> targetClass, Schema schema);
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetter.java
new file mode 100644
index 00000000000..f55ae8ee576
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values.reflect;
+
+import java.lang.reflect.Type;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
+ *
+ * <p>An interface to set a field of a class.
+ *
+ * <p>Implementations of this interface are generated at runtime to map Row fields back into objet
+ * fields.
+ */
+@Internal
+public interface FieldValueSetter<ObjectT, ValueT> {
+  /** Sets the specified field on object to value. */
+  void set(ObjectT object, ValueT value);
+
+  /** Returns the name of the field. */
+  String name();
+
+  /** Returns the field type. */
+  Class type();
+
+  /** If the field is a container type, returns the element type. */
+  @Nullable
+  Type elementType();
+
+  /** If the field is a map type, returns the key type. */
+  @Nullable
+  Type mapKeyType();
+
+  /** If the field is a map type, returns the key type. */
+  @Nullable
+  Type mapValueType();
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetterFactory.java
new file mode 100644
index 00000000000..d48138354d2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/FieldValueSetterFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values.reflect;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** A factory interface for creating {@link FieldValueSetter} objects corresponding to a class. */
+public interface FieldValueSetterFactory {
+  /**
+   * Returns a list of {@link FieldValueGetter}s for the target class.
+   *
+   * <p>The returned list is ordered by the order of matching fields in the schema.
+   */
+  List<FieldValueSetter> createSetters(Class<?> targetClass, Schema schema);
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/GeneratedGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/GeneratedGetterFactory.java
index 1060a49e034..1ccaaeacdb9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/GeneratedGetterFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/GeneratedGetterFactory.java
@@ -95,7 +95,7 @@
   private static FieldValueGetter createFieldGetterInstance(Class clazz, Method getterMethod) {
 
     DynamicType.Builder<FieldValueGetter> getterBuilder =
-        subclassGetterInterface(BYTE_BUDDY, clazz);
+        subclassGetterInterface(BYTE_BUDDY, clazz, getterMethod.getGenericReturnType());
 
     getterBuilder = implementNameGetter(getterBuilder, tryStripGetPrefix(getterMethod));
     getterBuilder = implementTypeGetter(getterBuilder, getterMethod.getReturnType());
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/JavaBeanGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/JavaBeanGetterFactory.java
new file mode 100644
index 00000000000..aa00eda0ac4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/JavaBeanGetterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values.reflect;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+
+/** A factory for creating {@link FieldValueGetter} objects for a JavaBean object. */
+public class JavaBeanGetterFactory implements FieldValueGetterFactory {
+  @Override
+  public List<FieldValueGetter> createGetters(Class<?> targetClass, Schema schema) {
+    return JavaBeanUtils.getGetters(targetClass, schema);
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/JavaBeanSetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/JavaBeanSetterFactory.java
new file mode 100644
index 00000000000..fbde3f33f78
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/JavaBeanSetterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values.reflect;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+
+/** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */
+public class JavaBeanSetterFactory implements FieldValueSetterFactory {
+  @Override
+  public List<FieldValueSetter> createSetters(Class<?> targetClass, Schema schema) {
+    return JavaBeanUtils.getSetters(targetClass, schema);
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/PojoValueGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/PojoValueGetterFactory.java
new file mode 100644
index 00000000000..eae3762a122
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/PojoValueGetterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values.reflect;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.POJOUtils;
+
+/** A factory for creating {@link FieldValueGetter} objects for a POJO. */
+public class PojoValueGetterFactory implements FieldValueGetterFactory {
+  @Override
+  public List<FieldValueGetter> createGetters(Class<?> targetClass, Schema schema) {
+    return POJOUtils.getGetters(targetClass, schema);
+  }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/PojoValueSetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/PojoValueSetterFactory.java
new file mode 100644
index 00000000000..dd3fbdfd4ce
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/PojoValueSetterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values.reflect;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.POJOUtils;
+
+/** A factory for creating {@link FieldValueSetter} objects for a POJO. */
+public class PojoValueSetterFactory implements FieldValueSetterFactory {
+  @Override
+  public List<FieldValueSetter> createSetters(Class<?> targetClass, Schema schema) {
+    return POJOUtils.getSetters(targetClass, schema);
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
new file mode 100644
index 00000000000..7ed115a3c57
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas;
+
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAYS_BEAM_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_MAP_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.PRIMITIVE_ARRAY_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.SIMPLE_BEAN_SCHEMA;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArraysBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedMapBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+/** Tests for the {@link JavaBeanSchema} schema provider. */
+public class JavaBeanSchemaTest {
+  static final DateTime DATE = DateTime.parse("1979-03-14");
+  static final byte[] BYTE_ARRAY = "bytearray".getBytes(Charset.defaultCharset());
+
+  private SimpleBean createSimple(String name) {
+    return new SimpleBean(
+        name,
+        (byte) 1,
+        (short) 2,
+        3,
+        4L,
+        true,
+        DATE,
+        DATE.toInstant(),
+        BYTE_ARRAY,
+        BigDecimal.ONE,
+        new StringBuilder(name).append("builder"));
+  }
+
+  private Row createSimpleRow(String name) {
+    return Row.withSchema(SIMPLE_BEAN_SCHEMA)
+        .addValues(
+            name,
+            (byte) 1,
+            (short) 2,
+            3,
+            4L,
+            true,
+            DATE,
+            DATE,
+            BYTE_ARRAY,
+            BYTE_ARRAY,
+            BigDecimal.ONE,
+            new StringBuilder(name).append("builder").toString())
+        .build();
+  }
+
+  @Test
+  public void testSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(SimpleBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testToRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SimpleBean bean = createSimple("string");
+    Row row = registry.getToRowFunction(SimpleBean.class).apply(bean);
+
+    assertEquals(12, row.getFieldCount());
+    assertEquals("string", row.getString("str"));
+    assertEquals((byte) 1, row.getByte("aByte"));
+    assertEquals((short) 2, row.getInt16("aShort"));
+    assertEquals((int) 3, row.getInt32("anInt"));
+    assertEquals((long) 4, row.getInt64("aLong"));
+    assertEquals(true, row.getBoolean("aBoolean"));
+    assertEquals(DATE, row.getDateTime("dateTime"));
+    assertEquals(DATE, row.getDateTime("instant"));
+    assertArrayEquals(BYTE_ARRAY, row.getBytes("bytes"));
+    assertArrayEquals(BYTE_ARRAY, row.getBytes("byteBuffer"));
+    assertEquals(BigDecimal.ONE, row.getDecimal("bigDecimal"));
+    assertEquals("stringbuilder", row.getString("stringBuilder"));
+  }
+
+  @Test
+  public void testFromRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Row row = createSimpleRow("string");
+
+    SimpleBean bean = registry.getFromRowFunction(SimpleBean.class).apply(row);
+    assertEquals("string", bean.getStr());
+    assertEquals((byte) 1, bean.getaByte());
+    assertEquals((short) 2, bean.getaShort());
+    assertEquals((int) 3, bean.getAnInt());
+    assertEquals((long) 4, bean.getaLong());
+    assertEquals(true, bean.isaBoolean());
+    assertEquals(DATE, bean.getDateTime());
+    assertEquals(DATE.toInstant(), bean.getInstant());
+    assertArrayEquals("not equal", BYTE_ARRAY, bean.getBytes());
+    assertArrayEquals("not equal", BYTE_ARRAY, bean.getByteBuffer().array());
+    assertEquals(BigDecimal.ONE, bean.getBigDecimal());
+    assertEquals("stringbuilder", bean.getStringBuilder().toString());
+  }
+
+  @Test
+  public void testFromRowWithGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SimpleBean bean = createSimple("string");
+    Row row = registry.getToRowFunction(SimpleBean.class).apply(bean);
+    // Test that the fromRowFunction simply returns the original object back.
+    SimpleBean extracted = registry.getFromRowFunction(SimpleBean.class).apply(row);
+    assertSame(bean, extracted);
+  }
+
+  @Test
+  public void testRecursiveGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_BEAN_SCHEMA, registry.getSchema(NestedBean.class));
+
+    NestedBean bean = new NestedBean(createSimple("string"));
+    Row row = registry.getToRowFunction(NestedBean.class).apply(bean);
+
+    Row nestedRow = row.getRow("nested");
+    assertEquals("string", nestedRow.getString("str"));
+    assertEquals((byte) 1, nestedRow.getByte("aByte"));
+    assertEquals((short) 2, nestedRow.getInt16("aShort"));
+    assertEquals((int) 3, nestedRow.getInt32("anInt"));
+    assertEquals((long) 4, nestedRow.getInt64("aLong"));
+    assertEquals(true, nestedRow.getBoolean("aBoolean"));
+    assertEquals(DATE, nestedRow.getDateTime("dateTime"));
+    assertEquals(DATE, nestedRow.getDateTime("instant"));
+    assertArrayEquals("not equal", BYTE_ARRAY, nestedRow.getBytes("bytes"));
+    assertArrayEquals("not equal", BYTE_ARRAY, nestedRow.getBytes("byteBuffer"));
+    assertEquals(BigDecimal.ONE, nestedRow.getDecimal("bigDecimal"));
+    assertEquals("stringbuilder", nestedRow.getString("stringBuilder"));
+  }
+
+  @Test
+  public void testRecursiveSetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+
+    Row nestedRow = createSimpleRow("string");
+
+    Row row = Row.withSchema(NESTED_BEAN_SCHEMA).addValue(nestedRow).build();
+    NestedBean bean = registry.getFromRowFunction(NestedBean.class).apply(row);
+    assertEquals("string", bean.getNested().getStr());
+    assertEquals((byte) 1, bean.getNested().getaByte());
+    assertEquals((short) 2, bean.getNested().getaShort());
+    assertEquals((int) 3, bean.getNested().getAnInt());
+    assertEquals((long) 4, bean.getNested().getaLong());
+    assertEquals(true, bean.getNested().isaBoolean());
+    assertEquals(DATE, bean.getNested().getDateTime());
+    assertEquals(DATE.toInstant(), bean.getNested().getInstant());
+    assertArrayEquals("not equal", BYTE_ARRAY, bean.getNested().getBytes());
+    assertArrayEquals("not equal", BYTE_ARRAY, bean.getNested().getByteBuffer().array());
+    assertEquals(BigDecimal.ONE, bean.getNested().getBigDecimal());
+    assertEquals("stringbuilder", bean.getNested().getStringBuilder().toString());
+  }
+
+  @Test
+  public void testPrimitiveArrayGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        PRIMITIVE_ARRAY_BEAN_SCHEMA, registry.getSchema(PrimitiveArrayBean.class));
+
+    List<String> strList = ImmutableList.of("a", "b", "c");
+    int[] intArray = {1, 2, 3, 4};
+    Long[] longArray = {42L, 43L, 44L};
+    PrimitiveArrayBean bean = new PrimitiveArrayBean(strList, intArray, longArray);
+    Row row = registry.getToRowFunction(PrimitiveArrayBean.class).apply(bean);
+    assertEquals(strList, row.getArray("strings"));
+    assertEquals(Ints.asList(intArray), row.getArray("integers"));
+    assertEquals(Arrays.asList(longArray), row.getArray("longs"));
+
+    // Ensure that list caching works.
+    assertSame(row.getArray("strings"), row.getArray("strings"));
+    assertSame(row.getArray("integers"), row.getArray("integers"));
+    assertSame(row.getArray("longs"), row.getArray("longs"));
+  }
+
+  @Test
+  public void testPrimitiveArraySetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Row row =
+        Row.withSchema(PRIMITIVE_ARRAY_BEAN_SCHEMA)
+            .addArray("a", "b", "c", "d")
+            .addArray(1, 2, 3, 4)
+            .addArray(42L, 43L, 44L, 45L)
+            .build();
+    PrimitiveArrayBean bean = registry.getFromRowFunction(PrimitiveArrayBean.class).apply(row);
+    assertEquals(row.getArray("strings"), bean.getStrings());
+    assertEquals(row.getArray("integers"), Ints.asList(bean.getIntegers()));
+    assertEquals(row.getArray("longs"), Arrays.asList(bean.getLongs()));
+  }
+
+  @Test
+  public void testRecursiveArrayGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_ARRAY_BEAN_SCHEMA, registry.getSchema(NestedArrayBean.class));
+
+    SimpleBean simple1 = createSimple("string1");
+    SimpleBean simple2 = createSimple("string2");
+    SimpleBean simple3 = createSimple("string3");
+
+    NestedArrayBean bean = new NestedArrayBean(simple1, simple2, simple3);
+    Row row = registry.getToRowFunction(NestedArrayBean.class).apply(bean);
+    List<Row> rows = row.getArray("beans");
+    assertSame(simple1, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(0)));
+    assertSame(simple2, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(1)));
+    assertSame(simple3, registry.getFromRowFunction(NestedArrayBean.class).apply(rows.get(2)));
+  }
+
+  @Test
+  public void testRecursiveArraySetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+
+    Row row1 = createSimpleRow("string1");
+    Row row2 = createSimpleRow("string2");
+    Row row3 = createSimpleRow("string3");
+    ;
+
+    Row row = Row.withSchema(NESTED_ARRAY_BEAN_SCHEMA).addArray(row1, row2, row3).build();
+    NestedArrayBean bean = registry.getFromRowFunction(NestedArrayBean.class).apply(row);
+    assertEquals(3, bean.getBeans().length);
+    assertEquals("string1", bean.getBeans()[0].getStr());
+    assertEquals("string2", bean.getBeans()[1].getStr());
+    assertEquals("string3", bean.getBeans()[2].getStr());
+  }
+
+  @Test
+  public void testNestedArraysGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_ARRAYS_BEAM_SCHEMA, registry.getSchema(NestedArraysBean.class));
+
+    List<List<String>> listOfLists =
+        Lists.newArrayList(
+            Lists.newArrayList("a", "b", "c"),
+            Lists.newArrayList("d", "e", "f"),
+            Lists.newArrayList("g", "h", "i"));
+    NestedArraysBean bean = new NestedArraysBean(listOfLists);
+    Row row = registry.getToRowFunction(NestedArraysBean.class).apply(bean);
+    assertEquals(listOfLists, row.getArray("lists"));
+  }
+
+  @Test
+  public void testNestedArraysSetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    List<List<String>> listOfLists =
+        Lists.newArrayList(
+            Lists.newArrayList("a", "b", "c"),
+            Lists.newArrayList("d", "e", "f"),
+            Lists.newArrayList("g", "h", "i"));
+    Row row = Row.withSchema(NESTED_ARRAYS_BEAM_SCHEMA).addArray(listOfLists).build();
+    NestedArraysBean bean = registry.getFromRowFunction(NestedArraysBean.class).apply(row);
+    assertEquals(listOfLists, bean.getLists());
+  }
+
+  @Test
+  public void testMapFieldGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_MAP_BEAN_SCHEMA, registry.getSchema(NestedMapBean.class));
+
+    SimpleBean simple1 = createSimple("string1");
+    SimpleBean simple2 = createSimple("string2");
+    SimpleBean simple3 = createSimple("string3");
+
+    NestedMapBean bean =
+        new NestedMapBean(
+            ImmutableMap.of(
+                "simple1", simple1,
+                "simple2", simple2,
+                "simple3", simple3));
+    Row row = registry.getToRowFunction(NestedMapBean.class).apply(bean);
+    Map<String, Row> extractedMap = row.getMap("map");
+    assertEquals(3, extractedMap.size());
+    assertEquals("string1", extractedMap.get("simple1").getString("str"));
+    assertEquals("string2", extractedMap.get("simple2").getString("str"));
+    assertEquals("string3", extractedMap.get("simple3").getString("str"));
+  }
+
+  @Test
+  public void testMapFieldSetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+
+    Row row1 = createSimpleRow("string1");
+    Row row2 = createSimpleRow("string2");
+    Row row3 = createSimpleRow("string3");
+    Row row =
+        Row.withSchema(NESTED_MAP_BEAN_SCHEMA)
+            .addValue(
+                ImmutableMap.of(
+                    "simple1", row1,
+                    "simple2", row2,
+                    "simple3", row3))
+            .build();
+    NestedMapBean bean = registry.getFromRowFunction(NestedMapBean.class).apply(row);
+    assertEquals(3, bean.getMap().size());
+    assertEquals("string1", bean.getMap().get("simple1").getStr());
+    assertEquals("string2", bean.getMap().get("simple2").getStr());
+    assertEquals("string3", bean.getMap().get("simple3").getStr());
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
new file mode 100644
index 00000000000..314914ac088
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas;
+
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAYS_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.PRIMITIVE_ARRAY_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_SCHEMA;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.PrimitiveArrayPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for the {@link JavaFieldSchema} schema provider. */
+public class JavaFieldSchemaTest {
+  static final DateTime DATE = DateTime.parse("1979-03-14");
+  static final Instant INSTANT = DateTime.parse("1979-03-15").toInstant();
+  static final byte[] BYTE_ARRAY = "bytearray".getBytes(Charset.defaultCharset());
+  static final ByteBuffer BYTE_BUFFER =
+      ByteBuffer.wrap("byteBuffer".getBytes(Charset.defaultCharset()));
+
+  private SimplePOJO createSimple(String name) {
+    return new SimplePOJO(
+        name,
+        (byte) 1,
+        (short) 2,
+        3,
+        4L,
+        true,
+        DATE,
+        INSTANT,
+        BYTE_ARRAY,
+        BYTE_BUFFER,
+        BigDecimal.ONE,
+        new StringBuilder(name).append("builder"));
+  }
+
+  private Row createSimpleRow(String name) {
+    return Row.withSchema(SIMPLE_POJO_SCHEMA)
+        .addValues(
+            name,
+            (byte) 1,
+            (short) 2,
+            3,
+            4L,
+            true,
+            DATE,
+            INSTANT,
+            BYTE_ARRAY,
+            BYTE_BUFFER.array(),
+            BigDecimal.ONE,
+            new StringBuilder(name).append("builder").toString())
+        .build();
+  }
+
+  @Test
+  public void testSchema() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Schema schema = registry.getSchema(SimplePOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testToRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SimplePOJO pojo = createSimple("string");
+    Row row = registry.getToRowFunction(SimplePOJO.class).apply(pojo);
+
+    assertEquals(12, row.getFieldCount());
+    assertEquals("string", row.getString("str"));
+    assertEquals((byte) 1, row.getByte("aByte"));
+    assertEquals((short) 2, row.getInt16("aShort"));
+    assertEquals((int) 3, row.getInt32("anInt"));
+    assertEquals((long) 4, row.getInt64("aLong"));
+    assertEquals(true, row.getBoolean("aBoolean"));
+    assertEquals(DATE, row.getDateTime("dateTime"));
+    assertEquals(INSTANT, row.getDateTime("instant").toInstant());
+    assertArrayEquals(BYTE_ARRAY, row.getBytes("bytes"));
+    assertArrayEquals(BYTE_BUFFER.array(), row.getBytes("byteBuffer"));
+    assertEquals(BigDecimal.ONE, row.getDecimal("bigDecimal"));
+    assertEquals("stringbuilder", row.getString("stringBuilder"));
+  }
+
+  @Test
+  public void testFromRow() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Row row = createSimpleRow("string");
+
+    SimplePOJO pojo = registry.getFromRowFunction(SimplePOJO.class).apply(row);
+    assertEquals("string", pojo.str);
+    assertEquals((byte) 1, pojo.aByte);
+    assertEquals((short) 2, pojo.aShort);
+    assertEquals((int) 3, pojo.anInt);
+    assertEquals((long) 4, pojo.aLong);
+    assertEquals(true, pojo.aBoolean);
+    assertEquals(DATE, pojo.dateTime);
+    assertEquals(INSTANT, pojo.instant);
+    assertArrayEquals("not equal", BYTE_ARRAY, pojo.bytes);
+    assertEquals(BYTE_BUFFER, pojo.byteBuffer);
+    assertEquals(BigDecimal.ONE, pojo.bigDecimal);
+    assertEquals("stringbuilder", pojo.stringBuilder.toString());
+  }
+
+  @Test
+  public void testFromRowWithGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SimplePOJO pojo = createSimple("string");
+    Row row = registry.getToRowFunction(SimplePOJO.class).apply(pojo);
+    // Test that the fromRowFunction simply returns the original object back.
+    SimplePOJO extracted = registry.getFromRowFunction(SimplePOJO.class).apply(row);
+    assertSame(pojo, extracted);
+  }
+
+  @Test
+  public void testRecursiveGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_POJO_SCHEMA, registry.getSchema(NestedPOJO.class));
+
+    NestedPOJO pojo = new NestedPOJO(createSimple("string"));
+    Row row = registry.getToRowFunction(NestedPOJO.class).apply(pojo);
+
+    Row nestedRow = row.getRow("nested");
+    assertEquals("string", nestedRow.getString("str"));
+    assertEquals((byte) 1, nestedRow.getByte("aByte"));
+    assertEquals((short) 2, nestedRow.getInt16("aShort"));
+    assertEquals((int) 3, nestedRow.getInt32("anInt"));
+    assertEquals((long) 4, nestedRow.getInt64("aLong"));
+    assertEquals(true, nestedRow.getBoolean("aBoolean"));
+    assertEquals(DATE, nestedRow.getDateTime("dateTime"));
+    assertEquals(INSTANT, nestedRow.getDateTime("instant").toInstant());
+    assertArrayEquals("not equal", BYTE_ARRAY, nestedRow.getBytes("bytes"));
+    assertArrayEquals("not equal", BYTE_BUFFER.array(), nestedRow.getBytes("byteBuffer"));
+    assertEquals(BigDecimal.ONE, nestedRow.getDecimal("bigDecimal"));
+    assertEquals("stringbuilder", nestedRow.getString("stringBuilder"));
+  }
+
+  @Test
+  public void testRecursiveSetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+
+    Row nestedRow = createSimpleRow("string");
+
+    Row row = Row.withSchema(NESTED_POJO_SCHEMA).addValue(nestedRow).build();
+    NestedPOJO pojo = registry.getFromRowFunction(NestedPOJO.class).apply(row);
+    assertEquals("string", pojo.nested.str);
+    assertEquals((byte) 1, pojo.nested.aByte);
+    assertEquals((short) 2, pojo.nested.aShort);
+    assertEquals((int) 3, pojo.nested.anInt);
+    assertEquals((long) 4, pojo.nested.aLong);
+    assertEquals(true, pojo.nested.aBoolean);
+    assertEquals(DATE, pojo.nested.dateTime);
+    assertEquals(INSTANT, pojo.nested.instant);
+    assertArrayEquals("not equal", BYTE_ARRAY, pojo.nested.bytes);
+    assertEquals(BYTE_BUFFER, pojo.nested.byteBuffer);
+    assertEquals(BigDecimal.ONE, pojo.nested.bigDecimal);
+    assertEquals("stringbuilder", pojo.nested.stringBuilder.toString());
+  }
+
+  @Test
+  public void testPrimitiveArrayGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        PRIMITIVE_ARRAY_POJO_SCHEMA, registry.getSchema(PrimitiveArrayPOJO.class));
+
+    List<String> strList = ImmutableList.of("a", "b", "c");
+    int[] intArray = {1, 2, 3, 4};
+    Long[] longArray = {42L, 43L, 44L};
+    PrimitiveArrayPOJO pojo = new PrimitiveArrayPOJO(strList, intArray, longArray);
+    Row row = registry.getToRowFunction(PrimitiveArrayPOJO.class).apply(pojo);
+    assertEquals(strList, row.getArray("strings"));
+    assertEquals(Ints.asList(intArray), row.getArray("integers"));
+    assertEquals(Arrays.asList(longArray), row.getArray("longs"));
+
+    // Ensure that list caching works.
+    assertSame(row.getArray("strings"), row.getArray("strings"));
+    assertSame(row.getArray("integers"), row.getArray("integers"));
+    assertSame(row.getArray("longs"), row.getArray("longs"));
+  }
+
+  @Test
+  public void testPrimitiveArraySetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    Row row =
+        Row.withSchema(PRIMITIVE_ARRAY_POJO_SCHEMA)
+            .addArray("a", "b", "c", "d")
+            .addArray(1, 2, 3, 4)
+            .addArray(42L, 43L, 44L, 45L)
+            .build();
+    PrimitiveArrayPOJO pojo = registry.getFromRowFunction(PrimitiveArrayPOJO.class).apply(row);
+    assertEquals(row.getArray("strings"), pojo.strings);
+    assertEquals(row.getArray("integers"), Ints.asList(pojo.integers));
+    assertEquals(row.getArray("longs"), Arrays.asList(pojo.longs));
+  }
+
+  @Test
+  public void testRecursiveArrayGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_ARRAY_POJO_SCHEMA, registry.getSchema(NestedArrayPOJO.class));
+
+    SimplePOJO simple1 = createSimple("string1");
+    SimplePOJO simple2 = createSimple("string2");
+    SimplePOJO simple3 = createSimple("string3");
+
+    NestedArrayPOJO pojo = new NestedArrayPOJO(simple1, simple2, simple3);
+    Row row = registry.getToRowFunction(NestedArrayPOJO.class).apply(pojo);
+    List<Row> rows = row.getArray("pojos");
+    assertSame(simple1, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(0)));
+    assertSame(simple2, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(1)));
+    assertSame(simple3, registry.getFromRowFunction(NestedArrayPOJO.class).apply(rows.get(2)));
+  }
+
+  @Test
+  public void testRecursiveArraySetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+
+    Row row1 = createSimpleRow("string1");
+    Row row2 = createSimpleRow("string2");
+    Row row3 = createSimpleRow("string3");
+    ;
+
+    Row row = Row.withSchema(NESTED_ARRAY_POJO_SCHEMA).addArray(row1, row2, row3).build();
+    NestedArrayPOJO pojo = registry.getFromRowFunction(NestedArrayPOJO.class).apply(row);
+    assertEquals(3, pojo.pojos.length);
+    assertEquals("string1", pojo.pojos[0].str);
+    assertEquals("string2", pojo.pojos[1].str);
+    assertEquals("string3", pojo.pojos[2].str);
+  }
+
+  @Test
+  public void testNestedArraysGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_ARRAYS_POJO_SCHEMA, registry.getSchema(NestedArraysPOJO.class));
+
+    List<List<String>> listOfLists =
+        Lists.newArrayList(
+            Lists.newArrayList("a", "b", "c"),
+            Lists.newArrayList("d", "e", "f"),
+            Lists.newArrayList("g", "h", "i"));
+    NestedArraysPOJO pojo = new NestedArraysPOJO(listOfLists);
+    Row row = registry.getToRowFunction(NestedArraysPOJO.class).apply(pojo);
+    assertEquals(listOfLists, row.getArray("lists"));
+  }
+
+  @Test
+  public void testNestedArraysSetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    List<List<String>> listOfLists =
+        Lists.newArrayList(
+            Lists.newArrayList("a", "b", "c"),
+            Lists.newArrayList("d", "e", "f"),
+            Lists.newArrayList("g", "h", "i"));
+    Row row = Row.withSchema(NESTED_ARRAYS_POJO_SCHEMA).addArray(listOfLists).build();
+    NestedArraysPOJO pojo = registry.getFromRowFunction(NestedArraysPOJO.class).apply(row);
+    assertEquals(listOfLists, pojo.lists);
+  }
+
+  @Test
+  public void testMapFieldGetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+    SchemaTestUtils.assertSchemaEquivalent(
+        NESTED_MAP_POJO_SCHEMA, registry.getSchema(NestedMapPOJO.class));
+
+    SimplePOJO simple1 = createSimple("string1");
+    SimplePOJO simple2 = createSimple("string2");
+    SimplePOJO simple3 = createSimple("string3");
+
+    NestedMapPOJO pojo =
+        new NestedMapPOJO(
+            ImmutableMap.of(
+                "simple1", simple1,
+                "simple2", simple2,
+                "simple3", simple3));
+    Row row = registry.getToRowFunction(NestedMapPOJO.class).apply(pojo);
+    Map<String, Row> extractedMap = row.getMap("map");
+    assertEquals(3, extractedMap.size());
+    assertEquals("string1", extractedMap.get("simple1").getString("str"));
+    assertEquals("string2", extractedMap.get("simple2").getString("str"));
+    assertEquals("string3", extractedMap.get("simple3").getString("str"));
+  }
+
+  @Test
+  public void testMapFieldSetters() throws NoSuchSchemaException {
+    SchemaRegistry registry = SchemaRegistry.createDefault();
+
+    Row row1 = createSimpleRow("string1");
+    Row row2 = createSimpleRow("string2");
+    Row row3 = createSimpleRow("string3");
+    Row row =
+        Row.withSchema(NESTED_MAP_POJO_SCHEMA)
+            .addValue(
+                ImmutableMap.of(
+                    "simple1", row1,
+                    "simple2", row2,
+                    "simple3", row3))
+            .build();
+    NestedMapPOJO pojo = registry.getFromRowFunction(NestedMapPOJO.class).apply(row);
+    assertEquals(3, pojo.map.size());
+    assertEquals("string1", pojo.map.get("simple1").str);
+    assertEquals("string2", pojo.map.get("simple2").str);
+    assertEquals("string3", pojo.map.get("simple3").str);
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
index aea9d1d117f..dbfe21a6e04 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java
@@ -93,7 +93,7 @@ public void testRegisterForType() throws NoSuchSchemaException {
     tryGetters(registry);
   }
 
-  static final class Provider extends SchemaProvider {
+  static final class Provider implements SchemaProvider {
     @Override
     public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
       if (typeDescriptor.equals(TypeDescriptors.strings())) {
@@ -125,7 +125,7 @@ public void testRegisterProvider() throws NoSuchSchemaException {
 
   static class TestSchemaClass {}
 
-  static final class TestAutoProvider extends SchemaProvider {
+  static final class TestAutoProvider implements SchemaProvider {
     @Override
     public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
       if (typeDescriptor.equals(TypeDescriptor.of(TestSchemaClass.class))) {
@@ -143,6 +143,7 @@ public void testRegisterProvider() throws NoSuchSchemaException {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
       if (typeDescriptor.equals(TypeDescriptor.of(TestSchemaClass.class))) {
         return r -> (T) new TestSchemaClass();
@@ -169,7 +170,7 @@ public void testAutoSchemaProvider() throws NoSuchSchemaException {
   @DefaultSchema(TestDefaultSchemaProvider.class)
   static class TestDefaultSchemaClass {}
 
-  static final class TestDefaultSchemaProvider extends SchemaProvider {
+  static final class TestDefaultSchemaProvider implements SchemaProvider {
     @Override
     public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
       if (typeDescriptor.equals(TypeDescriptor.of(TestDefaultSchemaClass.class))) {
@@ -187,6 +188,7 @@ public void testAutoSchemaProvider() throws NoSuchSchemaException {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) {
       if (typeDescriptor.equals(TypeDescriptor.of(TestDefaultSchemaClass.class))) {
         return r -> (T) new TestSchemaClass();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
new file mode 100644
index 00000000000..72c423822f1
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BOXED_FIELDS_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.BEAN_WITH_BYTE_ARRAY_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_ARRAY_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_COLLECTION_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.NESTED_MAP_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.PRIMITIVE_ARRAY_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.PRIMITIVE_MAP_BEAN_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestJavaBeans.SIMPLE_BEAN_SCHEMA;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedArrayBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedCollectionBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NestedMapBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+import org.apache.beam.sdk.values.reflect.FieldValueSetter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+/** Tests for the {@link JavaBeanUtils} class. */
+public class JavaBeanUtilsTest {
+
+  @Test
+  public void testSimpleBean() {
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedBean() {
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testPrimitiveArray() {
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedArray() {
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedCollection() {
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testPrimitiveMap() {
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedMap() {
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema);
+  }
+
+  @Test
+  public void testGeneratedSimpleGetters() {
+    SimpleBean simpleBean = new SimpleBean();
+    simpleBean.setStr("field1");
+    simpleBean.setaByte((byte) 41);
+    simpleBean.setaShort((short) 42);
+    simpleBean.setAnInt(43);
+    simpleBean.setaLong(44);
+    simpleBean.setaBoolean(true);
+    simpleBean.setDateTime(DateTime.parse("1979-03-14"));
+    simpleBean.setInstant(DateTime.parse("1979-03-15").toInstant());
+    simpleBean.setBytes("bytes1".getBytes(Charset.defaultCharset()));
+    simpleBean.setByteBuffer(ByteBuffer.wrap("bytes2".getBytes(Charset.defaultCharset())));
+    simpleBean.setBigDecimal(new BigDecimal(42));
+    simpleBean.setStringBuilder(new StringBuilder("stringBuilder"));
+
+    List<FieldValueGetter> getters = JavaBeanUtils.getGetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA);
+    assertEquals(12, getters.size());
+    assertEquals("str", getters.get(0).name());
+
+    assertEquals("field1", getters.get(0).get(simpleBean));
+    assertEquals((byte) 41, getters.get(1).get(simpleBean));
+    assertEquals((short) 42, getters.get(2).get(simpleBean));
+    assertEquals((int) 43, getters.get(3).get(simpleBean));
+    assertEquals((long) 44, getters.get(4).get(simpleBean));
+    assertEquals(true, getters.get(5).get(simpleBean));
+    assertEquals(DateTime.parse("1979-03-14"), getters.get(6).get(simpleBean));
+    assertEquals(DateTime.parse("1979-03-15"), getters.get(7).get(simpleBean));
+    assertArrayEquals(
+        "Unexpected bytes",
+        "bytes1".getBytes(Charset.defaultCharset()),
+        (byte[]) getters.get(8).get(simpleBean));
+    assertArrayEquals(
+        "Unexpected bytes",
+        "bytes2".getBytes(Charset.defaultCharset()),
+        (byte[]) getters.get(9).get(simpleBean));
+    assertEquals(new BigDecimal(42), getters.get(10).get(simpleBean));
+    assertEquals("stringBuilder", getters.get(11).get(simpleBean).toString());
+  }
+
+  @Test
+  public void testGeneratedSimpleSetters() {
+    SimpleBean simpleBean = new SimpleBean();
+    List<FieldValueSetter> setters = JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA);
+    assertEquals(12, setters.size());
+
+    setters.get(0).set(simpleBean, "field1");
+    setters.get(1).set(simpleBean, (byte) 41);
+    setters.get(2).set(simpleBean, (short) 42);
+    setters.get(3).set(simpleBean, (int) 43);
+    setters.get(4).set(simpleBean, (long) 44);
+    setters.get(5).set(simpleBean, true);
+    setters.get(6).set(simpleBean, DateTime.parse("1979-03-14"));
+    setters.get(7).set(simpleBean, DateTime.parse("1979-03-15"));
+    setters.get(8).set(simpleBean, "bytes1".getBytes(Charset.defaultCharset()));
+    setters.get(9).set(simpleBean, "bytes2".getBytes(Charset.defaultCharset()));
+    setters.get(10).set(simpleBean, new BigDecimal(42));
+    setters.get(11).set(simpleBean, "stringBuilder");
+
+    assertEquals("field1", simpleBean.getStr());
+    assertEquals((byte) 41, simpleBean.getaByte());
+    assertEquals((short) 42, simpleBean.getaShort());
+    assertEquals((int) 43, simpleBean.getAnInt());
+    assertEquals((long) 44, simpleBean.getaLong());
+    assertEquals(true, simpleBean.isaBoolean());
+    assertEquals(DateTime.parse("1979-03-14"), simpleBean.getDateTime());
+    assertEquals(DateTime.parse("1979-03-15").toInstant(), simpleBean.getInstant());
+    assertArrayEquals(
+        "Unexpected bytes", "bytes1".getBytes(Charset.defaultCharset()), simpleBean.getBytes());
+    assertEquals(
+        ByteBuffer.wrap("bytes2".getBytes(Charset.defaultCharset())), simpleBean.getByteBuffer());
+    assertEquals(new BigDecimal(42), simpleBean.getBigDecimal());
+    assertEquals("stringBuilder", simpleBean.getStringBuilder().toString());
+  }
+
+  @Test
+  public void testGeneratedSimpleBoxedGetters() {
+    BeanWithBoxedFields bean = new BeanWithBoxedFields();
+    bean.setaByte((byte) 41);
+    bean.setaShort((short) 42);
+    bean.setAnInt(43);
+    bean.setaLong(44L);
+    bean.setaBoolean(true);
+
+    List<FieldValueGetter> getters =
+        JavaBeanUtils.getGetters(BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA);
+    assertEquals((byte) 41, getters.get(0).get(bean));
+    assertEquals((short) 42, getters.get(1).get(bean));
+    assertEquals((int) 43, getters.get(2).get(bean));
+    assertEquals((long) 44, getters.get(3).get(bean));
+    assertEquals(true, getters.get(4).get(bean));
+  }
+
+  @Test
+  public void testGeneratedSimpleBoxedSetters() {
+    BeanWithBoxedFields bean = new BeanWithBoxedFields();
+    List<FieldValueSetter> setters =
+        JavaBeanUtils.getSetters(BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA);
+
+    setters.get(0).set(bean, (byte) 41);
+    setters.get(1).set(bean, (short) 42);
+    setters.get(2).set(bean, (int) 43);
+    setters.get(3).set(bean, (long) 44);
+    setters.get(4).set(bean, true);
+
+    assertEquals((byte) 41, bean.getaByte().byteValue());
+    assertEquals((short) 42, bean.getaShort().shortValue());
+    assertEquals((int) 43, bean.getAnInt().intValue());
+    assertEquals((long) 44, bean.getaLong().longValue());
+    assertEquals(true, bean.getaBoolean().booleanValue());
+  }
+
+  @Test
+  public void testGeneratedByteBufferSetters() {
+    BeanWithByteArray bean = new BeanWithByteArray();
+    List<FieldValueSetter> setters =
+        JavaBeanUtils.getSetters(BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA);
+    setters.get(0).set(bean, "field1".getBytes(Charset.defaultCharset()));
+    setters.get(1).set(bean, "field2".getBytes(Charset.defaultCharset()));
+
+    assertArrayEquals("not equal", "field1".getBytes(Charset.defaultCharset()), bean.getBytes1());
+    assertEquals(ByteBuffer.wrap("field2".getBytes(Charset.defaultCharset())), bean.getBytes2());
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
new file mode 100644
index 00000000000..fb80d9c61e9
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_ARRAY_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_COLLECTION_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_MAP_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.NESTED_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_BOXED_FIELDS_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_BYTE_ARRAY_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.PRIMITIVE_ARRAY_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.PRIMITIVE_MAP_POJO_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_SCHEMA;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedMapPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithBoxedFields;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.POJOWithByteArray;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.PrimitiveArrayPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.PrimitiveMapPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO;
+import org.apache.beam.sdk.values.reflect.FieldValueGetter;
+import org.apache.beam.sdk.values.reflect.FieldValueSetter;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for the {@link POJOUtils} class. */
+public class POJOUtilsTest {
+  static final DateTime DATE = DateTime.parse("1979-03-14");
+  static final Instant INSTANT = DateTime.parse("1979-03-15").toInstant();
+  static final byte[] BYTE_ARRAY = "byteArray".getBytes(Charset.defaultCharset());
+  static final ByteBuffer BYTE_BUFFER =
+      ByteBuffer.wrap("byteBuffer".getBytes(Charset.defaultCharset()));
+
+  @Test
+  public void testSimplePOJO() {
+    Schema schema = POJOUtils.schemaFromPojoClass(SimplePOJO.class);
+    assertEquals(SIMPLE_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedPOJO() {
+    Schema schema = POJOUtils.schemaFromPojoClass(NestedPOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testPrimitiveArray() {
+    Schema schema = POJOUtils.schemaFromPojoClass(PrimitiveArrayPOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedArray() {
+    Schema schema = POJOUtils.schemaFromPojoClass(NestedArrayPOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedCollection() {
+    Schema schema = POJOUtils.schemaFromPojoClass(NestedCollectionPOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testPrimitiveMap() {
+    Schema schema = POJOUtils.schemaFromPojoClass(PrimitiveMapPOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testNestedMap() {
+    Schema schema = POJOUtils.schemaFromPojoClass(NestedMapPOJO.class);
+    SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_POJO_SCHEMA, schema);
+  }
+
+  @Test
+  public void testGeneratedSimpleGetters() {
+    SimplePOJO simplePojo =
+        new SimplePOJO(
+            "field1",
+            (byte) 41,
+            (short) 42,
+            43,
+            44L,
+            true,
+            DATE,
+            INSTANT,
+            BYTE_ARRAY,
+            BYTE_BUFFER,
+            new BigDecimal(42),
+            new StringBuilder("stringBuilder"));
+
+    List<FieldValueGetter> getters = POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA);
+    assertEquals(12, getters.size());
+    assertEquals("str", getters.get(0).name());
+    assertEquals("field1", getters.get(0).get(simplePojo));
+    assertEquals((byte) 41, getters.get(1).get(simplePojo));
+    assertEquals((short) 42, getters.get(2).get(simplePojo));
+    assertEquals((int) 43, getters.get(3).get(simplePojo));
+    assertEquals((long) 44, getters.get(4).get(simplePojo));
+    assertEquals(true, getters.get(5).get(simplePojo));
+    assertEquals(DATE, getters.get(6).get(simplePojo));
+    assertEquals(INSTANT, ((DateTime) getters.get(7).get(simplePojo)).toInstant());
+    assertArrayEquals("Unexpected bytes", BYTE_ARRAY, (byte[]) getters.get(8).get(simplePojo));
+    assertArrayEquals(
+        "Unexpected bytes", BYTE_BUFFER.array(), (byte[]) getters.get(9).get(simplePojo));
+    assertEquals(new BigDecimal(42), getters.get(10).get(simplePojo));
+    assertEquals("stringBuilder", getters.get(11).get(simplePojo));
+  }
+
+  @Test
+  public void testGeneratedSimpleSetters() {
+    SimplePOJO simplePojo = new SimplePOJO();
+    List<FieldValueSetter> setters = POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA);
+    assertEquals(12, setters.size());
+
+    setters.get(0).set(simplePojo, "field1");
+    setters.get(1).set(simplePojo, (byte) 41);
+    setters.get(2).set(simplePojo, (short) 42);
+    setters.get(3).set(simplePojo, (int) 43);
+    setters.get(4).set(simplePojo, (long) 44);
+    setters.get(5).set(simplePojo, true);
+    setters.get(6).set(simplePojo, DATE);
+    setters.get(7).set(simplePojo, INSTANT);
+    setters.get(8).set(simplePojo, BYTE_ARRAY);
+    setters.get(9).set(simplePojo, BYTE_BUFFER.array());
+    setters.get(10).set(simplePojo, new BigDecimal(42));
+    setters.get(11).set(simplePojo, "stringBuilder");
+
+    assertEquals("field1", simplePojo.str);
+    assertEquals((byte) 41, simplePojo.aByte);
+    assertEquals((short) 42, simplePojo.aShort);
+    assertEquals((int) 43, simplePojo.anInt);
+    assertEquals((long) 44, simplePojo.aLong);
+    assertEquals(true, simplePojo.aBoolean);
+    assertEquals(DATE, simplePojo.dateTime);
+    assertEquals(INSTANT, simplePojo.instant);
+    assertArrayEquals("Unexpected bytes", BYTE_ARRAY, simplePojo.bytes);
+    assertEquals(BYTE_BUFFER, simplePojo.byteBuffer);
+    assertEquals(new BigDecimal(42), simplePojo.bigDecimal);
+    assertEquals("stringBuilder", simplePojo.stringBuilder.toString());
+  }
+
+  @Test
+  public void testGeneratedSimpleBoxedGetters() {
+    POJOWithBoxedFields pojo = new POJOWithBoxedFields((byte) 41, (short) 42, 43, 44L, true);
+
+    List<FieldValueGetter> getters =
+        POJOUtils.getGetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA);
+    assertEquals((byte) 41, getters.get(0).get(pojo));
+    assertEquals((short) 42, getters.get(1).get(pojo));
+    assertEquals((int) 43, getters.get(2).get(pojo));
+    assertEquals((long) 44, getters.get(3).get(pojo));
+    assertEquals(true, getters.get(4).get(pojo));
+  }
+
+  @Test
+  public void testGeneratedSimpleBoxedSetters() {
+    POJOWithBoxedFields pojo = new POJOWithBoxedFields();
+    List<FieldValueSetter> setters =
+        POJOUtils.getSetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA);
+
+    setters.get(0).set(pojo, (byte) 41);
+    setters.get(1).set(pojo, (short) 42);
+    setters.get(2).set(pojo, (int) 43);
+    setters.get(3).set(pojo, (long) 44);
+    setters.get(4).set(pojo, true);
+
+    assertEquals((byte) 41, pojo.aByte.byteValue());
+    assertEquals((short) 42, pojo.aShort.shortValue());
+    assertEquals((int) 43, pojo.anInt.intValue());
+    assertEquals((long) 44, pojo.aLong.longValue());
+    assertEquals(true, pojo.aBoolean.booleanValue());
+  }
+
+  @Test
+  public void testGeneratedByteBufferSetters() {
+    POJOWithByteArray pojo = new POJOWithByteArray();
+    List<FieldValueSetter> setters =
+        POJOUtils.getSetters(POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA);
+    setters.get(0).set(pojo, BYTE_ARRAY);
+    setters.get(1).set(pojo, BYTE_BUFFER.array());
+
+    assertArrayEquals("not equal", BYTE_ARRAY, pojo.bytes1);
+    assertEquals(BYTE_BUFFER, pojo.bytes2);
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java
new file mode 100644
index 00000000000..e0a20dc5095
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+
+/** Utilities for testing schemas. */
+public class SchemaTestUtils {
+  // Assert that two schemas are equivalent, ignoring field order. This tests that both schemas
+  // (recursively) contain the same fields with the same names, but possibly different orders.
+  public static void assertSchemaEquivalent(Schema expected, Schema actual) {
+    List<Field> expectedFields =
+        expected
+            .getFields()
+            .stream()
+            .sorted(Comparator.comparing(Field::getName))
+            .collect(Collectors.toList());
+    List<Field> actualFields =
+        actual
+            .getFields()
+            .stream()
+            .sorted(Comparator.comparing(Field::getName))
+            .collect(Collectors.toList());
+    assertEquals(expectedFields.size(), actualFields.size());
+
+    for (int i = 0; i < expectedFields.size(); ++i) {
+      Field expectedField = expectedFields.get(i);
+      Field actualField = actualFields.get(i);
+      assertFieldEquivalent(expectedField, actualField);
+    }
+  }
+
+  public static void assertFieldEquivalent(Field expectedField, Field actualField) {
+    assertEquals(expectedField.getName(), actualField.getName());
+    assertEquals(expectedField.getNullable(), actualField.getNullable());
+    assertFieldTypeEquivalent(expectedField.getType(), actualField.getType());
+  }
+
+  public static void assertFieldTypeEquivalent(
+      FieldType expectedFieldType, FieldType actualFieldType) {
+    assertEquals(expectedFieldType.getTypeName(), actualFieldType.getTypeName());
+    if (TypeName.ROW.equals(expectedFieldType.getTypeName())) {
+      assertSchemaEquivalent(expectedFieldType.getRowSchema(), actualFieldType.getRowSchema());
+    } else if (TypeName.ARRAY.equals(expectedFieldType.getTypeName())) {
+      assertFieldTypeEquivalent(
+          expectedFieldType.getCollectionElementType(), actualFieldType.getCollectionElementType());
+    } else if (TypeName.MAP.equals(expectedFieldType.getTypeName())) {
+      assertFieldTypeEquivalent(expectedFieldType.getMapKeyType(), actualFieldType.getMapKeyType());
+      assertFieldTypeEquivalent(
+          expectedFieldType.getMapValueType(), actualFieldType.getMapValueType());
+    } else {
+      assertEquals(expectedFieldType, actualFieldType);
+    }
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
new file mode 100644
index 00000000000..92c1cb3d1fc
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.DefaultSchema;
+import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+/** Various Java Beans and associated schemas used in tests. */
+public class TestJavaBeans {
+  /** A simple Bean containing basic types. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class SimpleBean {
+    private String str;
+    private byte aByte;
+    private short aShort;
+    private int anInt;
+    private long aLong;
+    private boolean aBoolean;
+    private DateTime dateTime;
+    private Instant instant;
+    private byte[] bytes;
+    private ByteBuffer byteBuffer;
+    private BigDecimal bigDecimal;
+    private StringBuilder stringBuilder;
+
+    public SimpleBean() {}
+
+    public SimpleBean(
+        String str,
+        byte aByte,
+        short aShort,
+        int anInt,
+        long aLong,
+        boolean aBoolean,
+        DateTime dateTime,
+        Instant instant,
+        byte[] bytes,
+        BigDecimal bigDecimal,
+        StringBuilder stringBuilder) {
+      this.str = str;
+      this.aByte = aByte;
+      this.aShort = aShort;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aBoolean = aBoolean;
+      this.dateTime = dateTime;
+      this.instant = instant;
+      this.bytes = bytes;
+      this.byteBuffer = ByteBuffer.wrap(bytes);
+      this.bigDecimal = bigDecimal;
+      this.stringBuilder = stringBuilder;
+    }
+
+    public String getStr() {
+      return str;
+    }
+
+    public void setStr(String str) {
+      this.str = str;
+    }
+
+    public byte getaByte() {
+      return aByte;
+    }
+
+    public void setaByte(byte aByte) {
+      this.aByte = aByte;
+    }
+
+    public short getaShort() {
+      return aShort;
+    }
+
+    public void setaShort(short aShort) {
+      this.aShort = aShort;
+    }
+
+    public int getAnInt() {
+      return anInt;
+    }
+
+    public void setAnInt(int anInt) {
+      this.anInt = anInt;
+    }
+
+    public long getaLong() {
+      return aLong;
+    }
+
+    public void setaLong(long aLong) {
+      this.aLong = aLong;
+    }
+
+    public boolean isaBoolean() {
+      return aBoolean;
+    }
+
+    public void setaBoolean(boolean aBoolean) {
+      this.aBoolean = aBoolean;
+    }
+
+    public DateTime getDateTime() {
+      return dateTime;
+    }
+
+    public void setDateTime(DateTime dateTime) {
+      this.dateTime = dateTime;
+    }
+
+    public byte[] getBytes() {
+      return bytes;
+    }
+
+    public void setBytes(byte[] bytes) {
+      this.bytes = bytes;
+    }
+
+    public ByteBuffer getByteBuffer() {
+      return byteBuffer;
+    }
+
+    public void setByteBuffer(ByteBuffer byteBuffer) {
+      this.byteBuffer = byteBuffer;
+    }
+
+    public Instant getInstant() {
+      return instant;
+    }
+
+    public void setInstant(Instant instant) {
+      this.instant = instant;
+    }
+
+    public BigDecimal getBigDecimal() {
+      return bigDecimal;
+    }
+
+    public void setBigDecimal(BigDecimal bigDecimal) {
+      this.bigDecimal = bigDecimal;
+    }
+
+    public StringBuilder getStringBuilder() {
+      return stringBuilder;
+    }
+
+    public void setStringBuilder(StringBuilder stringBuilder) {
+      this.stringBuilder = stringBuilder;
+    }
+  }
+
+  /** The schema for {@link SimpleBean}. * */
+  public static final Schema SIMPLE_BEAN_SCHEMA =
+      Schema.builder()
+          .addStringField("str")
+          .addByteField("aByte")
+          .addInt16Field("aShort")
+          .addInt32Field("anInt")
+          .addInt64Field("aLong")
+          .addBooleanField("aBoolean")
+          .addDateTimeField("dateTime")
+          .addDateTimeField("instant")
+          .addByteArrayField("bytes")
+          .addByteArrayField("byteBuffer")
+          .addDecimalField("bigDecimal")
+          .addStringField("stringBuilder")
+          .build();
+
+  /** A Bean containing a nested class. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class NestedBean {
+    private SimpleBean nested;
+
+    public NestedBean(SimpleBean nested) {
+      this.nested = nested;
+    }
+
+    public NestedBean() {}
+
+    public SimpleBean getNested() {
+      return nested;
+    }
+
+    public void setNested(SimpleBean nested) {
+      this.nested = nested;
+    }
+  }
+
+  /** The schema for {@link NestedBean}. * */
+  public static final Schema NESTED_BEAN_SCHEMA =
+      Schema.builder().addRowField("nested", SIMPLE_BEAN_SCHEMA).build();
+
+  /** A Bean containing arrays of primitive types. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class PrimitiveArrayBean {
+    // Test every type of array parameter supported.
+    private List<String> strings;
+    private int[] integers;
+    private Long[] longs;
+
+    public PrimitiveArrayBean() {}
+
+    public PrimitiveArrayBean(List<String> strings, int[] integers, Long[] longs) {
+      this.strings = strings;
+      this.integers = integers;
+      this.longs = longs;
+    }
+
+    public List<String> getStrings() {
+      return strings;
+    }
+
+    public void setStrings(List<String> strings) {
+      this.strings = strings;
+    }
+
+    public int[] getIntegers() {
+      return integers;
+    }
+
+    public void setIntegers(int[] integers) {
+      this.integers = integers;
+    }
+
+    public Long[] getLongs() {
+      return longs;
+    }
+
+    public void setLongs(Long[] longs) {
+      this.longs = longs;
+    }
+  }
+
+  /** The schema for {@link PrimitiveArrayBean}. * */
+  public static final Schema PRIMITIVE_ARRAY_BEAN_SCHEMA =
+      Schema.builder()
+          .addArrayField("strings", FieldType.STRING)
+          .addArrayField("integers", FieldType.INT32)
+          .addArrayField("longs", FieldType.INT64)
+          .build();
+
+  /** A Bean containing arrays of complex classes. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class NestedArrayBean {
+    private SimpleBean[] beans;
+
+    public NestedArrayBean(SimpleBean... beans) {
+      this.beans = beans;
+    }
+
+    public NestedArrayBean() {}
+
+    public SimpleBean[] getBeans() {
+      return beans;
+    }
+
+    public void setBeans(SimpleBean[] beans) {
+      this.beans = beans;
+    }
+  }
+
+  /** The schema for {@link NestedArrayBean}. * */
+  public static final Schema NESTED_ARRAY_BEAN_SCHEMA =
+      Schema.builder().addArrayField("beans", FieldType.row(SIMPLE_BEAN_SCHEMA)).build();
+
+  /** A bean containing arrays of arrays. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class NestedArraysBean {
+    private List<List<String>> lists;
+
+    public NestedArraysBean(List<List<String>> lists) {
+      this.lists = lists;
+    }
+
+    public NestedArraysBean() {}
+
+    public List<List<String>> getLists() {
+      return lists;
+    }
+
+    public void setLists(List<List<String>> lists) {
+      this.lists = lists;
+    }
+  }
+
+  /** The schema for {@link NestedArrayBean}. * */
+  public static final Schema NESTED_ARRAYS_BEAM_SCHEMA =
+      Schema.builder().addArrayField("lists", FieldType.array(FieldType.STRING)).build();
+
+  /** A Bean containing a {@link List} of a complex type. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class NestedCollectionBean {
+    private List<SimpleBean> simples;
+
+    public NestedCollectionBean(List<SimpleBean> simples) {
+      this.simples = simples;
+    }
+
+    public NestedCollectionBean() {}
+
+    public List<SimpleBean> getSimples() {
+      return simples;
+    }
+
+    public void setSimples(List<SimpleBean> simples) {
+      this.simples = simples;
+    }
+  }
+
+  /** The schema for {@link NestedCollectionBean}. * */
+  public static final Schema NESTED_COLLECTION_BEAN_SCHEMA =
+      Schema.builder().addArrayField("simples", FieldType.row(SIMPLE_BEAN_SCHEMA)).build();
+
+  /** A Bean containing a simple {@link Map}. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class PrimitiveMapBean {
+    private Map<String, Integer> map;
+
+    public PrimitiveMapBean(Map<String, Integer> map) {
+      this.map = map;
+    }
+
+    public PrimitiveMapBean() {}
+
+    public Map<String, Integer> getMap() {
+      return map;
+    }
+
+    public void setMap(Map<String, Integer> map) {
+      this.map = map;
+    }
+  }
+
+  /** The schema for {@link PrimitiveMapBean}. * */
+  public static final Schema PRIMITIVE_MAP_BEAN_SCHEMA =
+      Schema.builder().addMapField("map", FieldType.STRING, FieldType.INT32).build();
+
+  /** A Bean containing a map of a complex type. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class NestedMapBean {
+    private Map<String, SimpleBean> map;
+
+    public NestedMapBean(Map<String, SimpleBean> map) {
+      this.map = map;
+    }
+
+    public NestedMapBean() {}
+
+    public Map<String, SimpleBean> getMap() {
+      return map;
+    }
+
+    public void setMap(Map<String, SimpleBean> map) {
+      this.map = map;
+    }
+  }
+
+  /** The schema for {@link NestedMapBean}. * */
+  public static final Schema NESTED_MAP_BEAN_SCHEMA =
+      Schema.builder()
+          .addMapField("map", FieldType.STRING, FieldType.row(SIMPLE_BEAN_SCHEMA))
+          .build();
+
+  /** A Bean containing the boxed version of primitive types. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class BeanWithBoxedFields {
+    private Byte aByte;
+    private Short aShort;
+    private Integer anInt;
+    private Long aLong;
+    private Boolean aBoolean;
+
+    public BeanWithBoxedFields(
+        Byte aByte, Short aShort, Integer anInt, Long aLong, Boolean aBoolean) {
+      this.aByte = aByte;
+      this.aShort = aShort;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aBoolean = aBoolean;
+    }
+
+    public BeanWithBoxedFields() {}
+
+    public Byte getaByte() {
+      return aByte;
+    }
+
+    public void setaByte(Byte aByte) {
+      this.aByte = aByte;
+    }
+
+    public Short getaShort() {
+      return aShort;
+    }
+
+    public void setaShort(Short aShort) {
+      this.aShort = aShort;
+    }
+
+    public Integer getAnInt() {
+      return anInt;
+    }
+
+    public void setAnInt(Integer anInt) {
+      this.anInt = anInt;
+    }
+
+    public Long getaLong() {
+      return aLong;
+    }
+
+    public void setaLong(Long aLong) {
+      this.aLong = aLong;
+    }
+
+    public Boolean getaBoolean() {
+      return aBoolean;
+    }
+
+    public void setaBoolean(Boolean aBoolean) {
+      this.aBoolean = aBoolean;
+    }
+  }
+
+  /** The schema for {@link BeanWithBoxedFields}. * */
+  public static final Schema BEAN_WITH_BOXED_FIELDS_SCHEMA =
+      Schema.builder()
+          .addByteField("aByte")
+          .addInt16Field("aShort")
+          .addInt32Field("anInt")
+          .addInt64Field("aLong")
+          .addBooleanField("aBoolean")
+          .build();
+
+  /** A Bean containing byte arrays. * */
+  @DefaultSchema(JavaBeanSchema.class)
+  public static class BeanWithByteArray {
+    private byte[] bytes1;
+    private ByteBuffer bytes2;
+
+    public BeanWithByteArray(byte[] bytes1, ByteBuffer bytes2) {
+      this.bytes1 = bytes1;
+      this.bytes2 = bytes2;
+    }
+
+    public BeanWithByteArray() {}
+
+    public byte[] getBytes1() {
+      return bytes1;
+    }
+
+    public void setBytes1(byte[] bytes1) {
+      this.bytes1 = bytes1;
+    }
+
+    public ByteBuffer getBytes2() {
+      return bytes2;
+    }
+
+    public void setBytes2(ByteBuffer bytes2) {
+      this.bytes2 = bytes2;
+    }
+  }
+
+  /** The schema for {@link BeanWithByteArray}. * */
+  public static final Schema BEAN_WITH_BYTE_ARRAY_SCHEMA =
+      Schema.builder().addByteArrayField("bytes1").addByteArrayField("bytes2").build();
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
new file mode 100644
index 00000000000..5d60884a2ab
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.DefaultSchema;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+/** Various Java POJOs and associated schemas used in tests. */
+public class TestPOJOs {
+  /** A simple POJO containing basic types. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class SimplePOJO {
+    public String str;
+    public byte aByte;
+    public short aShort;
+    public int anInt;
+    public long aLong;
+    public boolean aBoolean;
+    public DateTime dateTime;
+    public Instant instant;
+    public byte[] bytes;
+    public ByteBuffer byteBuffer;
+    public BigDecimal bigDecimal;
+    public StringBuilder stringBuilder;
+
+    public SimplePOJO() {}
+
+    public SimplePOJO(
+        String str,
+        byte aByte,
+        short aShort,
+        int anInt,
+        long aLong,
+        boolean aBoolean,
+        DateTime dateTime,
+        Instant instant,
+        byte[] bytes,
+        ByteBuffer byteBuffer,
+        BigDecimal bigDecimal,
+        StringBuilder stringBuilder) {
+      this.str = str;
+      this.aByte = aByte;
+      this.aShort = aShort;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aBoolean = aBoolean;
+      this.dateTime = dateTime;
+      this.instant = instant;
+      this.bytes = bytes;
+      this.byteBuffer = byteBuffer;
+      this.bigDecimal = bigDecimal;
+      this.stringBuilder = stringBuilder;
+    }
+  }
+
+  /** The schema for {@link SimplePOJO}. * */
+  public static final Schema SIMPLE_POJO_SCHEMA =
+      Schema.builder()
+          .addStringField("str")
+          .addByteField("aByte")
+          .addInt16Field("aShort")
+          .addInt32Field("anInt")
+          .addInt64Field("aLong")
+          .addBooleanField("aBoolean")
+          .addDateTimeField("dateTime")
+          .addDateTimeField("instant")
+          .addByteArrayField("bytes")
+          .addByteArrayField("byteBuffer")
+          .addDecimalField("bigDecimal")
+          .addStringField("stringBuilder")
+          .build();
+
+  /** A POJO containing a nested class. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class NestedPOJO {
+    public SimplePOJO nested;
+
+    public NestedPOJO(SimplePOJO nested) {
+      this.nested = nested;
+    }
+
+    public NestedPOJO() {}
+  }
+
+  /** The schema for {@link NestedPOJO}. * */
+  public static final Schema NESTED_POJO_SCHEMA =
+      Schema.builder().addRowField("nested", SIMPLE_POJO_SCHEMA).build();
+
+  /** A POJO containing arrays of primitive types. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class PrimitiveArrayPOJO {
+    // Test every type of array parameter supported.
+    public List<String> strings;
+    public int[] integers;
+    public Long[] longs;
+
+    public PrimitiveArrayPOJO() {}
+
+    public PrimitiveArrayPOJO(List<String> strings, int[] integers, Long[] longs) {
+      this.strings = strings;
+      this.integers = integers;
+      this.longs = longs;
+    }
+  }
+
+  /** The schema for {@link PrimitiveArrayPOJO}. * */
+  public static final Schema PRIMITIVE_ARRAY_POJO_SCHEMA =
+      Schema.builder()
+          .addArrayField("strings", FieldType.STRING)
+          .addArrayField("integers", FieldType.INT32)
+          .addArrayField("longs", FieldType.INT64)
+          .build();
+
+  /** A POJO containing arrays of complex classes. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class NestedArrayPOJO {
+    public SimplePOJO[] pojos;
+
+    public NestedArrayPOJO(SimplePOJO... pojos) {
+      this.pojos = pojos;
+    }
+
+    public NestedArrayPOJO() {}
+  }
+
+  /** The schema for {@link NestedArrayPOJO}. * */
+  public static final Schema NESTED_ARRAY_POJO_SCHEMA =
+      Schema.builder().addArrayField("pojos", FieldType.row(SIMPLE_POJO_SCHEMA)).build();
+
+  /** A bean containing arrays of arrays. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class NestedArraysPOJO {
+    public List<List<String>> lists;
+
+    public NestedArraysPOJO(List<List<String>> lists) {
+      this.lists = lists;
+    }
+
+    public NestedArraysPOJO() {}
+  }
+
+  /** The schema for {@link NestedArraysPOJO}. * */
+  public static final Schema NESTED_ARRAYS_POJO_SCHEMA =
+      Schema.builder().addArrayField("lists", FieldType.array(FieldType.STRING)).build();
+
+  /** A POJO containing a {@link List} of a complex type. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class NestedCollectionPOJO {
+    public List<SimplePOJO> simples;
+
+    public NestedCollectionPOJO(List<SimplePOJO> simples) {
+      this.simples = simples;
+    }
+
+    public NestedCollectionPOJO() {}
+  }
+
+  /** The schema for {@link NestedCollectionPOJO}. * */
+  public static final Schema NESTED_COLLECTION_POJO_SCHEMA =
+      Schema.builder().addArrayField("simples", FieldType.row(SIMPLE_POJO_SCHEMA)).build();
+
+  /** A POJO containing a simple {@link Map}. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class PrimitiveMapPOJO {
+    public Map<String, Integer> map;
+
+    public PrimitiveMapPOJO(Map<String, Integer> map) {
+      this.map = map;
+    }
+
+    public PrimitiveMapPOJO() {}
+  }
+
+  /** The schema for {@link PrimitiveMapPOJO}. * */
+  public static final Schema PRIMITIVE_MAP_POJO_SCHEMA =
+      Schema.builder().addMapField("map", FieldType.STRING, FieldType.INT32).build();
+
+  /** A POJO containing a map of a complex type. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class NestedMapPOJO {
+    public Map<String, SimplePOJO> map;
+
+    public NestedMapPOJO(Map<String, SimplePOJO> map) {
+      this.map = map;
+    }
+
+    public NestedMapPOJO() {}
+  }
+
+  /** The schema for {@link NestedMapPOJO}. * */
+  public static final Schema NESTED_MAP_POJO_SCHEMA =
+      Schema.builder()
+          .addMapField("map", FieldType.STRING, FieldType.row(SIMPLE_POJO_SCHEMA))
+          .build();
+
+  /** A POJO containing the boxed version of primitive types. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJOWithBoxedFields {
+    public Byte aByte;
+    public Short aShort;
+    public Integer anInt;
+    public Long aLong;
+    public Boolean aBoolean;
+
+    public POJOWithBoxedFields(
+        Byte aByte, Short aShort, Integer anInt, Long aLong, Boolean aBoolean) {
+      this.aByte = aByte;
+      this.aShort = aShort;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aBoolean = aBoolean;
+    }
+
+    public POJOWithBoxedFields() {}
+  }
+
+  /** The schema for {@link POJOWithBoxedFields}. * */
+  public static final Schema POJO_WITH_BOXED_FIELDS_SCHEMA =
+      Schema.builder()
+          .addByteField("aByte")
+          .addInt16Field("aShort")
+          .addInt32Field("anInt")
+          .addInt64Field("aLong")
+          .addBooleanField("aBoolean")
+          .build();
+
+  /** A POJO containing byte arrays. * */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class POJOWithByteArray {
+    public byte[] bytes1;
+    public ByteBuffer bytes2;
+
+    public POJOWithByteArray(byte[] bytes1, ByteBuffer bytes2) {
+      this.bytes1 = bytes1;
+      this.bytes2 = bytes2;
+    }
+
+    public POJOWithByteArray() {}
+  }
+
+  /** The schema for {@link POJOWithByteArray}. * */
+  public static final Schema POJO_WITH_BYTE_ARRAY_SCHEMA =
+      Schema.builder().addByteArrayField("bytes1").addByteArrayField("bytes2").build();
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
index a39f1f139e6..76c3de0ae33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
@@ -20,7 +20,9 @@
 import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.schemas.DefaultSchema;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -350,4 +352,40 @@ public void testUnmatchedSchema() {
                   public void process(@FieldAccess("a") Row row) {}
                 }));
   }
+
+  /** Test POJO. */
+  @DefaultSchema(JavaFieldSchema.class)
+  public static class InferredPojo {
+    public String stringField;
+    public Integer integerField;
+
+    public InferredPojo(String stringField, Integer integerField) {
+      this.stringField = stringField;
+      this.integerField = integerField;
+    }
+
+    public InferredPojo() {}
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesSchema.class})
+  public void testInferredSchemaPipeline() {
+    List<InferredPojo> pojoList =
+        Lists.newArrayList(
+            new InferredPojo("a", 1), new InferredPojo("b", 2), new InferredPojo("c", 3));
+
+    PCollection<String> output =
+        pipeline
+            .apply(Create.of(pojoList))
+            .apply(
+                ParDo.of(
+                    new DoFn<InferredPojo, String>() {
+                      @ProcessElement
+                      public void process(@Element Row row, OutputReceiver<String> r) {
+                        r.output(row.getString(0) + ":" + row.getInt32(1));
+                      }
+                    }));
+    PAssert.that(output).containsInAnyOrder("a:1", "b:2", "c:3");
+    pipeline.run();
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java
index 4210d59d9ba..52455ca4b6d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java
@@ -73,8 +73,8 @@ public void testThrowsForUnsupportedTypes() throws Exception {
         Arrays.<FieldValueGetter>asList(getter("unsupportedGetter", UnsupportedClass.class)));
   }
 
-  private static FieldValueGetter<Object> getter(final String fieldName, final Class fieldType) {
-    return new FieldValueGetter<Object>() {
+  private static FieldValueGetter getter(final String fieldName, final Class fieldType) {
+    return new FieldValueGetter() {
       @Override
       public Object get(Object object) {
         return null;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 9321d02f13a..19044459f2a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -50,6 +50,7 @@
           .put(FieldType.DOUBLE, SqlTypeName.DOUBLE)
           .put(FieldType.DECIMAL, SqlTypeName.DECIMAL)
           .put(FieldType.BOOLEAN, SqlTypeName.BOOLEAN)
+          .put(FieldType.BYTES, SqlTypeName.VARBINARY)
           .put(FieldType.DATETIME.withMetadata("DATE"), SqlTypeName.DATE)
           .put(FieldType.DATETIME.withMetadata("TIME"), SqlTypeName.TIME)
           .put(
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java
index 8cc54b33331..70a710601d3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java
@@ -87,10 +87,11 @@ public void testThrowsForUnsupportedTypes() throws Exception {
         Arrays.<FieldValueGetter>asList(getter("arrayListGetter", ArrayList.class)));
   }
 
-  private static FieldValueGetter<Object> getter(final String fieldName, final Class fieldType) {
-    return new FieldValueGetter<Object>() {
+  private static <T> FieldValueGetter<Object, T> getter(
+      final String fieldName, final Class<T> fieldType) {
+    return new FieldValueGetter<Object, T>() {
       @Override
-      public Object get(Object object) {
+      public T get(Object object) {
         return null;
       }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
index 3a95e075b3e..e2de6af0be7 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpressionTest.java
@@ -49,7 +49,7 @@ public void testRefInRange() {
         ref3.evaluate(row, null, BeamSqlExpressionEnvironments.empty()).getValue());
   }
 
-  @Test(expected = IndexOutOfBoundsException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testRefOutOfRange() {
     BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 5);
     ref.evaluate(row, null, BeamSqlExpressionEnvironments.empty()).getValue();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 120951)
    Time Spent: 9.5h  (was: 9h 20m)

> Provide automatic schema registration for POJOs
> -----------------------------------------------
>
>                 Key: BEAM-4453
>                 URL: https://issues.apache.org/jira/browse/BEAM-4453
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message