beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-2328] Add TikaIO
Date Mon, 11 Sep 2017 06:03:03 GMT
Repository: beam
Updated Branches:
  refs/heads/master b700bc47f -> d60b29ff7


[BEAM-2328] Add TikaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2265b6ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2265b6ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2265b6ce

Branch: refs/heads/master
Commit: 2265b6ce6569ee9b63a697a052ecc72b1d2f2cdb
Parents: b700bc4
Author: Sergey Beryozkin <sberyozkin@gmail.com>
Authored: Thu May 25 16:47:59 2017 +0100
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Mon Sep 11 08:01:29 2017 +0200

----------------------------------------------------------------------
 sdks/java/io/pom.xml                            |   1 +
 sdks/java/io/tika/pom.xml                       | 118 +++++
 .../org/apache/beam/sdk/io/tika/TikaIO.java     | 307 ++++++++++++
 .../apache/beam/sdk/io/tika/TikaOptions.java    |  78 ++++
 .../org/apache/beam/sdk/io/tika/TikaSource.java | 466 +++++++++++++++++++
 .../apache/beam/sdk/io/tika/package-info.java   |  22 +
 .../org/apache/beam/sdk/io/tika/TikaIOTest.java | 261 +++++++++++
 .../apache/beam/sdk/io/tika/TikaReaderTest.java |  82 ++++
 .../apache/beam/sdk/io/tika/TikaSourceTest.java |  73 +++
 .../src/test/resources/apache-beam-tika-pdf.zip | Bin 0 -> 11685 bytes
 .../src/test/resources/apache-beam-tika.pdf     | Bin 0 -> 12392 bytes
 .../src/test/resources/apache-beam-tika1.odt    | Bin 0 -> 12540 bytes
 .../src/test/resources/apache-beam-tika2.odt    | Bin 0 -> 11412 bytes
 .../java/io/tika/src/test/resources/damaged.pdf |   2 +
 14 files changed, 1410 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index c291e5d..c1bb2f2 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -57,6 +57,7 @@
     <module>mongodb</module>
     <module>mqtt</module>
     <module>solr</module>
+    <module>tika</module>
     <module>xml</module>
   </modules>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/pom.xml b/sdks/java/io/tika/pom.xml
