beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [3/4] beam git commit: [BEAM-1871] Move Xml IO and related classes to new sdks/java/io/xml package.
Date Fri, 21 Apr 2017 23:38:22 GMT
[BEAM-1871] Move Xml IO and related classes to new sdks/java/io/xml package.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/393a90c7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/393a90c7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/393a90c7

Branch: refs/heads/master
Commit: 393a90c74a86d7484d047316a5ccb22cd360a4d0
Parents: 022d5b6
Author: Luke Cwik <lcwik@google.com>
Authored: Fri Apr 21 15:45:04 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Fri Apr 21 16:37:44 2017 -0700

----------------------------------------------------------------------
 sdks/java/core/pom.xml                          |  31 +-
 .../org/apache/beam/sdk/coders/JAXBCoder.java   | 201 -----
 .../beam/sdk/coders/StringDelegateCoder.java    |   3 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java |   2 +-
 .../main/java/org/apache/beam/sdk/io/XmlIO.java | 476 ----------
 .../java/org/apache/beam/sdk/io/XmlSink.java    | 153 ----
 .../java/org/apache/beam/sdk/io/XmlSource.java  | 404 ---------
 .../beam/sdk/testing/SourceTestUtils.java       |   2 +-
 .../apache/beam/sdk/coders/JAXBCoderTest.java   | 223 -----
 .../org/apache/beam/sdk/io/XmlSinkTest.java     | 253 ------
 .../org/apache/beam/sdk/io/XmlSourceTest.java   | 892 ------------------
 sdks/java/io/pom.xml                            |   1 +
 sdks/java/io/xml/pom.xml                        | 118 +++
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 203 +++++
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  | 469 ++++++++++
 .../org/apache/beam/sdk/io/xml/XmlSink.java     | 160 ++++
 .../org/apache/beam/sdk/io/xml/XmlSource.java   | 404 +++++++++
 .../apache/beam/sdk/io/xml/package-info.java    |  22 +
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   | 228 +++++
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java | 253 ++++++
 .../apache/beam/sdk/io/xml/XmlSourceTest.java   | 893 +++++++++++++++++++
 21 files changed, 2755 insertions(+), 2636 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 7af1444..ac7a3bb 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -232,36 +232,7 @@
       <artifactId>joda-time</artifactId>
     </dependency>
 
-    <!-- 
-    To use org.apache.beam.io.XmlSource: 
-
-    1. Explicitly declare the following dependency for the stax2 API.
-    2. Include a stax2 implementation on the classpath. One example
-       is given below as an optional runtime dependency on woodstox-core-asl
-    -->
-    <dependency>
-      <groupId>org.codehaus.woodstox</groupId>
-      <artifactId>stax2-api</artifactId>
-      <version>${stax2.version}</version>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
-      <groupId>org.codehaus.woodstox</groupId>
-      <artifactId>woodstox-core-asl</artifactId>
-      <version>${woodstox.version}</version>
-      <scope>runtime</scope>
-      <optional>true</optional>
-      <exclusions>
-        <!-- javax.xml.stream:stax-api is included in JDK 1.6+ -->
-        <exclusion>
-          <groupId>javax.xml.stream</groupId>
-          <artifactId>stax-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly 
+    <!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly
       declare this dependency to include org.tukaani:xz on the classpath at runtime. -->
     <dependency>
       <groupId>org.tukaani</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
deleted file mode 100644
index ea636fc..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.io.ByteStreams;
-import java.io.FilterInputStream;
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
-import org.apache.beam.sdk.util.Structs;
-import org.apache.beam.sdk.util.VarInt;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms
- * to encode/decode the objects. Users must provide the {@code Class} of the JAXB annotated object.
- *
- * @param <T> type of JAXB annotated objects that will be serialized.
- */
-public class JAXBCoder<T> extends AtomicCoder<T> {
-
-  private final Class<T> jaxbClass;
-  private final TypeDescriptor<T> typeDescriptor;
-  private transient volatile JAXBContext jaxbContext;
-  private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
-  private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller;
-
-  public Class<T> getJAXBClass() {
-    return jaxbClass;
-  }
-
-  private JAXBCoder(Class<T> jaxbClass) {
-    this.jaxbClass = jaxbClass;
-    this.typeDescriptor = TypeDescriptor.of(jaxbClass);
-    this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
-      @Override
-      protected Marshaller initialValue() {
-        try {
-          JAXBContext jaxbContext = getContext();
-          return jaxbContext.createMarshaller();
-        } catch (JAXBException e) {
-          throw new RuntimeException("Error when creating marshaller from JAXB Context.", e);
-        }
-      }
-    };
-    this.jaxbUnmarshaller = new EmptyOnDeserializationThreadLocal<Unmarshaller>() {
-      @Override
-      protected Unmarshaller initialValue() {
-        try {
-          JAXBContext jaxbContext = getContext();
-          return jaxbContext.createUnmarshaller();
-        } catch (Exception e) {
-          throw new RuntimeException("Error when creating unmarshaller from JAXB Context.", e);
-        }
-      }
-    };
-  }
-
-  /**
-   * Create a coder for a given type of JAXB annotated objects.
-   *
-   * @param jaxbClass the {@code Class} of the JAXB annotated objects.
-   */
-  public static <T> JAXBCoder<T> of(Class<T> jaxbClass) {
-    return new JAXBCoder<>(jaxbClass);
-  }
-
-  @Override
-  public void encode(T value, OutputStream outStream, Context context)
-      throws CoderException, IOException {
-    try {
-      if (!context.isWholeStream) {
-        try {
-          long size = getEncodedElementByteSize(value, Context.OUTER);
-          // record the number of bytes the XML consists of so when reading we only read the encoded
-          // value
-          VarInt.encode(size, outStream);
-        } catch (Exception e) {
-          throw new CoderException(
-              "An Exception occured while trying to get the size of an encoded representation", e);
-        }
-      }
-
-      jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-  }
-
-  @Override
-  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
-    try {
-      InputStream stream = inStream;
-      if (!context.isWholeStream) {
-        long limit = VarInt.decodeLong(inStream);
-        stream = ByteStreams.limit(inStream, limit);
-      }
-      @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream));
-      return obj;
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-  }
-
-  private JAXBContext getContext() throws JAXBException {
-    if (jaxbContext == null) {
-      synchronized (this) {
-        if (jaxbContext == null) {
-          jaxbContext = JAXBContext.newInstance(jaxbClass);
-        }
-      }
-    }
-    return jaxbContext;
-  }
-
-  @Override
-  public String getEncodingId() {
-    return getJAXBClass().getName();
-  }
-
-  @Override
-  public TypeDescriptor<T> getEncodedTypeDescriptor() {
-    return typeDescriptor;
-  }
-
-  private static class CloseIgnoringInputStream extends FilterInputStream {
-
-    protected CloseIgnoringInputStream(InputStream in) {
-      super(in);
-    }
-
-    @Override
-    public void close() {
-      // Do nothing. JAXB closes the underlying stream so we must filter out those calls.
-    }
-  }
-
-  private static class CloseIgnoringOutputStream extends FilterOutputStream {
-
-    protected CloseIgnoringOutputStream(OutputStream out) {
-      super(out);
-    }
-
-    @Override
-    public void close() throws IOException {
-      // JAXB closes the underlying stream so we must filter out those calls.
-    }
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  // JSON Serialization details below
-
-  private static final String JAXB_CLASS = "jaxb_class";
-
-  /**
-   * Constructor for JSON deserialization only.
-   */
-  @JsonCreator
-  public static <T> JAXBCoder<T> of(
-      @JsonProperty(JAXB_CLASS) String jaxbClassName) {
-    try {
-      @SuppressWarnings("unchecked")
-      Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName);
-      return of(jaxbClass);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalArgumentException(e);
-    }
-  }
-
-  @Override
-  protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index d4b4ae8..f86369c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -43,8 +43,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * <p>This method of encoding is not designed for ease of evolution of {@code Clazz};
  * it should only be used in cases where the class is stable or the encoding is not
