beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [47/67] [partial] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:48:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
deleted file mode 100644
index 2b0190b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-
-/**
- * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms
- * to encode/decode the objects. Users must provide the {@code Class} of the JAXB annotated object.
- *
- * @param <T> type of JAXB annotated objects that will be serialized.
- */
-public class JAXBCoder<T> extends AtomicCoder<T> {
-
-  private final Class<T> jaxbClass;
-  private transient Marshaller jaxbMarshaller = null;
-  private transient Unmarshaller jaxbUnmarshaller = null;
-
-  public Class<T> getJAXBClass() {
-    return jaxbClass;
-  }
-
-  private JAXBCoder(Class<T> jaxbClass) {
-    this.jaxbClass = jaxbClass;
-  }
-
-  /**
-   * Create a coder for a given type of JAXB annotated objects.
-   *
-   * @param jaxbClass the {@code Class} of the JAXB annotated objects.
-   */
-  public static <T> JAXBCoder<T> of(Class<T> jaxbClass) {
-    return new JAXBCoder<>(jaxbClass);
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    try {
-      if (jaxbMarshaller == null) {
-        JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass);
-        jaxbMarshaller = jaxbContext.createMarshaller();
-      }
-
-      jaxbMarshaller.marshal(value, new FilterOutputStream(outStream) {
-        // JAXB closes the underlying stream so we must filter out those calls.
-        @Override
-        public void close() throws IOException {
-        }
-      });
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
-    try {
-      if (jaxbUnmarshaller == null) {
-        JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass);
-        jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-      }
-
-      @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.unmarshal(new FilterInputStream(inStream) {
-        // JAXB closes the underlying stream so we must filter out those calls.
-        @Override
-        public void close() throws IOException {
-        }
-      });
-      return obj;
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-  }
-
-  @Override
-  public String getEncodingId() {
-    return getJAXBClass().getName();
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // JSON Serialization details below
-
-  private static final String JAXB_CLASS = "jaxb_class";
-
-  /**
-   * Constructor for JSON deserialization only.
-   */
-  @JsonCreator
-  public static <T> JAXBCoder<T> of(
-      @JsonProperty(JAXB_CLASS) String jaxbClassName) {
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName);
-      return of(jaxbClass);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
deleted file mode 100644
index 33085cf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoder.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addBoolean;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * A {@code KvCoder} encodes {@link KV}s.
- *
- * @param <K> the type of the keys of the KVs being transcoded
- * @param <V> the type of the values of the KVs being transcoded
- */
-public class KvCoder<K, V> extends KvCoderBase<KV<K, V>> {
-  public static <K, V> KvCoder<K, V> of(Coder<K> keyCoder,
-                                        Coder<V> valueCoder) {
-    return new KvCoder<>(keyCoder, valueCoder);
-  }
-
-  @JsonCreator
-  public static KvCoder<?, ?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 2,
-        "Expecting 2 components, got " + components.size());
-    return of(components.get(0), components.get(1));
-  }
-
-  public static <K, V> List<Object> getInstanceComponents(
-      KV<K, V> exampleValue) {
-    return Arrays.asList(
-        exampleValue.getKey(),
-        exampleValue.getValue());
-  }
-
-  public Coder<K> getKeyCoder() {
-    return keyCoder;
-  }
-
-  public Coder<V> getValueCoder() {
-    return valueCoder;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Coder<K> keyCoder;
-  private final Coder<V> valueCoder;
-
-  private KvCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
-    this.keyCoder = keyCoder;
-    this.valueCoder = valueCoder;
-  }
-
-  @Override
-  public void encode(KV<K, V> kv, OutputStream outStream, Context context)
-      throws IOException, CoderException  {
-    if (kv == null) {
-      throw new CoderException("cannot encode a null KV");
-    }
-    Context nestedContext = context.nested();
-    keyCoder.encode(kv.getKey(), outStream, nestedContext);
-    valueCoder.encode(kv.getValue(), outStream, nestedContext);
-  }
-
-  @Override
-  public KV<K, V> decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    Context nestedContext = context.nested();
-    K key = keyCoder.decode(inStream, nestedContext);
-    V value = valueCoder.decode(inStream, nestedContext);
-    return KV.of(key, value);
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(keyCoder, valueCoder);
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Key coder must be deterministic", getKeyCoder());
-    verifyDeterministic("Value coder must be deterministic", getValueCoder());
-  }
-
-  @Override
-  public boolean consistentWithEquals() {
-    return keyCoder.consistentWithEquals() && valueCoder.consistentWithEquals();
-  }
-
-  @Override
-  public Object structuralValue(KV<K, V> kv) throws Exception {
-    if (consistentWithEquals()) {
-      return kv;
-    } else {
-      return KV.of(getKeyCoder().structuralValue(kv.getKey()),
-                   getValueCoder().structuralValue(kv.getValue()));
-    }
-  }
-
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
-    return result;
-  }
-
-  /**
-   * Returns whether both keyCoder and valueCoder are considered not expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(KV<K, V> kv, Context context) {
-    return keyCoder.isRegisterByteSizeObserverCheap(kv.getKey(),
-                                                    context.nested())
-        && valueCoder.isRegisterByteSizeObserverCheap(kv.getValue(),
-                                                      context.nested());
-  }
-
-  /**
-   * Notifies ElementByteSizeObserver about the byte size of the
-   * encoded value using this coder.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      KV<K, V> kv, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    if (kv == null) {
-      throw new CoderException("cannot encode a null KV");
-    }
-    keyCoder.registerByteSizeObserver(
-        kv.getKey(), observer, context.nested());
-    valueCoder.registerByteSizeObserver(
-        kv.getValue(), observer, context.nested());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
deleted file mode 100644
index 4a12ee0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/KvCoderBase.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * An abstract base class for KvCoder. Works around a Jackson2 bug tickled when building
- * {@link KvCoder} directly (as of this writing, Jackson2 walks off the end of
- * an array when it tries to deserialize a class with multiple generic type
- * parameters).  This class should be removed when possible.
- *
- * @param <T> the type of values being transcoded
- */
-@Deprecated
-public abstract class KvCoderBase<T> extends StandardCoder<T> {
-  /**
-   * A constructor used only for decoding from JSON.
-   *
-   * @param typeId present in the JSON encoding, but unused
-   * @param isPairLike present in the JSON encoding, but unused
-   */
-  @Deprecated
-  @JsonCreator
-  public static KvCoderBase<?> of(
-      // N.B. typeId is a required parameter here, since a field named "@type"
-      // is presented to the deserializer as an input.
-      //
-      // If this method did not consume the field, Jackson2 would observe an
-      // unconsumed field and a returned value of a derived type.  So Jackson2
-      // would attempt to update the returned value with the unconsumed field
-      // data.  The standard JsonDeserializer does not implement a mechanism for
-      // updating constructed values, so it would throw an exception, causing
-      // deserialization to fail.
-      @JsonProperty(value = "@type", required = false) String typeId,
-      @JsonProperty(value = PropertyNames.IS_PAIR_LIKE, required = false) boolean isPairLike,
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-    return KvCoder.of(components);
-  }
-
-  protected KvCoderBase() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
deleted file mode 100644
index bc74404..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/ListCoder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * A {@link Coder} for {@link List}, using the format of {@link IterableLikeCoder}.
- *
- * @param <T> the type of the elements of the Lists being transcoded
- */
-public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
-
-  public static <T> ListCoder<T> of(Coder<T> elemCoder) {
-    return new ListCoder<>(elemCoder);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  @Override
-  protected final List<T> decodeToIterable(List<T> decodedElements) {
-    return decodedElements;
-  }
-
-  @JsonCreator
-  public static ListCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 component, got " + components.size());
-    return of((Coder<?>) components.get(0));
-  }
-
-  /**
-   * Returns the first element in this list if it is non-empty,
-   * otherwise returns {@code null}.
-   */
-  public static <T> List<Object> getInstanceComponents(List<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
-  protected ListCoder(Coder<T> elemCoder) {
-    super(elemCoder, "List");
-  }
-
-  /**
-   * List sizes are always known, so ListCoder may be deterministic while
-   * the general IterableLikeCoder is not.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "ListCoder.elemCoder must be deterministic", getElemCoder());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
deleted file mode 100644
index b6f3103..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoder.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * A {@link Coder} for {@link Map Maps} that encodes them according to provided
- * coders for keys and values.
- *
- * @param <K> the type of the keys of the Maps being transcoded
- * @param <V> the type of the values of the Maps being transcoded
- */
-public class MapCoder<K, V> extends MapCoderBase<Map<K, V>> {
-  /**
-   * Produces a MapCoder with the given keyCoder and valueCoder.
-   */
-  public static <K, V> MapCoder<K, V> of(
-      Coder<K> keyCoder,
-      Coder<V> valueCoder) {
-    return new MapCoder<>(keyCoder, valueCoder);
-  }
-
-  @JsonCreator
-  public static MapCoder<?, ?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 2,
-        "Expecting 2 components, got " + components.size());
-    return of((Coder<?>) components.get(0), (Coder<?>) components.get(1));
-  }
-
-  /**
-   * Returns the key and value for an arbitrary element of this map,
-   * if it is non-empty, otherwise returns {@code null}.
-   */
-   public static <K, V> List<Object> getInstanceComponents(
-       Map<K, V> exampleValue) {
-     for (Map.Entry<K, V> entry : exampleValue.entrySet()) {
-       return Arrays.asList(entry.getKey(), entry.getValue());
-     }
-     return null;
-   }
-
-  public Coder<K> getKeyCoder() {
-    return keyCoder;
-  }
-
-  public Coder<V> getValueCoder() {
-    return valueCoder;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  Coder<K> keyCoder;
-  Coder<V> valueCoder;
-
-  MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
-    this.keyCoder = keyCoder;
-    this.valueCoder = valueCoder;
-  }
-
-  @Override
-  public void encode(
-      Map<K, V> map,
-      OutputStream outStream,
-      Context context)
-      throws IOException, CoderException  {
-    if (map == null) {
-      throw new CoderException("cannot encode a null Map");
-    }
-    DataOutputStream dataOutStream = new DataOutputStream(outStream);
-    dataOutStream.writeInt(map.size());
-    for (Entry<K, V> entry : map.entrySet()) {
-      keyCoder.encode(entry.getKey(), outStream, context.nested());
-      valueCoder.encode(entry.getValue(), outStream, context.nested());
-    }
-    dataOutStream.flush();
-  }
-
-  @Override
-  public Map<K, V> decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    DataInputStream dataInStream = new DataInputStream(inStream);
-    int size = dataInStream.readInt();
-    Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
-    for (int i = 0; i < size; ++i) {
-      K key = keyCoder.decode(inStream, context.nested());
-      V value = valueCoder.decode(inStream, context.nested());
-      retval.put(key, value);
-    }
-    return retval;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return a {@link List} containing the key coder at index 0 and the value coder at index 1.
-   */
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return Arrays.asList(keyCoder, valueCoder);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. Not all maps have a deterministic encoding.
-   * For example, {@code HashMap} comparison does not depend on element order, so
-   * two {@code HashMap} instances may be equal but produce different encodings.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Ordering of entries in a Map may be non-deterministic.");
-  }
-
-  @Override
-  public void registerByteSizeObserver(
-      Map<K, V> map, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    observer.update(4L);
-    for (Entry<K, V> entry : map.entrySet()) {
-      keyCoder.registerByteSizeObserver(
-          entry.getKey(), observer, context.nested());
-      valueCoder.registerByteSizeObserver(
-          entry.getValue(), observer, context.nested());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
deleted file mode 100644
index d32406c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/MapCoderBase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/**
- * An abstract base class for MapCoder. Works around a Jackson2 bug tickled when building
- * {@link MapCoder} directly (as of this writing, Jackson2 walks off the end of
- * an array when it tries to deserialize a class with multiple generic type
- * parameters).  This should be removed in favor of a better workaround.
- * @param <T> the type of values being transcoded
- */
-@Deprecated
-public abstract class MapCoderBase<T> extends StandardCoder<T> {
-  @Deprecated
-  @JsonCreator
-  public static MapCoderBase<?> of(
-      // N.B. typeId is a required parameter here, since a field named "@type"
-      // is presented to the deserializer as an input.
-      //
-      // If this method did not consume the field, Jackson2 would observe an
-      // unconsumed field and a returned value of a derived type.  So Jackson2
-      // would attempt to update the returned value with the unconsumed field
-      // data.  The standard JsonDeserializer does not implement a mechanism for
-      // updating constructed values, so it would throw an exception, causing
-      // deserialization to fail.
-      @JsonProperty(value = "@type", required = false) String typeId,
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    return MapCoder.of(components);
-  }
-
-  protected MapCoderBase() {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
deleted file mode 100644
index 5598a71..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/NullableCoder.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link NullableCoder} encodes nullable values of type {@code T} using a nested
- * {@code Coder<T>} that does not tolerate {@code null} values. {@link NullableCoder} uses
- * exactly 1 byte per entry to indicate whether the value is {@code null}, then adds the encoding
- * of the inner coder for non-null values.
- *
- * @param <T> the type of the values being transcoded
- */
-public class NullableCoder<T> extends StandardCoder<T> {
-  public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
-    return new NullableCoder<>(valueCoder);
-  }
-
-  @JsonCreator
-  public static NullableCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    Preconditions.checkArgument(components.size() == 1,
-        "Expecting 1 components, got " + components.size());
-    return of(components.get(0));
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private final Coder<T> valueCoder;
-  private static final int ENCODE_NULL = 0;
-  private static final int ENCODE_PRESENT = 1;
-
-  private NullableCoder(Coder<T> valueCoder) {
-    this.valueCoder = valueCoder;
-  }
-
-  @Override
-  public void encode(@Nullable T value, OutputStream outStream, Context context)
-      throws IOException, CoderException  {
-    if (value == null) {
-      outStream.write(ENCODE_NULL);
-    } else {
-      outStream.write(ENCODE_PRESENT);
-      valueCoder.encode(value, outStream, context.nested());
-    }
-  }
-
-  @Override
-  @Nullable
-  public T decode(InputStream inStream, Context context) throws IOException, CoderException {
-    int b = inStream.read();
-    if (b == ENCODE_NULL) {
-      return null;
-    } else if (b != ENCODE_PRESENT) {
-        throw new CoderException(String.format(
-            "NullableCoder expects either a byte valued %s (null) or %s (present), got %s",
-            ENCODE_NULL, ENCODE_PRESENT, b));
-    }
-    return valueCoder.decode(inStream, context.nested());
-  }
-
-  @Override
-  public List<Coder<T>> getCoderArguments() {
-    return ImmutableList.of(valueCoder);
-  }
-
-  /**
-   * {@code NullableCoder} is deterministic if the nested {@code Coder} is.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Value coder must be deterministic", valueCoder);
-  }
-
-  /**
-   * {@code NullableCoder} is consistent with equals if the nested {@code Coder} is.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return valueCoder.consistentWithEquals();
-  }
-
-  @Override
-  public Object structuralValue(@Nullable T value) throws Exception {
-    if (value == null) {
-      return Optional.absent();
-    }
-    return Optional.of(valueCoder.structuralValue(value));
-  }
-
-  /**
-   * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
-   * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
-   * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public void registerByteSizeObserver(
-      @Nullable T value, ElementByteSizeObserver observer, Context context) throws Exception {
-    observer.update(1);
-    if (value != null) {
-      valueCoder.registerByteSizeObserver(value, observer, context.nested());
-    }
-  }
-
-  /**
-   * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
-   * counting the bytes. The size is known (1 byte) when {@code value} is {@code null}, otherwise
-   * the size is 1 byte plus the size of nested {@code Coder}'s encoding of {@code value}.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  protected long getEncodedElementByteSize(@Nullable T value, Context context) throws Exception {
-    if (value == null) {
-      return 1;
-    }
-
-    if (valueCoder instanceof StandardCoder) {
-      // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
-      // the value, adding 1 byte to count the null indicator.
-      return 1  + ((StandardCoder<T>) valueCoder)
-          .getEncodedElementByteSize(value, context.nested());
-    }
-
-    // If valueCoder is not a StandardCoder then fall back to the default StandardCoder behavior
-    // of encoding and counting the bytes. The encoding will include the null indicator byte.
-    return super.getEncodedElementByteSize(value, context);
-  }
-
-  /**
-   * {@code NullableCoder} is cheap if {@code valueCoder} is cheap.
-   *
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
-    return valueCoder.isRegisterByteSizeObserverCheap(value, context.nested());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
deleted file mode 100644
index ef91ba9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/Proto2Coder.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.Structs;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ExtensionRegistry;
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Coder} using Google Protocol Buffers 2 binary format.
- *
- * <p>To learn more about Protocol Buffers, visit:
- * <a href="https://developers.google.com/protocol-buffers">https://developers.google.com/protocol-buffers</a>
- *
- * <p>To use, specify the {@link Coder} type on a PCollection containing Protocol Buffers messages.
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- *     input.apply(...)
- *          .setCoder(Proto2Coder.of(MyProto.Message.class));
- * }
- * </pre>
- *
- * <p>Custom message extensions are also supported, but the coder must be made
- * aware of them explicitly:
- *
- * <pre>
- * {@code
- * PCollection<MyProto.Message> records =
- *     input.apply(...)
- *          .setCoder(Proto2Coder.of(MyProto.Message.class)
- *              .withExtensionsFrom(MyProto.class));
- * }
- * </pre>
- *
- * @param <T> the type of elements handled by this coder, must extend {@code Message}
- * @deprecated Use {@link ProtoCoder}.
- */
-@Deprecated
-public class Proto2Coder<T extends Message> extends AtomicCoder<T> {
-
-  /** The class of Protobuf message to be encoded. */
-  private final Class<T> protoMessageClass;
-
-  /**
-   * All extension host classes included in this Proto2Coder. The extensions from
-   * these classes will be included in the {@link ExtensionRegistry} used during
-   * encoding and decoding.
-   *
-   * <p>This is always a private, mutable copy: the deprecated
-   * {@link #addExtensionsFrom(Iterable)} appends to it in place.
-   */
-  private final List<Class<?>> extensionHostClasses;
-
-  /**
-   * Creates a coder for {@code protoMessageClass} with the given extension hosts.
-   *
-   * <p>The hosts are copied into a fresh mutable list. Callers in this class pass immutable
-   * lists ({@code Collections.emptyList()}, {@code ImmutableList}), which would otherwise make
-   * {@link #addExtensionsFrom(Iterable)} fail with {@link UnsupportedOperationException}.
-   */
-  private Proto2Coder(Class<T> protoMessageClass, List<Class<?>> extensionHostClasses) {
-    this.protoMessageClass = protoMessageClass;
-    this.extensionHostClasses = Lists.newArrayList(extensionHostClasses);
-  }
-
-  /**
-   * Provides a {@code Proto2Coder} for any type that is a subtype of {@link Message}, and
-   * refuses every other type with a {@link CannotProvideCoderException}.
-   */
-  private static final CoderProvider PROVIDER =
-      new CoderProvider() {
-        @Override
-        public <T> Coder<T> getCoder(TypeDescriptor<T> type) throws CannotProvideCoderException {
-          if (type.isSubtypeOf(new TypeDescriptor<Message>() {})) {
-            @SuppressWarnings("unchecked")
-            TypeDescriptor<? extends Message> messageType =
-                (TypeDescriptor<? extends Message>) type;
-            @SuppressWarnings("unchecked")
-            Coder<T> coder = (Coder<T>) Proto2Coder.of(messageType);
-            return coder;
-          } else {
-            throw new CannotProvideCoderException(
-                String.format(
-                    "Cannot provide Proto2Coder because %s "
-                        + "is not a subclass of protocol buffer Message",
-                    type));
-          }
-        }
-      };
-
-  /**
-   * Returns the {@link CoderProvider} that supplies a {@code Proto2Coder} for {@link Message}
-   * subtypes.
-   */
-  public static CoderProvider coderProvider() {
-    return PROVIDER;
-  }
-
-  /**
-   * Returns a {@code Proto2Coder} for the given Protobuf message class.
-   */
-  public static <T extends Message> Proto2Coder<T> of(Class<T> protoMessageClass) {
-    return new Proto2Coder<T>(protoMessageClass, Collections.<Class<?>>emptyList());
-  }
-
-  /**
-   * Returns a {@code Proto2Coder} for the given Protobuf message class.
-   */
-  public static <T extends Message> Proto2Coder<T> of(TypeDescriptor<T> protoMessageType) {
-    @SuppressWarnings("unchecked")
-    Class<T> protoMessageClass = (Class<T>) protoMessageType.getRawType();
-    return of(protoMessageClass);
-  }
-
-  /**
-   * Verifies that {@code extensionHost} declares a static
-   * {@code registerAllExtensions(ExtensionRegistry)} method.
-   *
-   * @throws IllegalArgumentException if the method is missing, inaccessible or not static
-   */
-  private static void checkExtensionHost(Class<?> extensionHost) {
-    try {
-      Method registerAllExtensions =
-          extensionHost.getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class);
-      checkArgument(
-          Modifier.isStatic(registerAllExtensions.getModifiers()),
-          "Method registerAllExtensions() must be static for use with Proto2Coder");
-    } catch (NoSuchMethodException | SecurityException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * Produces a {@code Proto2Coder} like this one, but with the extensions from
-   * the given classes registered.
-   *
-   * @param moreExtensionHosts an iterable of classes that define a static
-   *      method {@code registerAllExtensions(ExtensionRegistry)}
-   * @throws IllegalArgumentException if any class lacks such a static method
-   */
-  public Proto2Coder<T> withExtensionsFrom(Iterable<Class<?>> moreExtensionHosts) {
-    for (Class<?> extensionHost : moreExtensionHosts) {
-      // Attempt to access the required method, to make sure it's present.
-      checkExtensionHost(extensionHost);
-    }
-
-    return new Proto2Coder<T>(
-        protoMessageClass,
-        new ImmutableList.Builder<Class<?>>()
-            .addAll(extensionHostClasses)
-            .addAll(moreExtensionHosts)
-            .build());
-  }
-
-  /**
-   * See {@link #withExtensionsFrom(Iterable)}.
-   */
-  public Proto2Coder<T> withExtensionsFrom(Class<?>... extensionHosts) {
-    return withExtensionsFrom(ImmutableList.copyOf(extensionHosts));
-  }
-
-  /**
-   * Adds custom Protobuf extensions to the coder. Returns {@code this}
-   * for method chaining.
-   *
-   * @param extensionHosts must be a class that defines a static
-   *      method name {@code registerAllExtensions}
-   * @deprecated use {@link #withExtensionsFrom}
-   */
-  @Deprecated
-  public Proto2Coder<T> addExtensionsFrom(Class<?>... extensionHosts) {
-    return addExtensionsFrom(ImmutableList.copyOf(extensionHosts));
-  }
-
-  /**
-   * Adds custom Protobuf extensions to the coder. Returns {@code this}
-   * for method chaining.
-   *
-   * <p>Unlike {@link #withExtensionsFrom}, this mutates the coder in place, so the result of
-   * {@link #equals} and {@link #hashCode} changes after the call.
-   *
-   * @param extensionHosts must be a class that defines a static
-   *      method name {@code registerAllExtensions}
-   * @throws IllegalArgumentException if any class lacks such a static method; in that case
-   *      no extension host is added
-   * @deprecated use {@link #withExtensionsFrom}
-   */
-  @Deprecated
-  public Proto2Coder<T> addExtensionsFrom(Iterable<Class<?>> extensionHosts) {
-    // Validate every host before mutating, so a bad host leaves this coder untouched.
-    List<Class<?>> validatedHosts = Lists.newArrayList();
-    for (Class<?> extensionHost : extensionHosts) {
-      checkExtensionHost(extensionHost);
-      validatedHosts.add(extensionHost);
-    }
-    // The memoized extension registry needs to be recomputed because we have mutated this object.
-    // The list is changed under the same lock that getExtensionRegistry() holds while reading it.
-    synchronized (this) {
-      extensionHostClasses.addAll(validatedHosts);
-      memoizedExtensionRegistry = null;
-      getExtensionRegistry();
-    }
-    return this;
-  }
-
-  /**
-   * Writes {@code value} in Protobuf binary format: undelimited when the context covers the
-   * whole stream, length-delimited otherwise.
-   *
-   * @throws CoderException if {@code value} is {@code null}
-   */
-  @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
-    }
-    if (context.isWholeStream) {
-      value.writeTo(outStream);
-    } else {
-      value.writeDelimitedTo(outStream);
-    }
-  }
-
-  /**
-   * Parses a message using the memoized parser and extension registry: the whole stream when
-   * the context covers it, a single length-delimited message otherwise.
-   */
-  @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
-    if (context.isWholeStream) {
-      return getParser().parseFrom(inStream, getExtensionRegistry());
-    } else {
-      return getParser().parseDelimitedFrom(inStream, getExtensionRegistry());
-    }
-  }
-
-  /**
-   * Two {@code Proto2Coder} instances are equal when they have the same message class and the
-   * same set of extension hosts, regardless of the order in which the hosts were added.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (!(other instanceof Proto2Coder)) {
-      return false;
-    }
-    Proto2Coder<?> otherCoder = (Proto2Coder<?>) other;
-    return protoMessageClass.equals(otherCoder.protoMessageClass)
-        && Sets.newHashSet(extensionHostClasses)
-            .equals(Sets.newHashSet(otherCoder.extensionHostClasses));
-  }
-
-  /**
-   * Hashes the extension hosts as a set, matching {@link #equals}: hashing the ordered list
-   * would give equal coders different hash codes when their hosts were added in another order.
-   */
-  @Override
-  public int hashCode() {
-    return Objects.hash(protoMessageClass, Sets.newHashSet(extensionHostClasses));
-  }
-
-  /**
-   * The encoding identifier is designed to support evolution as per the design of Protocol
-   * Buffers. In order to use this class effectively, carefully follow the advice in the Protocol
-   * Buffers documentation at
-   * <a href="https://developers.google.com/protocol-buffers/docs/proto#updating">Updating
-   * A Message Type</a>.
-   *
-   * <p>In particular, the encoding identifier is guaranteed to be the same for {@code Proto2Coder}
-   * instances of the same principal message class, and otherwise distinct. Loaded extensions do not
-   * affect the id, nor does it encode the full schema.
-   *
-   * <p>When modifying a message class, here are the broadest guidelines; see the above link
-   * for greater detail.
-   *
-   * <ul>
-   * <li>Do not change the numeric tags for any fields.
-   * <li>Never remove a <code>required</code> field.
-   * <li>Only add <code>optional</code> or <code>repeated</code> fields, with sensible defaults.
-   * <li>When changing the type of a field, consult the Protocol Buffers documentation to ensure
-   * the new and old types are interchangeable.
-   * </ul>
-   *
-   * <p>Code consuming this message class should be prepared to support <i>all</i> versions of
-   * the class until it is certain that no remaining serialized instances exist.
-   *
-   * <p>If backwards incompatible changes must be made, the best recourse is to change the name
-   * of your Protocol Buffers message class.
-   */
-  @Override
-  public String getEncodingId() {
-    return protoMessageClass.getName();
-  }
-
-  /** Lazily created parser; transient, so it is rebuilt after Java deserialization. */
-  private transient Parser<T> memoizedParser;
-
-  /**
-   * Returns the parser of the message class, obtained reflectively from the static
-   * {@code getDefaultInstance()} method on first use and memoized afterwards.
-   *
-   * @throws IllegalArgumentException if {@code getDefaultInstance()} cannot be found or invoked
-   */
-  private Parser<T> getParser() {
-    if (memoizedParser == null) {
-      try {
-        @SuppressWarnings("unchecked")
-        T protoMessageInstance = (T) protoMessageClass.getMethod("getDefaultInstance").invoke(null);
-        @SuppressWarnings("unchecked")
-        Parser<T> tParser = (Parser<T>) protoMessageInstance.getParserForType();
-        memoizedParser = tParser;
-      } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-        throw new IllegalArgumentException(e);
-      }
-    }
-    return memoizedParser;
-  }
-
-  /** Lazily created registry; reset whenever the extension hosts change. */
-  private transient ExtensionRegistry memoizedExtensionRegistry;
-
-  /**
-   * Returns an unmodifiable registry holding the extensions of every extension host, built on
-   * first use by reflectively calling each host's static {@code registerAllExtensions}.
-   *
-   * @throws IllegalStateException if a host's {@code registerAllExtensions} cannot be invoked
-   */
-  private synchronized ExtensionRegistry getExtensionRegistry() {
-    if (memoizedExtensionRegistry == null) {
-      ExtensionRegistry registry = ExtensionRegistry.newInstance();
-      for (Class<?> extensionHost : extensionHostClasses) {
-        try {
-          extensionHost
-              .getDeclaredMethod("registerAllExtensions", ExtensionRegistry.class)
-              .invoke(null, registry);
-        } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
-          throw new IllegalStateException(e);
-        }
-      }
-      memoizedExtensionRegistry = registry.getUnmodifiable();
-    }
-    return memoizedExtensionRegistry;
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // JSON Serialization details below
-
-  private static final String PROTO_MESSAGE_CLASS = "proto_message_class";
-  private static final String PROTO_EXTENSION_HOSTS = "proto_extension_hosts";
-
-  /**
-   * Constructor for JSON deserialization only.
-   *
-   * @throws IllegalArgumentException if the message class or an extension host class cannot be
-   *      loaded, or an extension host lacks a static {@code registerAllExtensions} method
-   */
-  @JsonCreator
-  public static <T extends Message> Proto2Coder<T> of(
-      @JsonProperty(PROTO_MESSAGE_CLASS) String protoMessageClassName,
-      @Nullable @JsonProperty(PROTO_EXTENSION_HOSTS) List<String> extensionHostClassNames) {
-
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> protoMessageClass = (Class<T>) Class.forName(protoMessageClassName);
-      List<Class<?>> extensionHostClasses = Lists.newArrayList();
-      if (extensionHostClassNames != null) {
-        for (String extensionHostClassName : extensionHostClassNames) {
-          extensionHostClasses.add(Class.forName(extensionHostClassName));
-        }
-      }
-      return of(protoMessageClass).withExtensionsFrom(extensionHostClasses);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  /**
-   * Extends the inherited cloud representation with the message class name and the names of
-   * all extension host classes, under the property names read back by the JSON creator.
-   */
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
-    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
-    for (Class<?> clazz : extensionHostClasses) {
-      extensionHostClassNames.add(CloudObject.forString(clazz.getName()));
-    }
-    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
deleted file mode 100644
index 593c9f0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SerializableCoder.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * A {@link Coder} for Java classes that implement {@link Serializable}.
- *
- * <p>To use, specify the coder type on a PCollection:
- * <pre>
- * {@code
- *   PCollection<MyRecord> records =
- *       foo.apply(...).setCoder(SerializableCoder.of(MyRecord.class));
- * }
- * </pre>
- *
- * <p>{@link SerializableCoder} does not guarantee a deterministic encoding, as Java
- * serialization may produce different binary encodings for two equivalent
- * objects.
- *
- * @param <T> the type of elements handled by this coder
- */
-public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
-
-  /**
-   * Returns a {@link SerializableCoder} instance for the provided element type.
-   * @param <T> the element type
-   */
-  public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
-    @SuppressWarnings("unchecked")
-    Class<T> clazz = (Class<T>) type.getRawType();
-    return of(clazz);
-  }
-
-  /**
-   * Returns a {@link SerializableCoder} instance for the provided element class.
-   * @param <T> the element type
-   */
-  public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
-    return new SerializableCoder<>(clazz);
-  }
-
-  /**
-   * Returns a {@link SerializableCoder} for the class with the given name; used for JSON
-   * deserialization.
-   *
-   * @param classType the fully qualified name of the element class
-   * @throws ClassNotFoundException if the class cannot be loaded, and also if it is loaded
-   *         but does not implement {@link Serializable}
-   */
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static SerializableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Serializable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Serializable");
-    }
-    return of((Class<? extends Serializable>) clazz);
-  }
-
-  /**
-   * A {@link CoderProvider} that constructs a {@link SerializableCoder}
-   * for any class that implements serializable.
-   */
-  public static final CoderProvider PROVIDER = new CoderProvider() {
-    @Override
-    public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor)
-        throws CannotProvideCoderException {
-      Class<?> clazz = typeDescriptor.getRawType();
-      if (Serializable.class.isAssignableFrom(clazz)) {
-        @SuppressWarnings("unchecked")
-        Class<? extends Serializable> serializableClazz =
-            (Class<? extends Serializable>) clazz;
-        @SuppressWarnings("unchecked")
-        Coder<T> coder = (Coder<T>) SerializableCoder.of(serializableClazz);
-        return coder;
-      } else {
-        throw new CannotProvideCoderException(
-            "Cannot provide SerializableCoder because " + typeDescriptor
-            + " does not implement Serializable");
-      }
-    }
-  };
-
-
-  /** The class of the elements handled by this coder. */
-  private final Class<T> type;
-
-  protected SerializableCoder(Class<T> type) {
-    this.type = type;
-  }
-
-  /** Returns the class of the elements handled by this coder. */
-  public Class<T> getRecordType() {
-    return type;
-  }
-
-  /**
-   * Writes {@code value} using Java serialization through a new {@link ObjectOutputStream},
-   * which is flushed but not closed, so {@code outStream} stays open.
-   *
-   * @throws CoderException if serialization fails with an {@link IOException}
-   */
-  @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(outStream);
-      oos.writeObject(value);
-      oos.flush();
-    } catch (IOException exn) {
-      throw new CoderException("unable to serialize record " + value, exn);
-    }
-  }
-
-  /**
-   * Reads one object using Java serialization and casts it to the element class.
-   *
-   * @throws CoderException if the class of the serialized object cannot be found
-   * @throws ClassCastException if the object read is not an instance of the element class
-   */
-  @Override
-  public T decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      ObjectInputStream ois = new ObjectInputStream(inStream);
-      return type.cast(ois.readObject());
-    } catch (ClassNotFoundException e) {
-      throw new CoderException("unable to deserialize record", e);
-    }
-  }
-
-  /**
-   * Returns the element class name and its {@code serialVersionUID}, separated by a colon, so
-   * the id changes whenever the serial version of the class changes.
-   */
-  @Override
-  public String getEncodingId() {
-    return String.format("%s:%s",
-        type.getName(),
-        ObjectStreamClass.lookup(type).getSerialVersionUID());
-  }
-
-  /**
-   * Extends the inherited cloud representation with the element class name under the
-   * {@code "type"} key read back by the JSON creator.
-   */
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject result = super.asCloudObject();
-    result.put("type", type.getName());
-    return result;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. Java serialization is not
-   *         deterministic with respect to {@link Object#equals} for all types.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Java Serialization may be non-deterministic.");
-  }
-
-  /**
-   * Two coders are equal when they have exactly the same class and the same element class.
-   * Returns {@code false} for {@code null}, as required by {@link Object#equals}.
-   */
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || getClass() != other.getClass()) {
-      return false;
-    }
-    return type == ((SerializableCoder<?>) other).type;
-  }
-
-  @Override
-  public int hashCode() {
-    return type.hashCode();
-  }
-
-  // This coder inherits isRegisterByteSizeObserverCheap,
-  // getEncodedElementByteSize and registerByteSizeObserver
-  // from StandardCoder. Looks like we cannot do much better
-  // in this case.
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
deleted file mode 100644
index 36b3606..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/SetCoder.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.common.base.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A {@link Coder} for any {@link Set}, using the wire format of {@link IterableLikeCoder}.
- * Depending on the {@code Set} implementation, the elements may be written in an order that is
- * not deterministic.
- *
- * @param <T> the type of the elements of the set
- */
-public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
-
-  protected SetCoder(Coder<T> elemCoder) {
-    super(elemCoder, "Set");
-  }
-
-  /**
-   * Creates a {@link SetCoder} whose elements are encoded with {@code elementCoder}.
-   */
-  public static <T> SetCoder<T> of(Coder<T> elementCoder) {
-    return new SetCoder<>(elementCoder);
-  }
-
-  /**
-   * Dynamically typed factory used for JSON deserialization.
-   *
-   * @param components the component encodings; exactly one element coder is expected
-   * @throws IllegalArgumentException if {@code components} does not hold exactly one entry
-   */
-  @JsonCreator
-  public static SetCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Object> components) {
-    int componentCount = components.size();
-    Preconditions.checkArgument(componentCount == 1,
-        "Expecting 1 component, got " + componentCount);
-    Object onlyComponent = components.get(0);
-    return of((Coder<?>) onlyComponent);
-  }
-
-  /**
-   * Delegates to {@code getInstanceComponentsHelper} for the example set. As previously
-   * documented here, that yields the first element of the set if it is non-empty, and
-   * {@code null} otherwise.
-   */
-  public static <T> List<Object> getInstanceComponents(
-      Set<T> exampleValue) {
-    return getInstanceComponentsHelper(exampleValue);
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. A set has no defined order, yet its elements
-   *         are encoded in whatever order an iteration happens to produce.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "Ordering of elements in a set may be non-deterministic.");
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal operations below here.
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return a new {@link HashSet} holding the elements of the {@link List} decoded by
-   * {@link IterableLikeCoder}.
-   */
-  @Override
-  protected final Set<T> decodeToIterable(List<T> decodedElements) {
-    Set<T> decodedSet = new HashSet<>(decodedElements);
-    return decodedSet;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
deleted file mode 100644
index faa9861..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StandardCoder.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.cloud.dataflow.sdk.util.Structs.addList;
-import static com.google.cloud.dataflow.sdk.util.Structs.addString;
-import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.util.CloudObject;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Abstract base class for a {@link Coder} whose equality, hash code and string form are derived
- * from its class name and, recursively, from {@link #getComponents}.
- *
- * <p>Subclasses of {@link StandardCoder} override the following methods as appropriate:
- *
- * <ul>
- *   <li>{@link #getComponents}: by default this returns {@link #getCoderArguments}.</li>
- *   <li>{@link #getEncodedElementByteSize} and
- *       {@link #isRegisterByteSizeObserverCheap}: by default the value is encoded to bytes and
- *       the bytes are counted, which is considered expensive.</li>
- *   <li>{@link #getEncodingId} and {@link #getAllowedEncodings}: by default the encoding id
- *       is the empty string, so only the canonical name of the subclass is used for
- *       compatibility checks, and no other encoding ids are allowed.</li>
- * </ul>
- */
-public abstract class StandardCoder<T> implements Coder<T> {
-  protected StandardCoder() {}
-
-  /** Returns the empty string unless overridden. */
-  @Override
-  public String getEncodingId() {
-    return "";
-  }
-
-  /** Returns an empty collection unless overridden. */
-  @Override
-  public Collection<String> getAllowedEncodings() {
-    return Collections.emptyList();
-  }
-
-  /**
-   * Returns the {@link Coder Coders} this {@link Coder} is composed of: the coder arguments,
-   * or an empty list when {@link #getCoderArguments} returns {@code null}.
-   */
-  public List<? extends Coder<?>> getComponents() {
-    List<? extends Coder<?>> arguments = getCoderArguments();
-    return arguments == null ? Collections.<Coder<?>>emptyList() : arguments;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true} if both {@link StandardCoder} instances are of the same class and
-   * have equal components.
-   */
-  @Override
-  public boolean equals(Object o) {
-    boolean sameClass = o != null && this.getClass() == o.getClass();
-    if (!sameClass) {
-      return false;
-    }
-    List<? extends Coder<?>> otherComponents = ((StandardCoder<?>) o).getComponents();
-    return this.getComponents().equals(otherComponents);
-  }
-
-  @Override
-  public int hashCode() {
-    int classHash = getClass().hashCode();
-    int componentsHash = getComponents().hashCode();
-    return 31 * classHash + componentsHash;
-  }
-
-  /**
-   * Renders the simple part of the class name, followed by the comma-separated string forms
-   * of the component coders in parentheses when there are any.
-   */
-  @Override
-  public String toString() {
-    String qualifiedName = getClass().getName();
-    StringBuilder text =
-        new StringBuilder(qualifiedName.substring(qualifiedName.lastIndexOf('.') + 1));
-    List<? extends Coder<?>> componentCoders = getComponents();
-    if (componentCoders.isEmpty()) {
-      return text.toString();
-    }
-    text.append("(");
-    String separator = "";
-    for (Coder<?> componentCoder : componentCoders) {
-      text.append(separator).append(componentCoder.toString());
-      separator = ", ";
-    }
-    return text.append(")").toString();
-  }
-
-  /**
-   * Builds the cloud representation: an object for this class, plus the component encodings,
-   * the encoding id and the allowed encodings, each added only when non-empty.
-   */
-  @Override
-  public CloudObject asCloudObject() {
-    CloudObject cloudObject = CloudObject.forClass(getClass());
-
-    List<? extends Coder<?>> componentCoders = getComponents();
-    if (!componentCoders.isEmpty()) {
-      List<CloudObject> encodedComponents = new ArrayList<>(componentCoders.size());
-      for (Coder<?> componentCoder : componentCoders) {
-        encodedComponents.add(componentCoder.asCloudObject());
-      }
-      addList(cloudObject, PropertyNames.COMPONENT_ENCODINGS, encodedComponents);
-    }
-
-    String id = getEncodingId();
-    checkNotNull(id, "Coder.getEncodingId() must not return null.");
-    if (!id.isEmpty()) {
-      addString(cloudObject, PropertyNames.ENCODING_ID, id);
-    }
-
-    Collection<String> allowed = getAllowedEncodings();
-    if (!allowed.isEmpty()) {
-      addStringList(cloudObject, PropertyNames.ALLOWED_ENCODINGS, Lists.newArrayList(allowed));
-    }
-
-    return cloudObject;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} unless overridden. {@link StandardCoder#registerByteSizeObserver}
-   *         calls {@link #getEncodedElementByteSize}, which re-encodes the element unless it
-   *         is overridden as well. This is considered expensive.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(T value, Context context) {
-    return false;
-  }
-
-  /**
-   * Returns the number of bytes this coder produces when encoding {@code value}, measured by
-   * encoding into a counting stream that discards the bytes.
-   *
-   * @throws IllegalArgumentException if encoding fails for any reason
-   */
-  protected long getEncodedElementByteSize(T value, Context context)
-      throws Exception {
-    try {
-      CountingOutputStream counter = new CountingOutputStream(ByteStreams.nullOutputStream());
-      encode(value, counter, context);
-      return counter.getCount();
-    } catch (Exception exn) {
-      throw new IllegalArgumentException(
-          "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>For {@link StandardCoder} subclasses, {@code observer} is updated with the byte size of
-   * the encoded value as reported by {@link #getEncodedElementByteSize}.
-   */
-  @Override
-  public void registerByteSizeObserver(
-      T value, ElementByteSizeObserver observer, Context context)
-      throws Exception {
-    long encodedSize = getEncodedElementByteSize(value, context);
-    observer.update(encodedSize);
-  }
-
-  /**
-   * Verifies that every given coder is deterministic.
-   *
-   * @throws NonDeterministicException naming this coder and {@code message}, with the failure
-   *         of the first non-deterministic coder as its cause
-   */
-  protected void verifyDeterministic(String message, Iterable<Coder<?>> coders)
-      throws NonDeterministicException {
-    for (Coder<?> candidate : coders) {
-      try {
-        candidate.verifyDeterministic();
-      } catch (NonDeterministicException cause) {
-        throw new NonDeterministicException(this, message, cause);
-      }
-    }
-  }
-
-  /** Varargs form of {@link #verifyDeterministic(String, Iterable)}. */
-  protected void verifyDeterministic(String message, Coder<?>... coders)
-      throws NonDeterministicException {
-    verifyDeterministic(message, Arrays.asList(coders));
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code false} for {@link StandardCoder} unless overridden.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return false;
-  }
-
-  /**
-   * Returns {@code value} itself when it is non-null and this coder is consistent with equals;
-   * otherwise returns the bytes of its encoding in the outer context, wrapped in a
-   * {@code StructuralByteArray}.
-   *
-   * @throws IllegalArgumentException if encoding fails for any reason
-   */
-  @Override
-  public Object structuralValue(T value) throws Exception {
-    if (value == null || !consistentWithEquals()) {
-      try {
-        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
-        encode(value, encoded, Context.OUTER);
-        return new StructuralByteArray(encoded.toByteArray());
-      } catch (Exception exn) {
-        throw new IllegalArgumentException(
-            "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
-      }
-    }
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
deleted file mode 100644
index 1fc1247..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringDelegateCoder.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
-
-import java.lang.reflect.InvocationTargetException;
-
-/**
- * A {@link Coder} that wraps a {@code Coder<String>}
- * and encodes/decodes values via string representations.
- *
- * <p>To decode, the input byte stream is decoded to
- * a {@link String}, and this is passed to the single-argument
- * constructor for {@code T}.
- *
- * <p>To encode, the input value is converted via {@code toString()},
- * and this string is encoded.
- *
- * <p>In order for this to operate correctly for a class {@code Clazz},
- * it must be the case for any instance {@code x} that
- * {@code x.equals(new Clazz(x.toString()))}.
- *
- * <p>This method of encoding is not designed for ease of evolution of {@code Clazz};
- * it should only be used in cases where the class is stable or the encoding is not
- * important. If evolution of the class is important, see {@link ProtoCoder}, {@link AvroCoder},
- * or {@link JAXBCoder}.
- *
- * @param <T> The type of objects coded.
- */
-public class StringDelegateCoder<T> extends DelegateCoder<T, String> {
-  public static <T> StringDelegateCoder<T> of(Class<T> clazz) {
-    return new StringDelegateCoder<T>(clazz);
-  }
-
-  @Override
-  public String toString() {
-    return "StringDelegateCoder(" + clazz + ")";
-  }
-
-  private final Class<T> clazz;
-
-  protected StringDelegateCoder(final Class<T> clazz) {
-    super(StringUtf8Coder.of(),
-      new CodingFunction<T, String>() {
-        @Override
-        public String apply(T input) {
-          return input.toString();
-        }
-      },
-      new CodingFunction<String, T>() {
-        @Override
-        public T apply(String input) throws
-            NoSuchMethodException,
-            InstantiationException,
-            IllegalAccessException,
-            InvocationTargetException {
-          return clazz.getConstructor(String.class).newInstance(input);
-        }
-      });
-
-    this.clazz = clazz;
-  }
-
-  /**
-   * The encoding id is the fully qualified name of the encoded/decoded class.
-   */
-  @Override
-  public String getEncodingId() {
-    return clazz.getName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
deleted file mode 100644
index 179840c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StringUtf8Coder.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.ExposedByteArrayOutputStream;
-import com.google.cloud.dataflow.sdk.util.StreamUtils;
-import com.google.cloud.dataflow.sdk.util.VarInt;
-import com.google.common.base.Utf8;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.CountingOutputStream;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-import java.nio.charset.StandardCharsets;
-
-/**
- * A {@link Coder} that encodes {@link String Strings} in UTF-8 encoding.
- * If in a nested context, prefixes the string with an integer length field,
- * encoded via a {@link VarIntCoder}.
- */
-public class StringUtf8Coder extends AtomicCoder<String> {
-
-  @JsonCreator
-  public static StringUtf8Coder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final StringUtf8Coder INSTANCE = new StringUtf8Coder();
-
-  private static void writeString(String value, DataOutputStream dos)
-      throws IOException {
-    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-    VarInt.encode(bytes.length, dos);
-    dos.write(bytes);
-  }
-
-  private static String readString(DataInputStream dis) throws IOException {
-    int len = VarInt.decodeInt(dis);
-    if (len < 0) {
-      throw new CoderException("Invalid encoded string length: " + len);
-    }
-    byte[] bytes = new byte[len];
-    dis.readFully(bytes);
-    return new String(bytes, StandardCharsets.UTF_8);
-  }
-
-  private StringUtf8Coder() {}
-
-  @Override
-  public void encode(String value, OutputStream outStream, Context context)
-      throws IOException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null String");
-    }
-    if (context.isWholeStream) {
-      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-      if (outStream instanceof ExposedByteArrayOutputStream) {
-        ((ExposedByteArrayOutputStream) outStream).writeAndOwn(bytes);
-      } else {
-        outStream.write(bytes);
-      }
-    } else {
-      writeString(value, new DataOutputStream(outStream));
-    }
-  }
-
-  @Override
-  public String decode(InputStream inStream, Context context)
-      throws IOException {
-    if (context.isWholeStream) {
-      byte[] bytes = StreamUtils.getBytes(inStream);
-      return new String(bytes, StandardCharsets.UTF_8);
-    } else {
-      try {
-        return readString(new DataInputStream(inStream));
-      } catch (EOFException | UTFDataFormatException exn) {
-        // These exceptions correspond to decoding problems, so change
-        // what kind of exception they're branded as.
-        throw new CoderException(exn);
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. This coder is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return the byte size of the UTF-8 encoding of a string or, in a nested context,
-   * the byte size of the encoding plus the encoded length prefix.
-   */
-  @Override
-  protected long getEncodedElementByteSize(String value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null String");
-    }
-    if (context.isWholeStream) {
-      return Utf8.encodedLength(value);
-    } else {
-      CountingOutputStream countingStream =
-          new CountingOutputStream(ByteStreams.nullOutputStream());
-      DataOutputStream stream = new DataOutputStream(countingStream);
-      writeString(value, stream);
-      return countingStream.getCount();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
deleted file mode 100644
index ea18eb9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/StructuralByteArray.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.coders;
-
-import static com.google.api.client.util.Base64.encodeBase64String;
-
-import java.util.Arrays;
-
-/**
- * A wrapper around a byte[] that uses structural, value-based
- * equality rather than byte[]'s normal object identity.
- */
-public class StructuralByteArray {
-  byte[] value;
-
-  public StructuralByteArray(byte[] value) {
-    this.value = value;
-  }
-
-  public byte[] getValue() {
-    return value;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof StructuralByteArray) {
-      StructuralByteArray that = (StructuralByteArray) o;
-      return Arrays.equals(this.value, that.value);
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(value);
-  }
-
-  @Override
-  public String toString() {
-    return "base64:" + encodeBase64String(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
deleted file mode 100644
index bed88b0..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TableRowJsonCoder.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.api.services.bigquery.model.TableRow;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
- */
-public class TableRowJsonCoder extends AtomicCoder<TableRow> {
-
-  @JsonCreator
-  public static TableRowJsonCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(TableRow value, OutputStream outStream, Context context)
-      throws IOException {
-    String strValue = MAPPER.writeValueAsString(value);
-    StringUtf8Coder.of().encode(strValue, outStream, context);
-  }
-
-  @Override
-  public TableRow decode(InputStream inStream, Context context)
-      throws IOException {
-    String strValue = StringUtf8Coder.of().decode(inStream, context);
-    return MAPPER.readValue(strValue, TableRow.class);
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(TableRow value, Context context)
-      throws Exception {
-    String strValue = MAPPER.writeValueAsString(value);
-    return StringUtf8Coder.of().getEncodedElementByteSize(strValue, context);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
-  // TableRow.
-  private static final ObjectMapper MAPPER =
-      new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
-
-  private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
-
-  private TableRowJsonCoder() { }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @throws NonDeterministicException always. A {@link TableRow} can hold arbitrary
-   *         {@link Object} instances, which makes the encoding non-deterministic.
-   */
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    throw new NonDeterministicException(this,
-        "TableCell can hold arbitrary instances, which may be non-deterministic.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
deleted file mode 100644
index 9250c68..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/TextualIntegerCoder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} that encodes {@link Integer Integers} as the ASCII bytes of
- * their textual, decimal, representation.
- */
-public class TextualIntegerCoder extends AtomicCoder<Integer> {
-
-  @JsonCreator
-  public static TextualIntegerCoder of() {
-    return new TextualIntegerCoder();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  protected TextualIntegerCoder() {}
-
-  @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    String textualValue = value.toString();
-    StringUtf8Coder.of().encode(textualValue, outStream, context);
-  }
-
-  @Override
-  public Integer decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    String textualValue = StringUtf8Coder.of().decode(inStream, context);
-    try {
-      return Integer.valueOf(textualValue);
-    } catch (NumberFormatException exn) {
-      throw new CoderException("error when decoding a textual integer", exn);
-    }
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Integer value, Context context) throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    String textualValue = value.toString();
-    return StringUtf8Coder.of().getEncodedElementByteSize(textualValue, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
deleted file mode 100644
index 18ec250..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarIntCoder.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative
- * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
- * integers that are known to often be large or negative.
- */
-public class VarIntCoder extends AtomicCoder<Integer> {
-
-  @JsonCreator
-  public static VarIntCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final VarIntCoder INSTANCE =
-      new VarIntCoder();
-
-  private VarIntCoder() {}
-
-  @Override
-  public void encode(Integer value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    VarInt.encode(value.intValue(), outStream);
-  }
-
-  @Override
-  public Integer decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      return VarInt.decodeInt(inStream);
-    } catch (EOFException | UTFDataFormatException exn) {
-      // These exceptions correspond to decoding problems, so change
-      // what kind of exception they're branded as.
-      throw new CoderException(exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VarIntCoder} is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Integer value, Context context) {
-    return true;
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Integer value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Integer");
-    }
-    return VarInt.getLength(value.longValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
deleted file mode 100644
index 520245e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/VarLongCoder.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.coders;
-
-import com.google.cloud.dataflow.sdk.util.VarInt;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UTFDataFormatException;
-
-/**
- * A {@link Coder} that encodes {@link Long Longs} using between 1 and 10 bytes. Negative
- * numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for
- * longs that are known to often be large or negative.
- */
-public class VarLongCoder extends AtomicCoder<Long> {
-
-  @JsonCreator
-  public static VarLongCoder of() {
-    return INSTANCE;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final VarLongCoder INSTANCE = new VarLongCoder();
-
-  private VarLongCoder() {}
-
-  @Override
-  public void encode(Long value, OutputStream outStream, Context context)
-      throws IOException, CoderException {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Long");
-    }
-    VarInt.encode(value.longValue(), outStream);
-  }
-
-  @Override
-  public Long decode(InputStream inStream, Context context)
-      throws IOException, CoderException {
-    try {
-      return VarInt.decodeLong(inStream);
-    } catch (EOFException | UTFDataFormatException exn) {
-      // These exceptions correspond to decoding problems, so change
-      // what kind of exception they're branded as.
-      throw new CoderException(exn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link VarLongCoder} is injective.
-   */
-  @Override
-  public boolean consistentWithEquals() {
-    return true;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@code true}. {@link #getEncodedElementByteSize} is cheap.
-   */
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(Long value, Context context) {
-    return true;
-  }
-
-  @Override
-  protected long getEncodedElementByteSize(Long value, Context context)
-      throws Exception {
-    if (value == null) {
-      throw new CoderException("cannot encode a null Long");
-    }
-    return VarInt.getLength(value.longValue());
-  }
-}


Mime
View raw message