new file mode 100644
index 0000000..c653d1e
--- /dev/null
+++ b/sdks/java/io/tika/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-parent</artifactId>
+      <version>2.2.0-SNAPSHOT</version>
+      <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>beam-sdks-java-io-tika</artifactId>
+    <name>Apache Beam :: SDKs :: Java :: IO :: Tika</name>
+    <description>Tika Input to parse files.</description>
+ 
+
+    <properties>
+        <tika.version>1.16</tika.version>
+    </properties>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.auto.value</groupId>
+            <artifactId>auto-value</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-core</artifactId>
+            <version>${tika.version}</version>
+        </dependency>
+        
+        <!-- test dependencies -->
+        <dependency>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-sdks-java-core</artifactId>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-direct-java</artifactId>
+          <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+          <groupId>org.hamcrest</groupId>
+          <artifactId>hamcrest-all</artifactId>
+          <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-parsers</artifactId>
+            <version>${tika.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
new file mode 100644
index 0000000..5d6eea7
--- /dev/null
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaIO.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.auto.value.AutoValue;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.tika.metadata.Metadata;
+
+
+/**
+ * {@link PTransform} for parsing arbitrary files using Apache Tika.
+ * Files in many well known text, binary or scientific formats can be processed.
+ *
+ * <p>To read a {@link PCollection} from one or more files
+ * use {@link TikaIO.Read#from(String)}
+ * to specify the path of the file(s) to be read.
+ *
+ * <p>{@link TikaIO.Read} returns a bounded {@link PCollection} of {@link String Strings},
+ * each corresponding to a sequence of characters reported by Apache Tika SAX Parser.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * // A simple Read of a local PDF file (only runs locally):
+ * PCollection<String> content = p.apply(TikaIO.read().from("/local/path/to/file.pdf"));
+ * }</pre>
+ */
+public class TikaIO {
+
+  /**
+   * A {@link PTransform} that parses one or more files and returns a bounded {@link PCollection}
+   * containing one element for each sequence of characters reported by Apache Tika SAX Parser.
+   */
+   public static Read read() {
+     return new AutoValue_TikaIO_Read.Builder()
+        .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME)
+        .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME)
+        .build();
+   }
+
+   /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
+    private static final long serialVersionUID = 2198301984784351829L;
+    public static final long DEFAULT_QUEUE_POLL_TIME = 50L;
+    public static final long DEFAULT_QUEUE_MAX_POLL_TIME = 3000L;
+
+    @Nullable abstract ValueProvider<String> getFilepattern();
+    @Nullable abstract ValueProvider<String> getTikaConfigPath();
+    @Nullable abstract Metadata getInputMetadata();
+    @Nullable abstract Boolean getReadOutputMetadata();
+    @Nullable abstract Long getQueuePollTime();
+    @Nullable abstract Long getQueueMaxPollTime();
+    @Nullable abstract Integer getMinimumTextLength();
+    @Nullable abstract Boolean getParseSynchronously();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder setTikaConfigPath(ValueProvider<String> tikaConfigPath);
+      abstract Builder setInputMetadata(Metadata metadata);
+      abstract Builder setReadOutputMetadata(Boolean value);
+      abstract Builder setQueuePollTime(Long value);
+      abstract Builder setQueueMaxPollTime(Long value);
+      abstract Builder setMinimumTextLength(Integer value);
+      abstract Builder setParseSynchronously(Boolean value);
+
+      abstract Read build();
+    }
+
+    /**
+     * A {@link PTransform} that parses one or more files with the given filename
+     * or filename pattern and returns a bounded {@link PCollection} containing
+     * one element for each sequence of characters reported by Apache Tika SAX Parser.
+     *
+     * <p>Filepattern can be a local path (if running locally), or a Google Cloud Storage
+     * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}
+     * (if running locally or using remote execution service).
+     *
+     * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
+     * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
+     */
+    public Read from(String filepattern) {
+      checkNotNull(filepattern, "Filepattern cannot be empty.");
+      return from(StaticValueProvider.of(filepattern));
+    }
+
+    /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
+    public Read from(ValueProvider<String> filepattern) {
+      checkNotNull(filepattern, "Filepattern cannot be empty.");
+      return toBuilder()
+          .setFilepattern(filepattern)
+          .setQueuePollTime(Read.DEFAULT_QUEUE_POLL_TIME)
+          .setQueueMaxPollTime(Read.DEFAULT_QUEUE_MAX_POLL_TIME)
+          .build();
+    }
+
+    /**
+     * Returns a new transform which will use the custom TikaConfig.
+     */
+    public Read withTikaConfigPath(String tikaConfigPath) {
+      checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty.");
+      return withTikaConfigPath(StaticValueProvider.of(tikaConfigPath));
+    }
+
+    /** Same as {@link #withTikaConfigPath(String)}, but accepting a {@link ValueProvider}. */
+    public Read withTikaConfigPath(ValueProvider<String> tikaConfigPath) {
+      checkNotNull(tikaConfigPath, "TikaConfigPath cannot be empty.");
+      return toBuilder()
+          .setTikaConfigPath(tikaConfigPath)
+          .build();
+    }
+
+    /**
+     * Returns a new transform which will use the provided content type hint
+     * to make the file parser detection more efficient.
+     */
+    public Read withContentTypeHint(String contentType) {
+      checkNotNull(contentType, "ContentType cannot be empty.");
+      Metadata metadata = new Metadata();
+      metadata.add(Metadata.CONTENT_TYPE, contentType);
+      return withInputMetadata(metadata);
+    }
+
+    /**
+     * Returns a new transform which will use the provided input metadata
+     * for parsing the files.
+     */
+    public Read withInputMetadata(Metadata metadata) {
+      Metadata inputMetadata = this.getInputMetadata();
+      if (inputMetadata != null) {
+        for (String name : metadata.names()) {
+            inputMetadata.set(name, metadata.get(name));
+        }
+      } else {
+        inputMetadata = metadata;
+      }
+      return toBuilder().setInputMetadata(inputMetadata).build();
+    }
+
+    /**
+     * Returns a new transform which will report the metadata.
+     */
+    public Read withReadOutputMetadata(Boolean value) {
+      return toBuilder().setReadOutputMetadata(value).build();
+    }
+
+    /**
+     * Returns a new transform which will use the specified queue poll time.
+     */
+    public Read withQueuePollTime(Long value) {
+      return toBuilder().setQueuePollTime(value).build();
+    }
+
+    /**
+     * Returns a new transform which will use the specified queue max poll time.
+     */
+    public Read withQueueMaxPollTime(Long value) {
+      return toBuilder().setQueueMaxPollTime(value).build();
+    }
+
+    /**
+     * Returns a new transform which will operate on the text blocks with the
+     * given minimum text length.
+     */
+    public Read withMinimumTextlength(Integer value) {
+      return toBuilder().setMinimumTextLength(value).build();
+    }
+
+    /**
+     * Returns a new transform which will use the synchronous reader.
+     */
+    public Read withParseSynchronously(Boolean value) {
+      return toBuilder().setParseSynchronously(value).build();
+    }
+
+    /**
+     * Returns a new transform configured from the given {@link TikaOptions}.
+     */
+    public Read withOptions(TikaOptions options) {
+      checkNotNull(options, "TikaOptions cannot be empty.");
+      Builder builder = toBuilder();
+      builder.setFilepattern(StaticValueProvider.of(options.getInput()))
+             .setQueuePollTime(options.getQueuePollTime())
+             .setQueueMaxPollTime(options.getQueueMaxPollTime())
+             .setMinimumTextLength(options.getMinimumTextLength())
+             .setParseSynchronously(options.getParseSynchronously());
+      if (options.getContentTypeHint() != null) {
+        Metadata metadata = this.getInputMetadata();
+        if (metadata == null) {
+            metadata = new Metadata();
+        }
+        metadata.add(Metadata.CONTENT_TYPE, options.getContentTypeHint());
+        builder.setInputMetadata(metadata);
+      }
+      if (options.getTikaConfigPath() != null) {
+        builder.setTikaConfigPath(StaticValueProvider.of(options.getTikaConfigPath()));
+      }
+      if (Boolean.TRUE.equals(options.getReadOutputMetadata())) {
+        builder.setReadOutputMetadata(options.getReadOutputMetadata());
+      }
+      return builder.build();
+    }
+
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      checkNotNull(this.getFilepattern(), "Filepattern cannot be empty.");
+      final Bounded<String> read = org.apache.beam.sdk.io.Read.from(new TikaSource(this));
+      PCollection<String> pcol = input.getPipeline().apply(read);
+      pcol.setCoder(getDefaultOutputCoder());
+      return pcol;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      String filepatternDisplay = getFilepattern().isAccessible()
+        ? getFilepattern().get() : getFilepattern().toString();
+      builder
+          .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
+            .withLabel("File Pattern"));
+      if (getTikaConfigPath() != null) {
+        String tikaConfigPathDisplay = getTikaConfigPath().isAccessible()
+          ? getTikaConfigPath().get() : getTikaConfigPath().toString();
+        builder.add(DisplayData.item("tikaConfigPath", tikaConfigPathDisplay)
+            .withLabel("TikaConfig Path"));
+      }
+      Metadata metadata = getInputMetadata();
+      if (metadata != null) {
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        for (String name : metadata.names()) {
+            if (sb.length() > 1) {
+              sb.append(',');
+            }
+            sb.append(name).append('=').append(metadata.get(name));
+        }
+        sb.append(']');
+        builder
+            .add(DisplayData.item("inputMetadata", sb.toString())
+            .withLabel("Input Metadata"));
+      }
+      if (Boolean.TRUE.equals(getParseSynchronously())) {
+        builder
+          .add(DisplayData.item("parseMode", "synchronous")
+            .withLabel("Parse Mode"));
+      } else {
+        builder
+          .add(DisplayData.item("parseMode", "asynchronous")
+            .withLabel("Parse Mode"));
+        builder
+          .add(DisplayData.item("queuePollTime", getQueuePollTime().toString())
+            .withLabel("Queue Poll Time"))
+        .add(DisplayData.item("queueMaxPollTime", getQueueMaxPollTime().toString())
+          .withLabel("Queue Max Poll Time"));
+      }
+      Integer minTextLen = getMinimumTextLength();
+      if (minTextLen != null && minTextLen > 0) {
+        builder
+        .add(DisplayData.item("minTextLen", getMinimumTextLength().toString())
+          .withLabel("Minimum Text Length"));
+      }
+      if (Boolean.TRUE.equals(getReadOutputMetadata())) {
+        builder
+          .add(DisplayData.item("readOutputMetadata", "true")
+            .withLabel("Read Output Metadata"));
+      }
+    }
+
+    @Override
+    protected Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
new file mode 100644
index 0000000..fb97678
--- /dev/null
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaOptions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation;
+
+/**
+ * {@link TikaIO} options to support command-line applications.
+ */
+public interface TikaOptions extends PipelineOptions {
+
+  @Description("Input path")
+  @Validation.Required
+  String getInput();
+  void setInput(String value);
+
+  @Description("Tika Config path")
+  String getTikaConfigPath();
+  void setTikaConfigPath(String value);
+
+  @Description("Tika Parser Content Type hint")
+  String getContentTypeHint();
+  void setContentTypeHint(String value);
+
+  @Description("Metadata report status")
+  @Default.Boolean(false)
+  Boolean getReadOutputMetadata();
+  void setReadOutputMetadata(Boolean value);
+
+  @Description("Optional use of the synchronous reader")
+  @Default.Boolean(false)
+  Boolean getParseSynchronously();
+  void setParseSynchronously(Boolean value);
+
+  @Description("Tika Parser queue poll time in milliseconds")
+  @Default.Long(TikaIO.Read.DEFAULT_QUEUE_POLL_TIME)
+  Long getQueuePollTime();
+  void setQueuePollTime(Long value);
+
+  @Description("Tika Parser queue maximum poll time in milliseconds")
+  @Default.Long(TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME)
+  Long getQueueMaxPollTime();
+  void setQueueMaxPollTime(Long value);
+
+  @Description("Minumin text fragment length for Tika Parser to report")
+  @Default.Integer(0)
+  Integer getMinimumTextLength();
+  void setMinimumTextLength(Integer value);
+
+  @Description("Pipeline name")
+  @Default.String("TikaRead")
+  String getPipelineName();
+  void setPipelineName(String value);
+
+  @Description("Output path")
+  @Default.String("/tmp/tika/out")
+  String getOutput();
+  void setOutput(String value);
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
new file mode 100644
index 0000000..7c8852b
--- /dev/null
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/TikaSource.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.exception.TikaException;
+import org.apache.tika.io.TikaInputStream;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+/**
+ * Implementation detail of {@link TikaIO.Read}.
+ *
+ * <p>A {@link Source} which can represent the content of the files parsed by Apache Tika.
+ */
+class TikaSource extends BoundedSource<String> {
+  private static final long serialVersionUID = -509574062910491122L;
+  private static final Logger LOG = LoggerFactory.getLogger(TikaSource.class);
+
+  @Nullable
+  private MatchResult.Metadata singleFileMetadata;
+  private final Mode mode;
+  private final TikaIO.Read spec;
+
+  /**
+   * Source mode.
+   */
+  public enum Mode {
+    FILEPATTERN, SINGLE_FILE
+  }
+
+  TikaSource(TikaIO.Read spec) {
+    this.mode = Mode.FILEPATTERN;
+    this.spec = spec;
+  }
+
+  TikaSource(Metadata fileMetadata, TikaIO.Read spec) {
+    mode = Mode.SINGLE_FILE;
+    this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
+    this.spec = spec;
+  }
+
+  @Override
+  public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
+    this.validate();
+    checkState(spec.getFilepattern().isAccessible(),
+        "Cannot create a Tika reader without access to the file"
+        + " or pattern specification: {}.", spec.getFilepattern());
+    if (spec.getTikaConfigPath() != null) {
+      checkState(spec.getTikaConfigPath().isAccessible(),
+        "Cannot create a Tika reader without access to its configuration",
+        spec.getTikaConfigPath());
+    }
+
+    String fileOrPattern = spec.getFilepattern().get();
+    if (mode == Mode.FILEPATTERN) {
+      List<Metadata> fileMetadata = expandFilePattern(fileOrPattern);
+      List<TikaReader> fileReaders = new ArrayList<>();
+      for (Metadata metadata : fileMetadata) {
+        fileReaders.add(new TikaReader(this, metadata.resourceId().toString()));
+      }
+      if (fileReaders.size() == 1) {
+        return fileReaders.get(0);
+      }
+      return new FilePatternTikaReader(this, fileReaders);
+    } else {
+      return new TikaReader(this, singleFileMetadata.resourceId().toString());
+    }
+
+  }
+
+  @Override
+  public List<? extends TikaSource> split(long desiredBundleSizeBytes, PipelineOptions options)
+    throws Exception {
+    if (mode == Mode.SINGLE_FILE) {
+      return ImmutableList.of(this);
+    } else {
+      List<Metadata> fileMetadata = expandFilePattern(spec.getFilepattern().get());
+
+      List<TikaSource> splitResults = new LinkedList<>();
+      for (Metadata metadata : fileMetadata) {
+        splitResults.add(new TikaSource(metadata, spec));
+      }
+      return splitResults;
+    }
+  }
+
+  public TikaIO.Read getTikaInputRead() {
+    return spec;
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+  @Override
+  public void validate() {
+    switch (mode) {
+    case FILEPATTERN:
+      checkArgument(this.singleFileMetadata == null,
+        "Unexpected initialized singleFileMetadata value");
+      break;
+    case SINGLE_FILE:
+      checkNotNull(this.singleFileMetadata,
+        "Unexpected null singleFileMetadata value");
+      break;
+    default:
+      throw new IllegalStateException("Unknown mode: " + mode);
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    long totalSize = 0;
+    List<Metadata> fileMetadata = expandFilePattern(spec.getFilepattern().get());
+    for (Metadata metadata : fileMetadata) {
+      totalSize += metadata.sizeBytes();
+    }
+    return totalSize;
+  }
+
+  Mode getMode() {
+    return this.mode;
+  }
+
+  Metadata getSingleFileMetadata() {
+    return this.singleFileMetadata;
+  }
+
+  private static List<Metadata> expandFilePattern(String fileOrPattern) throws IOException {
+    MatchResult matches = Iterables.getOnlyElement(
+      FileSystems.match(Collections.singletonList(fileOrPattern)));
+    LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPattern);
+    List<Metadata> metadata = ImmutableList.copyOf(matches.metadata());
+    checkArgument(!metadata.isEmpty(),
+      "Unable to find any files matching %s", fileOrPattern);
+
+    return metadata;
+  }
+
+  /**
+   *  FilePatternTikaReader.
+   *  TODO: This is mostly a copy of the internal file-pattern reader of FileBasedSource;
+   *        that code should be generalized as part of a future contribution.
+   */
+  static class FilePatternTikaReader extends BoundedReader<String> {
+    private final TikaSource source;
+    final ListIterator<TikaReader> fileReadersIterator;
+    TikaReader currentReader = null;
+
+    public FilePatternTikaReader(TikaSource source, List<TikaReader> fileReaders) {
+      this.source = source;
+      this.fileReadersIterator = fileReaders.listIterator();
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return startNextNonemptyReader();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      checkState(currentReader != null, "Call start() before advance()");
+      if (currentReader.advance()) {
+        return true;
+      }
+      return startNextNonemptyReader();
+    }
+
+    private boolean startNextNonemptyReader() throws IOException {
+      while (fileReadersIterator.hasNext()) {
+        currentReader = fileReadersIterator.next();
+        if (currentReader.start()) {
+          return true;
+        }
+        currentReader.close();
+      }
+      return false;
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      return currentReader.getCurrent();
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return currentReader.getCurrentTimestamp();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (currentReader != null) {
+        currentReader.close();
+      }
+      while (fileReadersIterator.hasNext()) {
+        fileReadersIterator.next().close();
+      }
+    }
+
+    @Override
+    public TikaSource getCurrentSource() {
+      return source;
+    }
+  }
+
+  /**
+   * Reads the content of a single file with Apache Tika. Text chunks reported by the
+   * parser are returned first; when the spec asks for the output metadata, every
+   * metadata entry is then returned as a {@code name=value} record.
+   */
+  static class TikaReader extends BoundedReader<String> {
+    // Single-thread executor running the parser; stays null in synchronous mode.
+    private ExecutorService execService;
+    private final ContentHandlerImpl tikaHandler = new ContentHandlerImpl();
+    private String current;
+    private TikaSource source;
+    private String filePath;
+    private TikaIO.Read spec;
+    private org.apache.tika.metadata.Metadata tikaMetadata;
+    // Non-null once the content is exhausted and the metadata is being reported.
+    private Iterator<String> metadataIterator;
+
+    TikaReader(TikaSource source, String filePath) {
+      this.source = source;
+      this.filePath = filePath;
+      this.spec = source.getTikaInputRead();
+    }
+
+    /**
+     * Starts parsing the file, on a dedicated executor thread unless synchronous
+     * parsing was requested, and positions the reader at the first record.
+     *
+     * @return {@code true} if a record is available
+     * @throws IOException if the Tika configuration cannot be loaded, the file cannot
+     *     be opened or the parser fails
+     */
+    @Override
+    public boolean start() throws IOException {
+      final Parser parser = createParser();
+      final ParseContext context = new ParseContext();
+      context.set(Parser.class, parser);
+      tikaMetadata = createParseMetadata();
+
+      if (spec.getMinimumTextLength() != null) {
+        tikaHandler.setMinTextLength(spec.getMinimumTextLength());
+      }
+
+      // Opened only after the parser is configured so that a configuration failure
+      // cannot leave the stream open.
+      final InputStream is = TikaInputStream.get(Paths.get(filePath));
+      if (!Boolean.TRUE.equals(spec.getParseSynchronously())) {
+        // Try to parse the file on the executor thread to make the best effort
+        // at letting the pipeline thread advance over the file content
+        // without immediately parsing all of it
+        execService = Executors.newFixedThreadPool(1);
+        execService.submit(new Runnable() {
+          @Override
+          public void run() {
+            // try-with-resources closes the stream even when the parser fails
+            try (InputStream in = is) {
+              parser.parse(in, tikaHandler, tikaMetadata, context);
+            } catch (Exception ex) {
+              tikaHandler.setParseException(ex);
+            }
+          }
+        });
+      } else {
+        // Some parsers might not be able to report the content in chunks.
+        // It does not make sense to create extra threads in such cases
+        try (InputStream in = is) {
+          parser.parse(in, tikaHandler, tikaMetadata, context);
+        } catch (Exception ex) {
+          throw new IOException(ex);
+        }
+      }
+      return advanceToNext();
+    }
+
+    /** Creates the auto-detecting parser, using the custom Tika configuration if one is set. */
+    private Parser createParser() throws IOException {
+      if (spec.getTikaConfigPath() == null) {
+        return new AutoDetectParser();
+      }
+      try {
+        return new AutoDetectParser(new TikaConfig(spec.getTikaConfigPath().get()));
+      } catch (TikaException | SAXException e) {
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * Creates the metadata instance handed to the parser. The configured input metadata
+     * is copied rather than reused: the parser writes the output metadata into the
+     * instance it is given, and the spec is shared by every reader of this source.
+     */
+    private org.apache.tika.metadata.Metadata createParseMetadata() {
+      org.apache.tika.metadata.Metadata metadata = new org.apache.tika.metadata.Metadata();
+      org.apache.tika.metadata.Metadata input = spec.getInputMetadata();
+      if (input != null) {
+        for (String name : input.names()) {
+          for (String value : input.getValues(name)) {
+            metadata.add(name, value);
+          }
+        }
+      }
+      return metadata;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      checkState(current != null, "Call start() before advance()");
+      return advanceToNext();
+    }
+
+    /**
+     * Fetches the next record: the next content chunk while the content lasts, then,
+     * if the output metadata was requested, the next {@code name=value} metadata entry.
+     *
+     * @return {@code true} if a record was fetched
+     * @throws IOException if the parser has reported a failure
+     */
+    protected boolean advanceToNext() throws IOException {
+      current = null;
+      // The content is reported first
+      if (metadataIterator == null) {
+        // Check if some content is already available
+        current = tikaHandler.getCurrent();
+
+        if (current == null && !Boolean.TRUE.equals(spec.getParseSynchronously())) {
+          long maxPollTime = 0;
+          long configuredMaxPollTime = spec.getQueueMaxPollTime() == null
+              ? TikaIO.Read.DEFAULT_QUEUE_MAX_POLL_TIME : spec.getQueueMaxPollTime();
+          long configuredPollTime = spec.getQueuePollTime() == null
+              ? TikaIO.Read.DEFAULT_QUEUE_POLL_TIME : spec.getQueuePollTime();
+
+          // Poll the queue till the next piece of data is available
+          while (current == null && maxPollTime < configuredMaxPollTime) {
+            boolean docEnded = tikaHandler.waitForNext(configuredPollTime);
+            current = tikaHandler.getCurrent();
+            // End of Document ?
+            if (docEnded) {
+              break;
+            }
+            // Count the resolved poll time: the raw spec value may be null, in which
+            // case the default applies and unboxing it here would fail.
+            maxPollTime += configuredPollTime;
+          }
+        }
+        // No more content ?
+        if (current == null && Boolean.TRUE.equals(spec.getReadOutputMetadata())) {
+          // Time to report the metadata
+          metadataIterator = Arrays.asList(tikaMetadata.names()).iterator();
+        }
+      }
+
+      if (metadataIterator != null && metadataIterator.hasNext()) {
+        String key = metadataIterator.next();
+        // The metadata name/value separator can be configured if needed
+        current = key + "=" + tikaMetadata.get(key);
+      }
+      return current != null;
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    /** Shuts down the parsing executor, if one was created. */
+    @Override
+    public void close() throws IOException {
+      if (execService != null) {
+        execService.shutdown();
+      }
+    }
+
+    /** Returns the parsing executor, or {@code null} if none has been created. */
+    ExecutorService getExecutorService() {
+      return execService;
+    }
+
+    @Override
+    public BoundedSource<String> getCurrentSource() {
+      return source;
+    }
+  }
+
+  /**
+   * Tika Parser Content Handler.
+   *
+   * <p>Collects the text reported by the parser into a queue that the reader drains
+   * with {@link #getCurrent()}. When a minimum text length is set, consecutive text
+   * fragments are joined with a space until the joined text reaches that length.
+   */
+  static class ContentHandlerImpl extends DefaultHandler {
+    private Queue<String> queue = new ConcurrentLinkedQueue<>();
+    private volatile boolean documentEnded;
+    private volatile Exception parseException;
+    // Text accumulated so far that is still shorter than minTextLength.
+    private volatile String current;
+    private int minTextLength;
+
+    /**
+     * Queues the trimmed text, either as is or, when a minimum text length is set,
+     * once the accumulated text is long enough. Whitespace-only text is ignored.
+     */
+    @Override
+    public void characters(char ch[], int start, int length) throws SAXException {
+      String value = new String(ch, start, length).trim();
+      if (!value.isEmpty()) {
+        if (minTextLength <= 0) {
+          queue.add(value);
+        } else {
+          current = current == null ? value : current + " " + value;
+          if (current.length() >= minTextLength) {
+            queue.add(current);
+            current = null;
+          }
+        }
+      }
+    }
+
+    /** Records a parser failure; it is rethrown from {@link #checkParseException()}. */
+    public void setParseException(Exception ex) {
+      this.parseException = ex;
+    }
+
+    /**
+     * Waits up to {@code pollTime} milliseconds for the end of the document unless
+     * it has already been reached.
+     *
+     * @return {@code true} if the end of the document has been reached
+     */
+    public synchronized boolean waitForNext(long pollTime) throws IOException {
+      if (!documentEnded) {
+        try {
+          wait(pollTime);
+        } catch (InterruptedException ex) {
+          // Deliberately ignored: the caller keeps polling until its own time
+          // budget is used up.
+        }
+      }
+      return documentEnded;
+    }
+
+    /** Marks the document as ended and wakes up a thread blocked in waitForNext. */
+    @Override
+    public synchronized void endDocument() throws SAXException {
+      this.documentEnded = true;
+      notify();
+    }
+
+    /**
+     * Returns the next piece of text, or {@code null} if none is available right now.
+     * Once the document has ended and the queue is drained, the accumulated text that
+     * never reached the minimum length is returned exactly once.
+     *
+     * @throws IOException if the parser has reported a failure
+     */
+    public String getCurrent() throws IOException {
+      checkParseException();
+      String value = queue.poll();
+      if (value == null && documentEnded) {
+        // The parser may have queued its last chunks between the poll above and the
+        // documentEnded check, so look at the queue once more before falling back.
+        value = queue.poll();
+        if (value == null) {
+          // Hand the leftover text out only once; returning it on every call would
+          // keep the reader from ever reaching the end of the content.
+          value = current;
+          current = null;
+        }
+      }
+      return value;
+    }
+
+    /** Throws the recorded parser failure, if any, wrapped into an {@link IOException}. */
+    public void checkParseException() throws IOException {
+      if (parseException != null) {
+        throw new IOException(parseException);
+      }
+    }
+
+    /** Sets the minimum length of the text pieces to queue; values <= 0 disable joining. */
+    public void setMinTextLength(int minTextLength) {
+      this.minTextLength = minTextLength;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java
new file mode 100644
index 0000000..972d69f
--- /dev/null
+++ b/sdks/java/io/tika/src/main/java/org/apache/beam/sdk/io/tika/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transform for reading and parsing files with Apache Tika.
+ */
+package org.apache.beam.sdk.io.tika;

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java
new file mode 100644
index 0000000..368eff5
--- /dev/null
+++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaIOTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.tika.exception.TikaException;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests for {@link TikaIO}.
+ */
+public class TikaIOTest {
+  private static final String[] PDF_FILE = {
+      "Combining", "can help to ingest", "Apache Beam", "in most known formats.",
+      "the content from the files", "and", "Apache Tika"
+  };
+  private static final String[] PDF_ZIP_FILE = {
+      "Combining", "can help to ingest", "Apache Beam", "in most known formats.",
+      "the content from the files", "and", "Apache Tika",
+      "apache-beam-tika.pdf"
+  };
+  private static final String[] ODT_FILE = {
+      "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.",
+      "the content from the files", "and", "Apache Tika"
+  };
+  private static final String[] ODT_FILE_WITH_METADATA = {
+      "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.",
+      "the content from the files", "and", "Apache Tika",
+      "Author=BeamTikaUser"
+  };
+  private static final String[] ODT_FILE_WITH_MIN_TEXT_LEN = {
+      "Combining Apache Beam", "and Apache Tika can help to ingest", "in most known formats.",
+      "the content from the files"
+  };
+  private static final String[] ODT_FILES = {
+      "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.",
+      "the content from the files", "and", "Apache Tika",
+      "Open Office", "Text", "PDF", "Excel", "Scientific",
+      "and other formats", "are supported."
+  };
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  @Test
+  public void testReadPdfFile() throws IOException {
+    doTestReadFiles(pathOf("/apache-beam-tika.pdf"), PDF_FILE);
+  }
+
+  @Test
+  public void testReadZipPdfFile() throws IOException {
+    doTestReadFiles(pathOf("/apache-beam-tika-pdf.zip"), PDF_ZIP_FILE);
+  }
+
+  @Test
+  public void testReadOdtFile() throws IOException {
+    doTestReadFiles(pathOf("/apache-beam-tika1.odt"), ODT_FILE);
+  }
+
+  @Test
+  public void testReadOdtFiles() throws IOException {
+    // Turn the path of one test document into a pattern matching all of them
+    String allOdtFiles = pathOf("/apache-beam-tika1.odt").replace("apache-beam-tika1", "*");
+
+    doTestReadFiles(allOdtFiles, ODT_FILES);
+  }
+
+  /** Reads the files matching the path and checks the produced text, in any order. */
+  private void doTestReadFiles(String resourcePath, String[] expected) throws IOException {
+    TikaIO.Read read = TikaIO.read().from(resourcePath);
+    PCollection<String> output = p.apply("ParseFiles", read);
+    PAssert.that(output).containsInAnyOrder(expected);
+    p.run();
+  }
+
+  @Test
+  public void testReadOdtFileWithMetadata() throws IOException {
+    TikaIO.Read read = TikaIO.read()
+        .from(pathOf("/apache-beam-tika1.odt"))
+        .withReadOutputMetadata(true);
+
+    PCollection<String> output = p.apply("ParseOdtFile", read)
+        .apply(ParDo.of(new FilterMetadataFn()));
+    PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_METADATA);
+    p.run();
+  }
+
+  @Test
+  public void testReadOdtFileWithMinTextLength() throws IOException {
+    TikaIO.Read read = TikaIO.read()
+        .from(pathOf("/apache-beam-tika1.odt"))
+        .withMinimumTextlength(20);
+
+    PCollection<String> output = p.apply("ParseOdtFile", read);
+    PAssert.that(output).containsInAnyOrder(ODT_FILE_WITH_MIN_TEXT_LEN);
+    p.run();
+  }
+
+  @Test
+  public void testReadPdfFileSync() throws IOException {
+    TikaIO.Read read = TikaIO.read()
+        .from(pathOf("/apache-beam-tika.pdf"))
+        .withParseSynchronously(true);
+
+    PCollection<String> output = p.apply("ParsePdfFile", read);
+    PAssert.that(output).containsInAnyOrder(PDF_FILE);
+    p.run();
+  }
+
+  @Test
+  public void testReadDamagedPdfFile() throws IOException {
+    doTestReadDamagedPdfFile(false);
+  }
+
+  @Test
+  public void testReadDamagedPdfFileSync() throws IOException {
+    doTestReadDamagedPdfFile(true);
+  }
+
+  /** Expects the pipeline to fail with a TikaException two levels down the cause chain. */
+  private void doTestReadDamagedPdfFile(boolean sync) throws IOException {
+    TikaIO.Read read = TikaIO.read()
+        .from(pathOf("/damaged.pdf"))
+        .withParseSynchronously(sync);
+    p.apply("ParseInvalidPdfFile", read);
+
+    try {
+      p.run();
+      fail("Transform failure is expected");
+    } catch (RuntimeException failure) {
+      Throwable parseFailure = failure.getCause().getCause();
+      assertTrue(parseFailure instanceof TikaException);
+    }
+  }
+
+  @Test
+  public void testReadDisplayData() {
+    TikaIO.Read read = TikaIO.read()
+        .from("foo.*")
+        .withTikaConfigPath("tikaconfigpath")
+        .withContentTypeHint("application/pdf")
+        .withMinimumTextlength(100)
+        .withReadOutputMetadata(true);
+
+    DisplayData data = DisplayData.from(read);
+
+    assertThat(data, hasDisplayItem("filePattern", "foo.*"));
+    assertThat(data, hasDisplayItem("tikaConfigPath", "tikaconfigpath"));
+    assertThat(data, hasDisplayItem("inputMetadata", "[Content-Type=application/pdf]"));
+    assertThat(data, hasDisplayItem("readOutputMetadata", "true"));
+    assertThat(data, hasDisplayItem("parseMode", "asynchronous"));
+    assertThat(data, hasDisplayItem("queuePollTime", "50"));
+    assertThat(data, hasDisplayItem("queueMaxPollTime", "3000"));
+    assertThat(data, hasDisplayItem("minTextLen", "100"));
+    assertEquals(8, data.items().size());
+  }
+
+  @Test
+  public void testReadDisplayDataSyncMode() {
+    TikaIO.Read read = TikaIO.read()
+        .from("foo.*")
+        .withParseSynchronously(true);
+
+    DisplayData data = DisplayData.from(read);
+
+    assertThat(data, hasDisplayItem("filePattern", "foo.*"));
+    assertThat(data, hasDisplayItem("parseMode", "synchronous"));
+    assertEquals(2, data.items().size());
+  }
+
+  @Test
+  public void testReadDisplayDataWithDefaultOptions() {
+    TikaIO.Read read = TikaIO.read().withOptions(createOptions("--input=/input/tika.pdf"));
+
+    DisplayData data = DisplayData.from(read);
+
+    assertThat(data, hasDisplayItem("filePattern", "/input/tika.pdf"));
+    assertThat(data, hasDisplayItem("parseMode", "asynchronous"));
+    assertThat(data, hasDisplayItem("queuePollTime", "50"));
+    assertThat(data, hasDisplayItem("queueMaxPollTime", "3000"));
+    assertEquals(4, data.items().size());
+  }
+
+  @Test
+  public void testReadDisplayDataWithCustomOptions() {
+    TikaOptions options = createOptions(
+        "--input=/input/tika.pdf",
+        "--tikaConfigPath=/tikaConfigPath",
+        "--queuePollTime=10",
+        "--queueMaxPollTime=1000",
+        "--contentTypeHint=application/pdf",
+        "--readOutputMetadata=true");
+    TikaIO.Read read = TikaIO.read().withOptions(options);
+
+    DisplayData data = DisplayData.from(read);
+
+    assertThat(data, hasDisplayItem("filePattern", "/input/tika.pdf"));
+    assertThat(data, hasDisplayItem("tikaConfigPath", "/tikaConfigPath"));
+    assertThat(data, hasDisplayItem("parseMode", "asynchronous"));
+    assertThat(data, hasDisplayItem("queuePollTime", "10"));
+    assertThat(data, hasDisplayItem("queueMaxPollTime", "1000"));
+    assertThat(data, hasDisplayItem("inputMetadata", "[Content-Type=application/pdf]"));
+    assertThat(data, hasDisplayItem("readOutputMetadata", "true"));
+    assertEquals(7, data.items().size());
+  }
+
+  /** Resolves a test resource on the classpath to the path of its URL. */
+  private String pathOf(String resourceName) {
+    return getClass().getResource(resourceName).getPath();
+  }
+
+  /** Builds validated {@link TikaOptions} from command-line style arguments. */
+  private static TikaOptions createOptions(String... args) {
+    return PipelineOptionsFactory.fromArgs(args)
+        .withValidation().as(TikaOptions.class);
+  }
+
+  /** Passes through the content and, of all the {@code name=value} entries, only the Author. */
+  static class FilterMetadataFn extends DoFn<String, String> {
+    private static final long serialVersionUID = 6338014219600516621L;
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      String element = c.element();
+      if (!element.contains("=") || element.startsWith("Author")) {
+        c.output(element);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java
new file mode 100644
index 0000000..5c4e754
--- /dev/null
+++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaReaderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.beam.sdk.io.tika.TikaSource.TikaReader;
+import org.junit.Test;
+
+/**
+ * Tests TikaReader.
+ */
+public class TikaReaderTest {
+  private static final List<String> ODT_FILE = Arrays.asList(
+      "Combining", "can help to ingest", "Apache", "Beam", "in most known formats.",
+      "the content from the files", "and", "Apache Tika");
+
+  @Test
+  public void testOdtFileAsyncReader() throws Exception {
+    doTestOdtFileReader(false);
+  }
+
+  @Test
+  public void testOdtFileSyncReader() throws Exception {
+    doTestOdtFileReader(true);
+  }
+
+  /**
+   * Reads a single ODT file and checks that all the expected text was produced and that
+   * a parsing executor exists only in the asynchronous mode.
+   */
+  private void doTestOdtFileReader(boolean sync) throws Exception {
+    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
+    TikaSource source = new TikaSource(TikaIO.read()
+                                                .withParseSynchronously(sync)
+                                                .from(resourcePath));
+    TikaReader reader = (TikaReader) source.createReader(null);
+    // Close the reader even if reading or an assertion fails, otherwise the thread of
+    // the asynchronous parsing executor would be left behind.
+    try {
+      List<String> content = new LinkedList<>();
+      for (boolean available = reader.start(); available; available = reader.advance()) {
+        content.add(reader.getCurrent());
+      }
+      assertTrue(content.containsAll(ODT_FILE));
+      if (!sync) {
+        assertNotNull(reader.getExecutorService());
+      } else {
+        assertNull(reader.getExecutorService());
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testOdtFilesReader() throws Exception {
+    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
+    String filePattern = resourcePath.replace("apache-beam-tika1", "*");
+
+    TikaSource source = new TikaSource(TikaIO.read().from(filePattern));
+    TikaSource.FilePatternTikaReader reader =
+        (TikaSource.FilePatternTikaReader) source.createReader(null);
+    // Close the reader even if reading or the assertion fails
+    try {
+      List<String> content = new LinkedList<>();
+      for (boolean available = reader.start(); available; available = reader.advance()) {
+        content.add(reader.getCurrent());
+      }
+      assertTrue(content.containsAll(ODT_FILE));
+    } finally {
+      reader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java
new file mode 100644
index 0000000..550f469
--- /dev/null
+++ b/sdks/java/io/tika/src/test/java/org/apache/beam/sdk/io/tika/TikaSourceTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.tika;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.tika.TikaSource.TikaReader;
+import org.junit.Test;
+
+/**
+ * Tests TikaSource.
+ */
+public class TikaSourceTest {
+
+  @Test
+  public void testOdtFileSource() throws Exception {
+    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
+    TikaSource source = new TikaSource(TikaIO.read().from(resourcePath));
+    assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder());
+
+    assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode());
+    assertTrue(source.createReader(null) instanceof TikaReader);
+
+    List<? extends TikaSource> sources = source.split(1, null);
+    assertEquals(1, sources.size());
+    TikaSource nextSource = sources.get(0);
+    assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode());
+    assertEquals(resourcePath, nextSource.getSingleFileMetadata().resourceId().toString());
+  }
+
+  @Test
+  public void testOdtFilesSource() throws Exception {
+    String resourcePath = getClass().getResource("/apache-beam-tika1.odt").getPath();
+    String resourcePath2 = getClass().getResource("/apache-beam-tika2.odt").getPath();
+    String filePattern = resourcePath.replace("apache-beam-tika1", "*");
+
+    TikaSource source = new TikaSource(TikaIO.read().from(filePattern));
+    assertEquals(StringUtf8Coder.of(), source.getDefaultOutputCoder());
+
+    assertEquals(TikaSource.Mode.FILEPATTERN, source.getMode());
+    assertTrue(source.createReader(null) instanceof TikaSource.FilePatternTikaReader);
+
+    List<? extends TikaSource> sources = source.split(1, null);
+    assertEquals(2, sources.size());
+    TikaSource nextSource = sources.get(0);
+    assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource.getMode());
+    String nextSourceResource = nextSource.getSingleFileMetadata().resourceId().toString();
+    TikaSource nextSource2 = sources.get(1);
+    assertEquals(TikaSource.Mode.SINGLE_FILE, nextSource2.getMode());
+    String nextSourceResource2 = nextSource2.getSingleFileMetadata().resourceId().toString();
+    assertTrue(nextSourceResource.equals(resourcePath) && nextSourceResource2.equals(resourcePath2)
+        || nextSourceResource.equals(resourcePath2) && nextSourceResource2.equals(resourcePath));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip b/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip
new file mode 100644
index 0000000..4c0e0ef
Binary files /dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika-pdf.zip differ

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf b/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf
new file mode 100644
index 0000000..d3c7f14
Binary files /dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika.pdf differ

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt b/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt
new file mode 100644
index 0000000..87c5577
Binary files /dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika1.odt differ

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt b/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt
new file mode 100644
index 0000000..a0ff320
Binary files /dev/null and b/sdks/java/io/tika/src/test/resources/apache-beam-tika2.odt differ

http://git-wip-us.apache.org/repos/asf/beam/blob/2265b6ce/sdks/java/io/tika/src/test/resources/damaged.pdf
----------------------------------------------------------------------
diff --git a/sdks/java/io/tika/src/test/resources/damaged.pdf b/sdks/java/io/tika/src/test/resources/damaged.pdf
new file mode 100644
index 0000000..7653b4b
--- /dev/null
+++ b/sdks/java/io/tika/src/test/resources/damaged.pdf
@@ -0,0 +1,2 @@
+%PDF-1.4
+


Mime
View raw message