- * important. If evolution of the class is important, see {@link ProtoCoder}, {@link AvroCoder},
- * or {@link JAXBCoder}.
+ * important. If evolution of the class is important, see {@link ProtoCoder} or {@link AvroCoder}.
  *
  * @param <T> The type of objects coded.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index b2a4075..20fc4d9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
  * <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement
  * methods to create a sub-source and a reader for a range of a single file -
  * {@link #createForSubrangeOfFile} and {@link #createSingleFileReader}. Please refer to
- * {@link XmlSource} for an example implementation of {@code FileBasedSource}.
+ * {@link TextIO TextIO.TextSource} for an example implementation of {@code FileBasedSource}.
  *
  * @param <T> Type of records represented by the source.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
deleted file mode 100644
index 6ced5d4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlIO.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import javax.annotation.Nullable;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/** Transforms for reading and writing XML files using JAXB mappers. */
-public class XmlIO {
-  // CHECKSTYLE.OFF: JavadocStyle
-  /**
-   * Reads XML files. This source reads one or more XML files and creates a {@link PCollection} of a
-   * given type. Please note the example given below.
-   *
-   * <p>The XML file must be of the following form, where {@code root} and {@code record} are XML
-   * element names that are defined by the user:
-   *
-   * <pre>{@code
-   * <root>
-   * <record> ... </record>
-   * <record> ... </record>
-   * <record> ... </record>
-   * ...
-   * <record> ... </record>
-   * </root>
-   * }</pre>
-   *
-   * <p>Basically, the XML document should contain a single root element with an inner list
-   * consisting entirely of record elements. The records may contain arbitrary XML content; however,
-   * that content <b>must not</b> contain the start {@code <record>} or end {@code </record>} tags.
-   * This restriction enables reading from large XML files in parallel from different offsets in the
-   * file.
-   *
-   * <p>Root and/or record elements may additionally contain an arbitrary number of XML attributes.
-   * Additionally users must provide a class of a JAXB annotated Java type that can be used convert
-   * records into Java objects and vice versa using JAXB marshalling/unmarshalling mechanisms.
-   * Reading the source will generate a {@code PCollection} of the given JAXB annotated Java type.
-   * Optionally users may provide a minimum size of a bundle that should be created for the source.
-   *
-   * <p>The following example shows how to use this method in a Beam pipeline:
-   *
-   * <pre>{@code
-   * PCollection<String> output = p.apply(XmlIO.<Record>read()
-   *     .from(file.toPath().toString())
-   *     .withRootElement("root")
-   *     .withRecordElement("record")
-   *     .withRecordClass(Record.class));
-   * }</pre>
-   *
-   * <p>Currently, only XML files that use single-byte characters are supported. Using a file that
-   * contains multi-byte characters may result in data loss or duplication.
-   *
-   * <p>To use this method:
-   *
-   * <ol>
-   * <li>Explicitly declare a dependency on org.codehaus.woodstox:stax2-api
-   * <li>Include a compatible implementation on the classpath at run-time, such as
-   *     org.codehaus.woodstox:woodstox-core-asl
-   * </ol>
-   *
-   * <p>These dependencies have been declared as optional in the sdks/java/core/pom.xml file of
-   * Apache Beam.
-   *
-   * <h3>Permissions</h3>
-   *
-   * <p>Permission requirements depend on the {@link org.apache.beam.sdk.runners.PipelineRunner
-   * PipelineRunner} that is used to execute the Beam pipeline. Please refer to the documentation of
-   * corresponding {@link PipelineRunner PipelineRunners} for more details.
-   *
-   * @param <T> Type of the objects that represent the records of the XML file. The {@code
-   *     PCollection} generated by this source will be of this type.
-   */
-  // CHECKSTYLE.ON: JavadocStyle
-  public static <T> Read<T> read() {
-    return new AutoValue_XmlIO_Read.Builder<T>()
-        .setMinBundleSize(Read.DEFAULT_MIN_BUNDLE_SIZE)
-        .setCompressionType(Read.CompressionType.AUTO)
-        .build();
-  }
-
-  // CHECKSTYLE.OFF: JavadocStyle
-  /**
-   * A {@link FileBasedSink} that outputs records as XML-formatted elements. Writes a {@link
-   * PCollection} of records from JAXB-annotated classes to a single file location.
-   *
-   * <p>Given a PCollection containing records of type T that can be marshalled to XML elements,
-   * this Sink will produce a single file consisting of a single root element that contains all of
-   * the elements in the PCollection.
-   *
-   * <p>XML Sinks are created with a base filename to write to, a root element name that will be
-   * used for the root element of the output files, and a class to bind to an XML element. This
-   * class will be used in the marshalling of records in an input PCollection to their XML
-   * representation and must be able to be bound using JAXB annotations (checked at pipeline
-   * construction time).
-   *
-   * <p>XML Sinks can be written to using the {@link Write} transform:
-   *
-   * <pre>{@code
-   * p.apply(XmlIO.<Type>write()
-   *      .withRecordClass(Type.class)
-   *      .withRootElement(root_element)
-   *      .toFilenamePrefix(output_filename));
-   * }</pre>
-   *
-   * <p>For example, consider the following class with JAXB annotations:
-   *
-   * <pre>
-   *  {@literal @}XmlRootElement(name = "word_count_result")
-   *  {@literal @}XmlType(propOrder = {"word", "frequency"})
-   *  public class WordFrequency {
-   *    private String word;
-   *    private long frequency;
-   *
-   *    public WordFrequency() { }
-   *
-   *    public WordFrequency(String word, long frequency) {
-   *      this.word = word;
-   *      this.frequency = frequency;
-   *    }
-   *
-   *    public void setWord(String word) {
-   *      this.word = word;
-   *    }
-   *
-   *    public void setFrequency(long frequency) {
-   *      this.frequency = frequency;
-   *    }
-   *
-   *    public long getFrequency() {
-   *      return frequency;
-   *    }
-   *
-   *    public String getWord() {
-   *      return word;
-   *    }
-   *  }
-   * </pre>
-   *
-   * <p>The following will produce XML output with a root element named "words" from a PCollection
-   * of WordFrequency objects:
-   *
-   * <pre>{@code
-   * p.apply(XmlIO.<WordFrequency>write()
-   *     .withRecordClass(WordFrequency.class)
-   *     .withRootElement("words")
-   *     .toFilenamePrefix(output_file));
-   * }</pre>
-   *
-   * <p>The output of which will look like:
-   *
-   * <pre>{@code
-   * <words>
-   *
-   *  <word_count_result>
-   *    <word>decreased</word>
-   *    <frequency>1</frequency>
-   *  </word_count_result>
-   *
-   *  <word_count_result>
-   *    <word>War</word>
-   *    <frequency>4</frequency>
-   *  </word_count_result>
-   *
-   *  <word_count_result>
-   *    <word>empress'</word>
-   *    <frequency>14</frequency>
-   *  </word_count_result>
-   *
-   *  <word_count_result>
-   *    <word>stoops</word>
-   *    <frequency>6</frequency>
-   *  </word_count_result>
-   *
-   *  ...
-   * </words>
-   * }</pre>
-   */
-  // CHECKSTYLE.ON: JavadocStyle
-  public static <T> Write<T> write() {
-    return new AutoValue_XmlIO_Write.Builder<T>().build();
-  }
-
-  /** Implementation of {@link #read}. */
-  @AutoValue
-  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-    private static final int DEFAULT_MIN_BUNDLE_SIZE = 8 * 1024;
-
-    @Nullable
-    abstract String getFileOrPatternSpec();
-
-    @Nullable
-    abstract String getRootElement();
-
-    @Nullable
-    abstract String getRecordElement();
-
-    @Nullable
-    abstract Class<T> getRecordClass();
-
-    abstract CompressionType getCompressionType();
-
-    abstract long getMinBundleSize();
-
-    abstract Builder<T> toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec);
-
-      abstract Builder<T> setRootElement(String rootElement);
-
-      abstract Builder<T> setRecordElement(String recordElement);
-
-      abstract Builder<T> setRecordClass(Class<T> recordClass);
-
-      abstract Builder<T> setMinBundleSize(long minBundleSize);
-
-      abstract Builder<T> setCompressionType(CompressionType compressionType);
-
-      abstract Read<T> build();
-    }
-
-    /** Strategy for determining the compression type of XML files being read. */
-    public enum CompressionType {
-      /** Automatically determine the compression type based on filename extension. */
-      AUTO(""),
-      /** Uncompressed (i.e., may be split). */
-      UNCOMPRESSED(""),
-      /** GZipped. */
-      GZIP(".gz"),
-      /** BZipped. */
-      BZIP2(".bz2"),
-      /** Zipped. */
-      ZIP(".zip"),
-      /** Deflate compressed. */
-      DEFLATE(".deflate");
-
-      private String filenameSuffix;
-
-      CompressionType(String suffix) {
-        this.filenameSuffix = suffix;
-      }
-
-      /**
-       * Determine if a given filename matches a compression type based on its extension.
-       *
-       * @param filename the filename to match
-       * @return true iff the filename ends with the compression type's known extension.
-       */
-      public boolean matches(String filename) {
-        return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
-      }
-    }
-
-    /**
-     * Reads a single XML file or a set of XML files defined by a Java "glob" file pattern. Each XML
-     * file should be of the form defined in {@link #read}.
-     */
-    public Read<T> from(String fileOrPatternSpec) {
-      return toBuilder().setFileOrPatternSpec(fileOrPatternSpec).build();
-    }
-
-    /**
-     * Sets name of the root element of the XML document. This will be used to create a valid
-     * starting root element when initiating a bundle of records created from an XML document. This
-     * is a required parameter.
-     */
-    public Read<T> withRootElement(String rootElement) {
-      return toBuilder().setRootElement(rootElement).build();
-    }
-
-    /**
-     * Sets name of the record element of the XML document. This will be used to determine offset of
-     * the first record of a bundle created from the XML document. This is a required parameter.
-     */
-    public Read<T> withRecordElement(String recordElement) {
-      return toBuilder().setRecordElement(recordElement).build();
-    }
-
-    /**
-     * Sets a JAXB annotated class that can be populated using a record of the provided XML file.
-     * This will be used when unmarshalling record objects from the XML file. This is a required
-     * parameter.
-     */
-    public Read<T> withRecordClass(Class<T> recordClass) {
-      return toBuilder().setRecordClass(recordClass).build();
-    }
-
-    /**
-     * Sets a parameter {@code minBundleSize} for the minimum bundle size of the source. Please
-     * refer to {@link OffsetBasedSource} for the definition of minBundleSize. This is an optional
-     * parameter.
-     */
-    public Read<T> withMinBundleSize(long minBundleSize) {
-      return toBuilder().setMinBundleSize(minBundleSize).build();
-    }
-
-    /**
-     * Decompresses all input files using the specified compression type.
-     *
-     * <p>If no compression type is specified, the default is {@link CompressionType#AUTO}. In this
-     * mode, the compression type of the file is determined by its extension. Supports .gz, .bz2,
-     * .zip and .deflate compression.
-     */
-    public Read<T> withCompressionType(CompressionType compressionType) {
-      return toBuilder().setCompressionType(compressionType).build();
-    }
-
-    @Override
-    public void validate(PBegin input) {
-      checkNotNull(
-          getRootElement(),
-          "rootElement is null. Use builder method withRootElement() to set this.");
-      checkNotNull(
-          getRecordElement(),
-          "recordElement is null. Use builder method withRecordElement() to set this.");
-      checkNotNull(
-          getRecordClass(),
-          "recordClass is null. Use builder method withRecordClass() to set this.");
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder
-          .addIfNotDefault(
-              DisplayData.item("minBundleSize", getMinBundleSize())
-                  .withLabel("Minimum Bundle Size"),
-              1L)
-          .add(DisplayData.item("filePattern", getFileOrPatternSpec()).withLabel("File Pattern"))
-          .addIfNotNull(
-              DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element"))
-          .addIfNotNull(
-              DisplayData.item("recordElement", getRecordElement()).withLabel("XML Record Element"))
-          .addIfNotNull(
-              DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class"));
-    }
-
-    @VisibleForTesting
-    BoundedSource<T> createSource() {
-      XmlSource<T> source = new XmlSource<>(this);
-      switch (getCompressionType()) {
-        case UNCOMPRESSED:
-          return source;
-        case AUTO:
-          return CompressedSource.from(source);
-        case BZIP2:
-          return CompressedSource.from(source)
-              .withDecompression(CompressedSource.CompressionMode.BZIP2);
-        case GZIP:
-          return CompressedSource.from(source)
-              .withDecompression(CompressedSource.CompressionMode.GZIP);
-        case ZIP:
-          return CompressedSource.from(source)
-              .withDecompression(CompressedSource.CompressionMode.ZIP);
-        case DEFLATE:
-          return CompressedSource.from(source)
-              .withDecompression(CompressedSource.CompressionMode.DEFLATE);
-        default:
-          throw new IllegalArgumentException("Unknown compression type: " + getCompressionType());
-      }
-    }
-
-    @Override
-    public PCollection<T> expand(PBegin input) {
-      return input.apply(org.apache.beam.sdk.io.Read.from(createSource()));
-    }
-  }
-
-  /** Implementation of {@link #write}. */
-  @AutoValue
-  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
-    @Nullable
-    abstract String getFilenamePrefix();
-
-    @Nullable
-    abstract Class<T> getRecordClass();
-
-    @Nullable
-    abstract String getRootElement();
-
-    abstract Builder<T> toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(String baseOutputFilename);
-
-      abstract Builder<T> setRecordClass(Class<T> recordClass);
-
-      abstract Builder<T> setRootElement(String rootElement);
-
-      abstract Write<T> build();
-    }
-
-    /**
-     * Writes to files with the given path prefix.
-     *
-     * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
-     * the number of output bundles.
-     */
-    public Write<T> toFilenamePrefix(String filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
-    }
-
-    /**
-     * Writes objects of the given class mapped to XML elements using JAXB.
-     *
-     * <p>The specified class must be able to be used to create a JAXB context.
-     */
-    public Write<T> withRecordClass(Class<T> recordClass) {
-      return toBuilder().setRecordClass(recordClass).build();
-    }
-
-    /** Sets the enclosing root element for the generated XML files. */
-    public Write<T> withRootElement(String rootElement) {
-      return toBuilder().setRootElement(rootElement).build();
-    }
-
-    @Override
-    public void validate(PCollection<T> input) {
-      checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context.");
-      checkNotNull(getRootElement(), "Missing a root element name.");
-      checkNotNull(getFilenamePrefix(), "Missing a filename to write to.");
-      try {
-        JAXBContext.newInstance(getRecordClass());
-      } catch (JAXBException e) {
-        throw new RuntimeException("Error binding classes to a JAXB Context.", e);
-      }
-    }
-
-    @Override
-    public PDone expand(PCollection<T> input) {
-      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
-    }
-
-    @VisibleForTesting
-    XmlSink<T> createSink() {
-      return new XmlSink<>(this);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      createSink().populateFileBasedDisplayData(builder);
-      builder
-          .addIfNotNull(
-              DisplayData.item("rootElement", getRootElement()).withLabel("XML Root Element"))
-          .addIfNotNull(
-              DisplayData.item("recordClass", getRecordClass()).withLabel("XML Record Class"));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
deleted file mode 100644
index b890908..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Marshaller;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
-
-/** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T> {
-  protected static final String XML_EXTENSION = "xml";
-
-  private final XmlIO.Write<T> spec;
-
-  XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), XML_EXTENSION);
-    this.spec = spec;
-  }
-
-  /**
-   * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
-   * been set and that the class can be bound in a JAXB context.
-   */
-  @Override
-  public void validate(PipelineOptions options) {
-    spec.validate(null);
-  }
-
-  /**
-   * Creates an {@link XmlWriteOperation}.
-   */
-  @Override
-  public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
-    return new XmlWriteOperation<>(this);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    spec.populateDisplayData(builder);
-  }
-
-  void populateFileBasedDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-  }
-
-  /**
-   * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link FileBasedSink}s.
-   */
-  protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> {
-    public XmlWriteOperation(XmlSink<T> sink) {
-      super(sink);
-    }
-
-    /**
-     * Creates a {@link XmlWriter} with a marshaller for the type it will write.
-     */
-    @Override
-    public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
-      JAXBContext context;
-      Marshaller marshaller;
-      context = JAXBContext.newInstance(getSink().spec.getRecordClass());
-      marshaller = context.createMarshaller();
-      marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
-      marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
-      marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
-      return new XmlWriter<>(this, marshaller);
-    }
-
-    /**
-     * Return the XmlSink.Bound for this write operation.
-     */
-    @Override
-    public XmlSink<T> getSink() {
-      return (XmlSink<T>) super.getSink();
-    }
-  }
-
-  /**
-   * A {@link FileBasedWriter} that can write objects as XML elements.
-   */
-  protected static final class XmlWriter<T> extends FileBasedWriter<T> {
-    final Marshaller marshaller;
-    private OutputStream os = null;
-
-    public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) {
-      super(writeOperation);
-      this.marshaller = marshaller;
-    }
-
-    /**
-     * Creates the output stream that elements will be written to.
-     */
-    @Override
-    protected void prepareWrite(WritableByteChannel channel) throws Exception {
-      os = Channels.newOutputStream(channel);
-    }
-
-    /**
-     * Writes the root element opening tag.
-     */
-    @Override
-    protected void writeHeader() throws Exception {
-      String rootElementName = getWriteOperation().getSink().spec.getRootElement();
-      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n"));
-    }
-
-    /**
-     * Writes the root element closing tag.
-     */
-    @Override
-    protected void writeFooter() throws Exception {
-      String rootElementName = getWriteOperation().getSink().spec.getRootElement();
-      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">"));
-    }
-
-    /**
-     * Writes a value to the stream.
-     */
-    @Override
-    public void write(T value) throws Exception {
-      marshaller.marshal(value, os);
-    }
-
-    /**
-     * Return the XmlWriteOperation this write belongs to.
-     */
-    @Override
-    public XmlWriteOperation<T> getWriteOperation() {
-      return (XmlWriteOperation<T>) super.getWriteOperation();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
deleted file mode 100644
index 4b7d3b4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSource.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.SequenceInputStream;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.ValidationEvent;
-import javax.xml.bind.ValidationEventHandler;
-import javax.xml.stream.FactoryConfigurationError;
-import javax.xml.stream.XMLInputFactory;
-import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.JAXBCoder;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.codehaus.stax2.XMLInputFactory2;
-
-/** Implementation of {@link XmlIO#read}. */
-public class XmlSource<T> extends FileBasedSource<T> {
-
-  private static final String XML_VERSION = "1.1";
-
-  private final XmlIO.Read<T> spec;
-
-  XmlSource(XmlIO.Read<T> spec) {
-    super(StaticValueProvider.of(spec.getFileOrPatternSpec()), spec.getMinBundleSize());
-    this.spec = spec;
-  }
-
-  private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset) {
-    super(metadata, spec.getMinBundleSize(), startOffset, endOffset);
-    this.spec = spec;
-  }
-
-  @Override
-  protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
-    return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, end);
-  }
-
-  @Override
-  protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-    return new XMLReader<T>(this);
-  }
-
-  @Override
-  public void validate() {
-    super.validate();
-    spec.validate(null);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    spec.populateDisplayData(builder);
-  }
-
-  @Override
-  public Coder<T> getDefaultOutputCoder() {
-    return JAXBCoder.of(spec.getRecordClass());
-  }
-
-  /**
-   * A {@link Source.Reader} for reading JAXB annotated Java objects from an XML file. The XML
-   * file should be of the form defined at {@link XmlSource}.
-   *
-   * <p>Timestamped values are currently unsupported - all values implicitly have the timestamp
-   * of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
-   *
-   * @param <T> Type of objects that will be read by the reader.
-   */
-  private static class XMLReader<T> extends FileBasedReader<T> {
-    // The amount of bytes read from the channel to memory when determining the starting offset of
-    // the first record in a bundle. After matching to starting offset of the first record the
-    // remaining bytes read to this buffer and the bytes still not read from the channel are used to
-    // create the XML parser.
-    private static final int BUF_SIZE = 1024;
-
-    // This should be the maximum number of bytes a character will encode to, for any encoding
-    // supported by XmlSource. Currently this is set to 4 since UTF-8 characters may be
-    // four bytes.
-    private static final int MAX_CHAR_BYTES = 4;
-
-    // In order to support reading starting in the middle of an XML file, we construct an imaginary
-    // well-formed document (a header and root tag followed by the contents of the input starting at
-    // the record boundary) and feed it to the parser. Because of this, the offset reported by the
-    // XML parser is not the same as offset in the original file. They differ by a constant amount:
-    // offsetInOriginalFile = parser.getLocation().getCharacterOffset() + parserBaseOffset;
-    // Note that this is true only for files with single-byte characters.
-    // It appears that, as of writing, there does not exist a Java XML parser capable of correctly
-    // reporting byte offsets of elements in the presence of multi-byte characters.
-    private long parserBaseOffset = 0;
-    private boolean readingStarted = false;
-
-    // If true, the current bundle does not contain any records.
-    private boolean emptyBundle = false;
-
-    private Unmarshaller jaxbUnmarshaller = null;
-    private XMLStreamReader parser = null;
-
-    private T currentRecord = null;
-
-    // Byte offset of the current record in the XML file provided when creating the source.
-    private long currentByteOffset = 0;
-
-    public XMLReader(XmlSource<T> source) {
-      super(source);
-
-      // Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
-      try {
-        JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass());
-        jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
-        // Throw errors if validation fails. JAXB by default ignores validation errors.
-        jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
-          @Override
-          public boolean handleEvent(ValidationEvent event) {
-            throw new RuntimeException(event.getMessage(), event.getLinkedException());
-          }
-        });
-      } catch (JAXBException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public synchronized XmlSource<T> getCurrentSource() {
-      return (XmlSource<T>) super.getCurrentSource();
-    }
-
-    @Override
-    protected void startReading(ReadableByteChannel channel) throws IOException {
-      // This method determines the correct starting offset of the first record by reading bytes
-      // from the ReadableByteChannel. This implementation does not need the channel to be a
-      // SeekableByteChannel.
-      // The method tries to determine the first record element in the byte channel. The first
-      // record must start with the characters "<recordElement" where "recordElement" is the
-      // record element of the XML document described above. For the match to be complete this
-      // has to be followed by one of following.
-      // * any whitespace character
-      // * '>' character
-      // * '/' character (to support empty records).
-      //
-      // After this match this method creates the XML parser for parsing the XML document,
-      // feeding it a fake document consisting of an XML header and the <rootElement> tag followed
-      // by the contents of channel starting from <recordElement. The <rootElement> tag may be never
-      // closed.
-
-      // This stores any bytes that should be used prior to the remaining bytes of the channel when
-      // creating an XML parser object.
-      ByteArrayOutputStream preambleByteBuffer = new ByteArrayOutputStream();
-      // A dummy declaration and root for the document with proper XML version and encoding. Without
-      // this XML parsing may fail or may produce incorrect results.
-
-      byte[] dummyStartDocumentBytes =
-          (String.format(
-                  "<?xml version=\"%s\" encoding=\"UTF-8\" ?><%s>",
-                  XML_VERSION, getCurrentSource().spec.getRootElement()))
-              .getBytes(StandardCharsets.UTF_8);
-      preambleByteBuffer.write(dummyStartDocumentBytes);
-      // Gets the byte offset (in the input file) of the first record in ReadableByteChannel. This
-      // method returns the offset and stores any bytes that should be used when creating the XML
-      // parser in preambleByteBuffer.
-      long offsetInFileOfRecordElement =
-          getFirstOccurenceOfRecordElement(channel, preambleByteBuffer);
-      if (offsetInFileOfRecordElement < 0) {
-        // Bundle has no records. So marking this bundle as an empty bundle.
-        emptyBundle = true;
-        return;
-      } else {
-        byte[] preambleBytes = preambleByteBuffer.toByteArray();
-        currentByteOffset = offsetInFileOfRecordElement;
-        setUpXMLParser(channel, preambleBytes);
-        parserBaseOffset = offsetInFileOfRecordElement - dummyStartDocumentBytes.length;
-      }
-      readingStarted = true;
-    }
-
-    // Gets the first occurrence of the next record within the given ReadableByteChannel. Puts
-    // any bytes read past the starting offset of the next record back to the preambleByteBuffer.
-    // If a record is found, returns the starting offset of the record, otherwise
-    // returns -1.
-    private long getFirstOccurenceOfRecordElement(
-        ReadableByteChannel channel, ByteArrayOutputStream preambleByteBuffer) throws IOException {
-      int byteIndexInRecordElementToMatch = 0;
-      // Index of the byte in the string "<recordElement" to be matched
-      // against the current byte from the stream.
-      boolean recordStartBytesMatched = false; // "<recordElement" matched. Still have to match the
-      // next character to confirm if this is a positive match.
-      boolean fullyMatched = false; // If true, record element was fully matched.
-
-      // This gives the offset of the byte currently being read. We do a '-1' here since we
-      // increment this value at the beginning of the while loop below.
-      long offsetInFileOfCurrentByte = getCurrentSource().getStartOffset() - 1;
-      long startingOffsetInFileOfCurrentMatch = -1;
-      // If this is non-negative, currently there is a match in progress and this value gives the
-      // starting offset of the match currently being conducted.
-      boolean matchStarted = false; // If true, a match is currently in progress.
-
-      // These two values are used to determine the character immediately following a match for
-      // "<recordElement". Please see the comment for 'MAX_CHAR_BYTES' above.
-      byte[] charBytes = new byte[MAX_CHAR_BYTES];
-      int charBytesFound = 0;
-
-      ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
-      byte[] recordStartBytes =
-          ("<" + getCurrentSource().spec.getRecordElement()).getBytes(StandardCharsets.UTF_8);
-
-      outer: while (channel.read(buf) > 0) {
-        buf.flip();
-        while (buf.hasRemaining()) {
-          offsetInFileOfCurrentByte++;
-          byte b = buf.get();
-          boolean reset = false;
-          if (recordStartBytesMatched) {
-            // We already matched "<recordElement" reading the next character to determine if this
-            // is a positive match for a new record.
-            charBytes[charBytesFound] = b;
-            charBytesFound++;
-            Character c = null;
-            if (charBytesFound == charBytes.length) {
-              CharBuffer charBuf = CharBuffer.allocate(1);
-              InputStream charBufStream = new ByteArrayInputStream(charBytes);
-              java.io.Reader reader =
-                  new InputStreamReader(charBufStream, StandardCharsets.UTF_8);
-              int read = reader.read();
-              if (read <= 0) {
-                return -1;
-              }
-              charBuf.flip();
-              c = (char) read;
-            } else {
-              continue;
-            }
-
-            // Record start may be of following forms
-            // * "<recordElement<whitespace>..."
-            // * "<recordElement>..."
-            // * "<recordElement/..."
-            if (Character.isWhitespace(c) || c == '>' || c == '/') {
-              fullyMatched = true;
-              // Add the recordStartBytes and charBytes to preambleByteBuffer since these were
-              // already read from the channel.
-              preambleByteBuffer.write(recordStartBytes);
-              preambleByteBuffer.write(charBytes);
-              // Also add the rest of the current buffer to preambleByteBuffer.
-              while (buf.hasRemaining()) {
-                preambleByteBuffer.write(buf.get());
-              }
-              break outer;
-            } else {
-              // Matching was unsuccessful. Reset the buffer to include bytes read for the char.
-              ByteBuffer newbuf = ByteBuffer.allocate(BUF_SIZE);
-              newbuf.put(charBytes);
-              offsetInFileOfCurrentByte -= charBytes.length;
-              while (buf.hasRemaining()) {
-                newbuf.put(buf.get());
-              }
-              newbuf.flip();
-              buf = newbuf;
-
-              // Ignore everything and try again starting from the current buffer.
-              reset = true;
-            }
-          } else if (b == recordStartBytes[byteIndexInRecordElementToMatch]) {
-            // Next byte matched.
-            if (!matchStarted) {
-              // Match was for the first byte, record the starting offset.
-              matchStarted = true;
-              startingOffsetInFileOfCurrentMatch = offsetInFileOfCurrentByte;
-            }
-            byteIndexInRecordElementToMatch++;
-          } else {
-            // Not a match. Ignore everything and try again starting at current point.
-            reset = true;
-          }
-          if (reset) {
-            // Clear variables and try to match starting from the next byte.
-            byteIndexInRecordElementToMatch = 0;
-            startingOffsetInFileOfCurrentMatch = -1;
-            matchStarted = false;
-            recordStartBytesMatched = false;
-            charBytes = new byte[MAX_CHAR_BYTES];
-            charBytesFound = 0;
-          }
-          if (byteIndexInRecordElementToMatch == recordStartBytes.length) {
-            // "<recordElement" matched. Need to still check next byte since this might be an
-            // element that has "recordElement" as a prefix.
-            recordStartBytesMatched = true;
-          }
-        }
-        buf.clear();
-      }
-
-      if (!fullyMatched) {
-        return -1;
-      } else {
-        return startingOffsetInFileOfCurrentMatch;
-      }
-    }
-
-    private void setUpXMLParser(ReadableByteChannel channel, byte[] lookAhead) throws IOException {
-      try {
-        // We use Woodstox because the StAX implementation provided by OpenJDK reports
-        // character locations incorrectly. Note that Woodstox still currently reports *byte*
-        // locations incorrectly when parsing documents that contain multi-byte characters.
-        XMLInputFactory2 xmlInputFactory = (XMLInputFactory2) XMLInputFactory.newInstance();
-        this.parser = xmlInputFactory.createXMLStreamReader(
-            new SequenceInputStream(
-                new ByteArrayInputStream(lookAhead), Channels.newInputStream(channel)),
-            "UTF-8");
-
-        // Current offset should be the offset before reading the record element.
-        while (true) {
-          int event = parser.next();
-          if (event == XMLStreamConstants.START_ELEMENT) {
-            String localName = parser.getLocalName();
-            if (localName.equals(getCurrentSource().spec.getRecordElement())) {
-              break;
-            }
-          }
-        }
-      } catch (FactoryConfigurationError | XMLStreamException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    protected boolean readNextRecord() throws IOException {
-      if (emptyBundle) {
-        currentByteOffset = Long.MAX_VALUE;
-        return false;
-      }
-      try {
-        // Update current offset and check if the next value is the record element.
-        currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
-        while (parser.getEventType() != XMLStreamConstants.START_ELEMENT) {
-          parser.next();
-          currentByteOffset = parserBaseOffset + parser.getLocation().getCharacterOffset();
-          if (parser.getEventType() == XMLStreamConstants.END_DOCUMENT) {
-            currentByteOffset = Long.MAX_VALUE;
-            return false;
-          }
-        }
-        JAXBElement<T> jb =
-            jaxbUnmarshaller.unmarshal(parser, getCurrentSource().spec.getRecordClass());
-        currentRecord = jb.getValue();
-        return true;
-      } catch (JAXBException | XMLStreamException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      if (!readingStarted) {
-        throw new NoSuchElementException();
-      }
-      return currentRecord;
-    }
-
-    @Override
-    protected boolean isAtSplitPoint() {
-      // Every record is at a split point.
-      return true;
-    }
-
-    @Override
-    protected long getCurrentOffset() {
-      return currentByteOffset;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index 2ab5b35..fd7ae85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
  * </ul>
  * For example usages, see the unit tests of classes such as
  * {@link org.apache.beam.sdk.io.AvroSource} or
- * {@link org.apache.beam.sdk.io.XmlSource}.
+ * {@link org.apache.beam.sdk.io.TextIO TextIO.TextSource}.
  *
  * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
deleted file mode 100644
index 55701bf..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.xml.bind.annotation.XmlRootElement;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link JAXBCoder}. */
-@RunWith(JUnit4.class)
-public class JAXBCoderTest {
-
-  @XmlRootElement
-  static class TestType {
-    private String testString = null;
-    private int testInt;
-
-    public TestType() {}
-
-    public TestType(String testString, int testInt) {
-      this.testString = testString;
-      this.testInt = testInt;
-    }
-
-    public String getTestString() {
-      return testString;
-    }
-
-    public void setTestString(String testString) {
-      this.testString = testString;
-    }
-
-    public int getTestInt() {
-      return testInt;
-    }
-
-    public void setTestInt(int testInt) {
-      this.testInt = testInt;
-    }
-
-    @Override
-    public int hashCode() {
-      int hashCode = 1;
-      hashCode = 31 * hashCode + (testString == null ? 0 : testString.hashCode());
-      hashCode = 31 * hashCode + testInt;
-      return hashCode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof TestType)) {
-        return false;
-      }
-
-      TestType other = (TestType) obj;
-      return (testString == null || testString.equals(other.testString))
-          && (testInt == other.testInt);
-    }
-  }
-
-  @Test
-  public void testEncodeDecodeOuter() throws Exception {
-    JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class);
-
-    byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999));
-    assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded));
-  }
-
-  @Test
-  public void testEncodeDecodeAfterClone() throws Exception {
-    JAXBCoder<TestType> coder = SerializableUtils.clone(JAXBCoder.of(TestType.class));
-
-    byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999));
-    assertEquals(new TestType("abc", 9999), CoderUtils.decodeFromByteArray(coder, encoded));
-  }
-
-  @Test
-  public void testEncodeDecodeNested() throws Exception {
-    JAXBCoder<TestType> jaxbCoder = JAXBCoder.of(TestType.class);
-    TestCoder nesting = new TestCoder(jaxbCoder);
-
-    byte[] encoded = CoderUtils.encodeToByteArray(nesting, new TestType("abc", 9999));
-    assertEquals(
-        new TestType("abc", 9999), CoderUtils.decodeFromByteArray(nesting, encoded));
-  }
-
-  @Test
-  public void testEncodeDecodeMultithreaded() throws Throwable {
-    final JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class);
-    int numThreads = 100;
-
-    final CountDownLatch ready = new CountDownLatch(numThreads);
-    final CountDownLatch start = new CountDownLatch(1);
-    final CountDownLatch done = new CountDownLatch(numThreads);
-
-    final AtomicReference<Throwable> thrown = new AtomicReference<>();
-
-    Executor executor = Executors.newCachedThreadPool();
-    for (int i = 0; i < numThreads; i++) {
-      final TestType elem = new TestType("abc", i);
-      final int index = i;
-      executor.execute(
-          new Runnable() {
-            @Override
-            public void run() {
-              ready.countDown();
-              try {
-                start.await();
-              } catch (InterruptedException e) {
-              }
-
-              try {
-                byte[] encoded = CoderUtils.encodeToByteArray(coder, elem);
-                assertEquals(
-                    new TestType("abc", index), CoderUtils.decodeFromByteArray(coder, encoded));
-              } catch (Throwable e) {
-                thrown.compareAndSet(null, e);
-              }
-              done.countDown();
-            }
-          });
-    }
-    ready.await();
-    start.countDown();
-
-    done.await();
-    Throwable actuallyThrown = thrown.get();
-    if (actuallyThrown != null) {
-      throw actuallyThrown;
-    }
-  }
-
-  /**
-   * A coder that surrounds the value with two values, to demonstrate nesting.
-   */
-  private static class TestCoder extends StandardCoder<TestType> {
-    private final JAXBCoder<TestType> jaxbCoder;
-    public TestCoder(JAXBCoder<TestType> jaxbCoder) {
-      this.jaxbCoder = jaxbCoder;
-    }
-
-    @Override
-    public void encode(TestType value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      VarIntCoder.of().encode(3, outStream, nestedContext);
-      jaxbCoder.encode(value, outStream, nestedContext);
-      VarLongCoder.of().encode(22L, outStream, context);
-    }
-
-    @Override
-    public TestType decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      VarIntCoder.of().decode(inStream, nestedContext);
-      TestType result = jaxbCoder.decode(inStream, nestedContext);
-      VarLongCoder.of().decode(inStream, context);
-      return result;
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return ImmutableList.of(jaxbCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      jaxbCoder.verifyDeterministic();
-    }
-  }
-
-  @Test
-  public void testEncodable() throws Exception {
-    CoderProperties.coderSerializable(JAXBCoder.of(TestType.class));
-  }
-
-  @Test
-  public void testEncodingId() throws Exception {
-    Coder<TestType> coder = JAXBCoder.of(TestType.class);
-    CoderProperties.coderHasEncodingId(
-        coder, TestType.class.getName());
-  }
-
-  @Test
-  public void testEncodedTypeDescriptor() throws Exception {
-    assertThat(
-        JAXBCoder.of(TestType.class).getEncodedTypeDescriptor(),
-        equalTo(TypeDescriptor.of(TestType.class)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/393a90c7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
deleted file mode 100644
index 7f559d1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import com.google.common.collect.Lists;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.XmlType;
-import org.apache.beam.sdk.io.XmlSink.XmlWriteOperation;
-import org.apache.beam.sdk.io.XmlSink.XmlWriter;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for XmlSink.
- */
-@RunWith(JUnit4.class)
-public class XmlSinkTest {
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  private String testRootElement = "testElement";
-  private String testFilePrefix = "/path/to/testPrefix";
-
-  /**
-   * An XmlWriter correctly writes objects as Xml elements with an enclosing root element.
-   */
-  @Test
-  public void testXmlWriter() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    XmlWriteOperation<Bird> writeOp =
-        XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
-            .withRecordClass(Bird.class)
-            .withRootElement("birds")
-            .createSink()
-            .createWriteOperation(options);
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
-
-    List<Bird> bundle =
-        Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose"));
-    List<String> lines = Arrays.asList("<birds>", "<bird>", "<species>robin</species>",
-        "<adjective>bemused</adjective>", "</bird>", "<bird>", "<species>goose</species>",
-        "<adjective>evasive</adjective>", "</bird>", "</birds>");
-    runTestWrite(writer, bundle, lines);
-  }
-
-  /**
-   * Builder methods correctly initialize an XML Sink.
-   */
-  @Test
-  public void testBuildXmlWriteTransform() {
-    XmlIO.Write<Bird> write =
-        XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
-            .withRecordClass(Bird.class)
-            .withRootElement(testRootElement);
-    assertEquals(Bird.class, write.getRecordClass());
-    assertEquals(testRootElement, write.getRootElement());
-    assertEquals(testFilePrefix, write.getFilenamePrefix());
-  }
-
-  /** Validation ensures no fields are missing. */
-  @Test
-  public void testValidateXmlSinkMissingRecordClass() {
-    thrown.expect(NullPointerException.class);
-    XmlIO.<Bird>write()
-        .withRootElement(testRootElement)
-        .toFilenamePrefix(testFilePrefix)
-        .validate(null);
-  }
-
-  @Test
-  public void testValidateXmlSinkMissingRootElement() {
-    thrown.expect(NullPointerException.class);
-    XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null);
-  }
-
-  @Test
-  public void testValidateXmlSinkMissingFilePrefix() {
-    thrown.expect(NullPointerException.class);
-    XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null);
-  }
-
-  /**
-   * An XML Sink correctly creates an XmlWriteOperation.
-   */
-  @Test
-  public void testCreateWriteOperations() {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    XmlSink<Bird> sink =
-        XmlIO.<Bird>write()
-            .withRecordClass(Bird.class)
-            .withRootElement(testRootElement)
-            .toFilenamePrefix(testFilePrefix)
-            .createSink();
-    XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
-    Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
-    assertEquals(outputPath.getParent(), tempPath.getParent());
-    assertThat(
-        tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
-  }
-
-  /**
-   * An XmlWriteOperation correctly creates an XmlWriter.
-   */
-  @Test
-  public void testCreateWriter() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    XmlWriteOperation<Bird> writeOp =
-        XmlIO.<Bird>write()
-            .withRecordClass(Bird.class)
-            .withRootElement(testRootElement)
-            .toFilenamePrefix(testFilePrefix)
-            .createSink()
-            .createWriteOperation(options);
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
-    Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writer.getWriteOperation().tempDirectory.get()).toPath();
-    assertEquals(outputPath.getParent(), tempPath.getParent());
-    assertThat(
-        tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
-    assertNotNull(writer.marshaller);
-  }
-
-  @Test
-  public void testDisplayData() {
-    XmlIO.Write<Integer> write = XmlIO.<Integer>write()
-        .toFilenamePrefix("foobar")
-        .withRootElement("bird")
-        .withRecordClass(Integer.class);
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
-    assertThat(displayData, hasDisplayItem("rootElement", "bird"));
-    assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
-  }
-
-  /**
-   * Write a bundle with an XmlWriter and verify the output is expected.
-   */
-  private <T> void runTestWrite(XmlWriter<T> writer, List<T> bundle, List<String> expected)
-      throws Exception {
-    File tmpFile = tmpFolder.newFile("foo.txt");
-    try (FileOutputStream fileOutputStream = new FileOutputStream(tmpFile)) {
-      writeBundle(writer, bundle, fileOutputStream.getChannel());
-    }
-    List<String> lines = new ArrayList<>();
-    try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
-      for (;;) {
-        String line = reader.readLine();
-        if (line == null) {
-          break;
-        }
-        line = line.trim();
-        if (line.length() > 0) {
-          lines.add(line);
-        }
-      }
-      assertEquals(expected, lines);
-    }
-  }
-
-  /**
-   * Write a bundle with an XmlWriter.
-   */
-  private <T> void writeBundle(XmlWriter<T> writer, List<T> elements, WritableByteChannel channel)
-      throws Exception {
-    writer.prepareWrite(channel);
-    writer.writeHeader();
-    for (T elem : elements) {
-      writer.write(elem);
-    }
-    writer.writeFooter();
-  }
-
-  /**
-   * Test JAXB annotated class.
-   */
-  @SuppressWarnings("unused")
-  @XmlRootElement(name = "bird")
-  @XmlType(propOrder = {"name", "adjective"})
-  private static final class Bird {
-    private String name;
-    private String adjective;
-
-    @XmlElement(name = "species")
-    public String getName() {
-      return name;
-    }
-
-    public void setName(String name) {
-      this.name = name;
-    }
-
-    public String getAdjective() {
-      return adjective;
-    }
-
-    public void setAdjective(String adjective) {
-      this.adjective = adjective;
-    }
-
-    public Bird() {}
-
-    public Bird(String adjective, String name) {
-      this.adjective = adjective;
-      this.name = name;
-    }
-  }
-}


Mime
View raw message