beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [08/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
Date Thu, 20 Jul 2017 17:09:35 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 90ede4c..849873c 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -31,7 +31,10 @@ import java.util.Set;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.io.range.ByteKey;
@@ -41,6 +44,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -118,15 +122,17 @@ import org.slf4j.LoggerFactory;
  * <h3>Writing to HBase</h3>
  *
  * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a
- * {@link PCollection PCollection&lt;Mutation&gt;}, where each {@link Mutation} represents an
- * idempotent transformation on a row.
+ * {@link PCollection PCollection&lt;KV&lt;byte[], Iterable&lt;Mutation&gt;&gt;&gt;}, where the
+ * {@code byte[]} is the key of the row being mutated, and each {@link Mutation} represents an
+ * idempotent transformation to that row.
  *
  * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration}
  * to identify the HBase instance, for example:
  *
  * <pre>{@code
  * Configuration configuration = ...;
- * PCollection<Mutation> data = ...;
+ * PCollection<KV<byte[], Iterable<Mutation>>> data = ...;
+ * data.setCoder(HBaseIO.WRITE_CODER);
  *
  * data.apply("write",
  *     HBaseIO.write()
@@ -140,7 +146,7 @@ import org.slf4j.LoggerFactory;
  * it can evolve or be different in some aspects, but the idea is that users can easily migrate
  * from one to the other</p>.
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class HBaseIO {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseIO.class);
 
@@ -539,7 +545,9 @@ public class HBaseIO {
      *
      * @see HBaseIO
      */
-    public static class Write extends PTransform<PCollection<Mutation>, PDone> {
+    public static class Write
+            extends PTransform<PCollection<KV<byte[], Iterable<Mutation>>>, PDone> {
+
         /**
          * Returns a new {@link HBaseIO.Write} that will write to the HBase instance
          * indicated by the given Configuration, and using any other specified customizations.
@@ -567,7 +575,7 @@ public class HBaseIO {
         }
 
         @Override
-        public PDone expand(PCollection<Mutation> input) {
+        public PDone expand(PCollection<KV<byte[], Iterable<Mutation>>> input) {
             input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
             return PDone.in(input.getPipeline());
         }
@@ -605,7 +613,7 @@ public class HBaseIO {
         private final String tableId;
         private final SerializableConfiguration serializableConfiguration;
 
-        private class HBaseWriterFn extends DoFn<Mutation, Void> {
+        private class HBaseWriterFn extends DoFn<KV<byte[], Iterable<Mutation>>, Void> {
 
             public HBaseWriterFn(String tableId,
                                  SerializableConfiguration serializableConfiguration) {
@@ -616,27 +624,31 @@ public class HBaseIO {
 
             @Setup
             public void setup() throws Exception {
-                connection = ConnectionFactory.createConnection(serializableConfiguration.get());
-            }
+                Configuration configuration = this.serializableConfiguration.get();
+                connection = ConnectionFactory.createConnection(configuration);
 
-            @StartBundle
-            public void startBundle(StartBundleContext c) throws IOException {
+                TableName tableName = TableName.valueOf(tableId);
                 BufferedMutatorParams params =
-                    new BufferedMutatorParams(TableName.valueOf(tableId));
+                    new BufferedMutatorParams(tableName);
                 mutator = connection.getBufferedMutator(params);
+
                 recordsWritten = 0;
             }
 
             @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-                mutator.mutate(c.element());
-                ++recordsWritten;
+            public void processElement(ProcessContext ctx) throws Exception {
+                KV<byte[], Iterable<Mutation>> record = ctx.element();
+                List<Mutation> mutations = new ArrayList<>();
+                for (Mutation mutation : record.getValue()) {
+                    mutations.add(mutation);
+                    ++recordsWritten;
+                }
+                mutator.mutate(mutations);
             }
 
             @FinishBundle
             public void finishBundle() throws Exception {
                 mutator.flush();
-                LOG.debug("Wrote {} records", recordsWritten);
             }
 
             @Teardown
@@ -649,6 +661,7 @@ public class HBaseIO {
                     connection.close();
                     connection = null;
                 }
+                LOG.debug("Wrote {} records", recordsWritten);
             }
 
             @Override
@@ -665,4 +678,7 @@ public class HBaseIO {
             private long recordsWritten;
         }
     }
+
+    public static final Coder<KV<byte[], Iterable<Mutation>>> WRITE_CODER =
+            KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(HBaseMutationCoder.of()));
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index ee83114..501fe09 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -21,12 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.List;
 import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -69,42 +65,4 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
       throw new IllegalArgumentException("Only Put and Delete are supported");
     }
   }
-
-  /**
-   * Returns a {@link CoderProvider} which uses the {@link HBaseMutationCoder} for
-   * {@link Mutation mutations}.
-   */
-  static CoderProvider getCoderProvider() {
-    return HBASE_MUTATION_CODER_PROVIDER;
-  }
-
-  private static final CoderProvider HBASE_MUTATION_CODER_PROVIDER =
-    new HBaseMutationCoderProvider();
-
-  /**
-   * A {@link CoderProvider} for {@link Mutation mutations}.
-   */
-  private static class HBaseMutationCoderProvider extends CoderProvider {
-    @Override
-    public <T> Coder<T> coderFor(TypeDescriptor<T> typeDescriptor,
-        List<? extends Coder<?>> componentCoders) throws CannotProvideCoderException {
-      if (!typeDescriptor.isSubtypeOf(HBASE_MUTATION_TYPE_DESCRIPTOR)) {
-        throw new CannotProvideCoderException(
-          String.format(
-            "Cannot provide %s because %s is not a subclass of %s",
-            HBaseMutationCoder.class.getSimpleName(),
-            typeDescriptor,
-            Mutation.class.getName()));
-      }
-
-      try {
-        return (Coder<T>) HBaseMutationCoder.of();
-      } catch (IllegalArgumentException e) {
-        throw new CannotProvideCoderException(e);
-      }
-    }
-  }
-
-  private static final TypeDescriptor<Mutation> HBASE_MUTATION_TYPE_DESCRIPTOR =
-    new TypeDescriptor<Mutation>() {};
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
deleted file mode 100644
index 5b2e138..0000000
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrarTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hbase;
-
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link HBaseCoderProviderRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class HBaseCoderProviderRegistrarTest {
-  @Test
-  public void testResultCoderIsRegistered() throws Exception {
-    CoderRegistry.createDefault().getCoder(Result.class);
-  }
-
-  @Test
-  public void testMutationCoderIsRegistered() throws Exception {
-    CoderRegistry.createDefault().getCoder(Mutation.class);
-    CoderRegistry.createDefault().getCoder(Put.class);
-    CoderRegistry.createDefault().getCoder(Delete.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 806a27f..4a06789 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +47,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
@@ -96,12 +96,7 @@ public class HBaseIOTest {
         conf.setStrings("hbase.master.hostname", "localhost");
         conf.setStrings("hbase.regionserver.hostname", "localhost");
         htu = new HBaseTestingUtility(conf);
-
-        // We don't use the full htu.startMiniCluster() to avoid starting unneeded HDFS/MR daemons
-        htu.startMiniZKCluster();
-        MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 4);
-        hbm.waitForActiveAndReadyMaster();
-
+        htu.startMiniCluster(1, 4);
         admin = htu.getHBaseAdmin();
     }
 
@@ -112,8 +107,7 @@ public class HBaseIOTest {
             admin = null;
         }
         if (htu != null) {
-            htu.shutdownMiniHBaseCluster();
-            htu.shutdownMiniZKCluster();
+            htu.shutdownMiniCluster();
             htu = null;
         }
     }
@@ -291,16 +285,15 @@ public class HBaseIOTest {
         final String table = "table";
         final String key = "key";
         final String value = "value";
-        final int numMutations = 100;
 
         createTable(table);
 
-        p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)))
-         .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.apply("single row", Create.of(makeWrite(key, value)).withCoder(HBaseIO.WRITE_CODER))
+                .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
         p.run().waitUntilFinish();
 
         List<Result> results = readTable(table, new Scan());
-        assertEquals(numMutations, results.size());
+        assertEquals(1, results.size());
     }
 
     /** Tests that when writing to a non-existent table, the write fails. */
@@ -308,8 +301,10 @@ public class HBaseIOTest {
     public void testWritingFailsTableDoesNotExist() throws Exception {
         final String table = "TEST-TABLE-DOES-NOT-EXIST";
 
-        p.apply(Create.empty(HBaseMutationCoder.of()))
-         .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+        PCollection<KV<byte[], Iterable<Mutation>>> emptyInput =
+                p.apply(Create.empty(HBaseIO.WRITE_CODER));
+
+        emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         // Exception will be thrown by write.validate() when write is applied.
         thrown.expect(IllegalArgumentException.class);
@@ -324,8 +319,8 @@ public class HBaseIOTest {
         final String key = "KEY";
         createTable(table);
 
-        p.apply(Create.of(makeBadMutation(key)))
-         .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
+        p.apply(Create.of(makeBadWrite(key)).withCoder(HBaseIO.WRITE_CODER))
+                .apply(HBaseIO.write().withConfiguration(conf).withTableId(table));
 
         thrown.expect(Pipeline.PipelineExecutionException.class);
         thrown.expectCause(Matchers.<Throwable>instanceOf(IllegalArgumentException.class));
@@ -403,22 +398,26 @@ public class HBaseIOTest {
 
     // Beam helper methods
     /** Helper function to make a single row mutation to be written. */
-    private static Iterable<Mutation> makeMutations(String key, String value, int numMutations) {
+    private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) {
+        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
         List<Mutation> mutations = new ArrayList<>();
-        for (int i = 0; i < numMutations; i++) {
-            mutations.add(makeMutation(key + i, value));
-        }
-        return mutations;
+        mutations.add(makeMutation(key, value));
+        return KV.of(rowKey, (Iterable<Mutation>) mutations);
     }
 
+
     private static Mutation makeMutation(String key, String value) {
-        return new Put(key.getBytes(StandardCharsets.UTF_8))
+        byte[] rowKey = key.getBytes(StandardCharsets.UTF_8);
+        return new Put(rowKey)
                     .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value))
                     .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com"));
     }
 
-    private static Mutation makeBadMutation(String key) {
-        return new Put(key.getBytes());
+    private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) {
+        Put put = new Put(key.getBytes());
+        List<Mutation> mutations = new ArrayList<>();
+        mutations.add(put);
+        return KV.of(key.getBytes(StandardCharsets.UTF_8), (Iterable<Mutation>) mutations);
     }
 
     private void runReadTest(HBaseIO.Read read, List<Result> expected) {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/pom.xml b/sdks/java/io/hcatalog/pom.xml
deleted file mode 100644
index 2aa661e..0000000
--- a/sdks/java/io/hcatalog/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.beam</groupId>
-    <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>beam-sdks-java-io-hcatalog</artifactId>
-  <name>Apache Beam :: SDKs :: Java :: IO :: HCatalog</name>
-  <description>IO to read and write for HCatalog source.</description>
-
-  <properties>
-    <hive.version>2.1.0</hive.version>
-    <apache.commons.version>2.5</apache.commons.version>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <createDependencyReducedPom>false</createDependencyReducedPom>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <exclusions>
-        <!-- Fix build on JDK-9 -->
-        <exclusion>
-          <groupId>jdk.tools</groupId>
-          <artifactId>jdk.tools</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>${apache.commons.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.auto.value</groupId>
-      <artifactId>auto-value</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive.hcatalog</groupId>
-      <artifactId>hive-hcatalog-core</artifactId>
-      <version>${hive.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hive</groupId>
-          <artifactId>hive-exec</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-        <!-- Fix build on JDK-9 -->
-        <exclusion>
-          <groupId>jdk.tools</groupId>
-          <artifactId>jdk.tools</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive.hcatalog</groupId>
-      <artifactId>hive-hcatalog-core</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-      <version>${hive.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-common</artifactId>
-      <version>${hive.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-cli</artifactId>
-      <version>${hive.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
deleted file mode 100644
index 4199b80..0000000
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hcatalog;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hadoop.WritableCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.common.HCatUtil;
-import org.apache.hive.hcatalog.data.DefaultHCatRecord;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
-import org.apache.hive.hcatalog.data.transfer.HCatReader;
-import org.apache.hive.hcatalog.data.transfer.HCatWriter;
-import org.apache.hive.hcatalog.data.transfer.ReadEntity;
-import org.apache.hive.hcatalog.data.transfer.ReaderContext;
-import org.apache.hive.hcatalog.data.transfer.WriteEntity;
-import org.apache.hive.hcatalog.data.transfer.WriterContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * IO to read and write data using HCatalog.
- *
- * <h3>Reading using HCatalog</h3>
- *
- * <p>HCatalog source supports reading of HCatRecord from a HCatalog managed source, for eg. Hive.
- *
- * <p>To configure a HCatalog source, you must specify a metastore URI and a table name. Other
- * optional parameters are database &amp; filter For instance:
- *
- * <pre>{@code
- * Map<String, String> configProperties = new HashMap<String, String>();
- * configProperties.put("hive.metastore.uris","thrift://metastore-host:port");
- *
- * pipeline
- *   .apply(HCatalogIO.read()
- *       .withConfigProperties(configProperties)
- *       .withDatabase("default") //optional, assumes default if none specified
- *       .withTable("employee")
- *       .withFilter(filterString) //optional, may be specified if the table is partitioned
- * }</pre>
- *
- * <h3>Writing using HCatalog</h3>
- *
- * <p>HCatalog sink supports writing of HCatRecord to a HCatalog managed source, for eg. Hive.
- *
- * <p>To configure a HCatalog sink, you must specify a metastore URI and a table name. Other
- * optional parameters are database, partition &amp; batchsize The destination table should exist
- * beforehand, the transform does not create a new table if it does not exist For instance:
- *
- * <pre>{@code
- * Map<String, String> configProperties = new HashMap<String, String>();
- * configProperties.put("hive.metastore.uris","thrift://metastore-host:port");
- *
- * pipeline
- *   .apply(...)
- *   .apply(HiveIO.write()
- *       .withConfigProperties(configProperties)
- *       .withDatabase("default") //optional, assumes default if none specified
- *       .withTable("employee")
- *       .withPartition(partitionValues) //optional, may be specified if the table is partitioned
- *       .withBatchSize(1024L)) //optional, assumes a default batch size of 1024 if none specified
- * }</pre>
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class HCatalogIO {
-
-  private static final Logger LOG = LoggerFactory.getLogger(HCatalogIO.class);
-
-  private static final long BATCH_SIZE = 1024L;
-  private static final String DEFAULT_DATABASE = "default";
-
-  /** Write data to Hive. */
-  public static Write write() {
-    return new AutoValue_HCatalogIO_Write.Builder().setBatchSize(BATCH_SIZE).build();
-  }
-
-  /** Read data from Hive. */
-  public static Read read() {
-    return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build();
-  }
-
-  private HCatalogIO() {}
-
-  /** A {@link PTransform} to read data using HCatalog. */
-  @VisibleForTesting
-  @AutoValue
-  public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> {
-    @Nullable abstract Map<String, String> getConfigProperties();
-    @Nullable abstract String getDatabase();
-    @Nullable abstract String getTable();
-    @Nullable abstract String getFilter();
-    @Nullable abstract ReaderContext getContext();
-    @Nullable abstract Integer getSplitId();
-    abstract Builder toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setConfigProperties(Map<String, String> configProperties);
-      abstract Builder setDatabase(String database);
-      abstract Builder setTable(String table);
-      abstract Builder setFilter(String filter);
-      abstract Builder setSplitId(Integer splitId);
-      abstract Builder setContext(ReaderContext context);
-      abstract Read build();
-    }
-
-    /** Sets the configuration properties like metastore URI. */
-    public Read withConfigProperties(Map<String, String> configProperties) {
-      return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
-    }
-
-    /** Sets the database name. This is optional, assumes 'default' database if none specified */
-    public Read withDatabase(String database) {
-      return toBuilder().setDatabase(database).build();
-    }
-
-    /** Sets the table name to read from. */
-    public Read withTable(String table) {
-      return toBuilder().setTable(table).build();
-    }
-
-    /** Sets the filter details. This is optional, assumes none if not specified */
-    public Read withFilter(String filter) {
-      return toBuilder().setFilter(filter).build();
-    }
-
-    Read withSplitId(int splitId) {
-      checkArgument(splitId >= 0, "Invalid split id-" + splitId);
-      return toBuilder().setSplitId(splitId).build();
-    }
-
-    Read withContext(ReaderContext context) {
-      return toBuilder().setContext(context).build();
-    }
-
-    @Override
-    public PCollection<HCatRecord> expand(PBegin input) {
-      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(getTable(), "table");
-      checkNotNull(getConfigProperties(), "configProperties");
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("configProperties", getConfigProperties().toString()));
-      builder.add(DisplayData.item("table", getTable()));
-      builder.addIfNotNull(DisplayData.item("database", getDatabase()));
-      builder.addIfNotNull(DisplayData.item("filter", getFilter()));
-    }
-  }
-
-  /** A HCatalog {@link BoundedSource} reading {@link HCatRecord} from a given instance. */
-  @VisibleForTesting
-  static class BoundedHCatalogSource extends BoundedSource<HCatRecord> {
-    private final Read spec;
-
-    BoundedHCatalogSource(Read spec) {
-      this.spec = spec;
-    }
-
-    @Override
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    public Coder<HCatRecord> getDefaultOutputCoder() {
-      return (Coder) WritableCoder.of(DefaultHCatRecord.class);
-    }
-
-    @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      spec.populateDisplayData(builder);
-    }
-
-    @Override
-    public BoundedReader<HCatRecord> createReader(PipelineOptions options) {
-      return new BoundedHCatalogReader(this);
-    }
-
-    /**
-     * Returns the size of the table in bytes, does not take into consideration filter/partition
-     * details passed, if any.
-     */
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
-      Configuration conf = new Configuration();
-      for (Entry<String, String> entry : spec.getConfigProperties().entrySet()) {
-        conf.set(entry.getKey(), entry.getValue());
-      }
-      IMetaStoreClient client = null;
-      try {
-        HiveConf hiveConf = HCatUtil.getHiveConf(conf);
-        client = HCatUtil.getHiveMetastoreClient(hiveConf);
-        Table table = HCatUtil.getTable(client, spec.getDatabase(), spec.getTable());
-        return StatsUtils.getFileSizeForTable(hiveConf, table);
-      } finally {
-        // IMetaStoreClient is not AutoCloseable, closing it manually
-        if (client != null) {
-          client.close();
-        }
-      }
-    }
-
-    /**
-     * Calculates the 'desired' number of splits based on desiredBundleSizeBytes which is passed as
-     * a hint to native API. Retrieves the actual splits generated by native API, which could be
-     * different from the 'desired' split count calculated using desiredBundleSizeBytes
-     */
-    @Override
-    public List<BoundedSource<HCatRecord>> split(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      int desiredSplitCount = 1;
-      long estimatedSizeBytes = getEstimatedSizeBytes(options);
-      if (desiredBundleSizeBytes > 0 && estimatedSizeBytes > 0) {
-        desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / desiredBundleSizeBytes);
-      }
-      ReaderContext readerContext = getReaderContext(desiredSplitCount);
-      //process the splits returned by native API
-      //this could be different from 'desiredSplitCount' calculated above
-      LOG.info(
-          "Splitting into bundles of {} bytes: "
-              + "estimated size {}, desired split count {}, actual split count {}",
-          desiredBundleSizeBytes,
-          estimatedSizeBytes,
-          desiredSplitCount,
-          readerContext.numSplits());
-
-      List<BoundedSource<HCatRecord>> res = new ArrayList<>();
-      for (int split = 0; split < readerContext.numSplits(); split++) {
-        res.add(new BoundedHCatalogSource(spec.withContext(readerContext).withSplitId(split)));
-      }
-      return res;
-    }
-
-    private ReaderContext getReaderContext(long desiredSplitCount) throws HCatException {
-      ReadEntity entity =
-          new ReadEntity.Builder()
-              .withDatabase(spec.getDatabase())
-              .withTable(spec.getTable())
-              .withFilter(spec.getFilter())
-              .build();
-      // pass the 'desired' split count as an hint to the API
-      Map<String, String> configProps = new HashMap<>(spec.getConfigProperties());
-      configProps.put(
-          HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount));
-      return DataTransferFactory.getHCatReader(entity, configProps).prepareRead();
-    }
-
-    static class BoundedHCatalogReader extends BoundedSource.BoundedReader<HCatRecord> {
-      private final BoundedHCatalogSource source;
-      private HCatRecord current;
-      private Iterator<HCatRecord> hcatIterator;
-
-      public BoundedHCatalogReader(BoundedHCatalogSource source) {
-        this.source = source;
-      }
-
-      @Override
-      public boolean start() throws HCatException {
-        HCatReader reader =
-            DataTransferFactory.getHCatReader(source.spec.getContext(), source.spec.getSplitId());
-        hcatIterator = reader.read();
-        return advance();
-      }
-
-      @Override
-      public boolean advance() {
-        if (hcatIterator.hasNext()) {
-          current = hcatIterator.next();
-          return true;
-        } else {
-          current = null;
-          return false;
-        }
-      }
-
-      @Override
-      public BoundedHCatalogSource getCurrentSource() {
-        return source;
-      }
-
-      @Override
-      public HCatRecord getCurrent() {
-        if (current == null) {
-          throw new NoSuchElementException("Current element is null");
-        }
-        return current;
-      }
-
-      @Override
-      public void close() {
-        // nothing to close/release
-      }
-    }
-  }
-
-  /** A {@link PTransform} to write to a HCatalog managed source. */
-  @AutoValue
-  public abstract static class Write extends PTransform<PCollection<HCatRecord>, PDone> {
-    @Nullable abstract Map<String, String> getConfigProperties();
-    @Nullable abstract String getDatabase();
-    @Nullable abstract String getTable();
-    @Nullable abstract Map<String, String> getPartition();
-    abstract long getBatchSize();
-    abstract Builder toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setConfigProperties(Map<String, String> configProperties);
-      abstract Builder setDatabase(String database);
-      abstract Builder setTable(String table);
-      abstract Builder setPartition(Map<String, String> partition);
-      abstract Builder setBatchSize(long batchSize);
-      abstract Write build();
-    }
-
-    /** Sets the configuration properties like metastore URI. */
-    public Write withConfigProperties(Map<String, String> configProperties) {
-      return toBuilder().setConfigProperties(new HashMap<>(configProperties)).build();
-    }
-
-    /** Sets the database name. This is optional, assumes 'default' database if none specified */
-    public Write withDatabase(String database) {
-      return toBuilder().setDatabase(database).build();
-    }
-
-    /** Sets the table name to write to, the table should exist beforehand. */
-    public Write withTable(String table) {
-      return toBuilder().setTable(table).build();
-    }
-
-    /** Sets the partition details. */
-    public Write withPartition(Map<String, String> partition) {
-      return toBuilder().setPartition(partition).build();
-    }
-
-    /**
-     * Sets batch size for the write operation. This is optional, assumes a default batch size of
-     * 1024 if not set
-     */
-    public Write withBatchSize(long batchSize) {
-      return toBuilder().setBatchSize(batchSize).build();
-    }
-
-    @Override
-    public PDone expand(PCollection<HCatRecord> input) {
-      input.apply(ParDo.of(new WriteFn(this)));
-      return PDone.in(input.getPipeline());
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(getConfigProperties(), "configProperties");
-      checkNotNull(getTable(), "table");
-    }
-
-    private static class WriteFn extends DoFn<HCatRecord, Void> {
-      private final Write spec;
-      private WriterContext writerContext;
-      private HCatWriter slaveWriter;
-      private HCatWriter masterWriter;
-      private List<HCatRecord> hCatRecordsBatch;
-
-      public WriteFn(Write spec) {
-        this.spec = spec;
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder.addIfNotNull(DisplayData.item("database", spec.getDatabase()));
-        builder.add(DisplayData.item("table", spec.getTable()));
-        builder.addIfNotNull(DisplayData.item("partition", String.valueOf(spec.getPartition())));
-        builder.add(DisplayData.item("configProperties", spec.getConfigProperties().toString()));
-        builder.add(DisplayData.item("batchSize", spec.getBatchSize()));
-      }
-
-      @Setup
-      public void initiateWrite() throws HCatException {
-        WriteEntity entity =
-            new WriteEntity.Builder()
-                .withDatabase(spec.getDatabase())
-                .withTable(spec.getTable())
-                .withPartition(spec.getPartition())
-                .build();
-        masterWriter = DataTransferFactory.getHCatWriter(entity, spec.getConfigProperties());
-        writerContext = masterWriter.prepareWrite();
-        slaveWriter = DataTransferFactory.getHCatWriter(writerContext);
-      }
-
-      @StartBundle
-      public void startBundle() {
-        hCatRecordsBatch = new ArrayList<>();
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext ctx) throws HCatException {
-        hCatRecordsBatch.add(ctx.element());
-        if (hCatRecordsBatch.size() >= spec.getBatchSize()) {
-          flush();
-        }
-      }
-
-      @FinishBundle
-      public void finishBundle() throws HCatException {
-        flush();
-      }
-
-      private void flush() throws HCatException {
-        if (hCatRecordsBatch.isEmpty()) {
-          return;
-        }
-        try {
-          slaveWriter.write(hCatRecordsBatch.iterator());
-          masterWriter.commit(writerContext);
-        } catch (HCatException e) {
-          LOG.error("Exception in flush - write/commit data to Hive", e);
-          //abort on exception
-          masterWriter.abort(writerContext);
-          throw e;
-        } finally {
-          hCatRecordsBatch.clear();
-        }
-      }
-
-      @Teardown
-      public void tearDown() throws Exception {
-        if (slaveWriter != null) {
-          slaveWriter = null;
-        }
-        if (masterWriter != null) {
-          masterWriter = null;
-        }
-        if (writerContext != null) {
-          writerContext = null;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java
deleted file mode 100644
index dff5bd1..0000000
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Transforms for reading and writing using HCatalog.
- */
-package org.apache.beam.sdk.io.hcatalog;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
deleted file mode 100644
index 31e5b1c..0000000
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/EmbeddedMetastoreService.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hcatalog;
-
-import static org.apache.hive.hcatalog.common.HCatUtil.makePathASafeFileName;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-
-/**
- * Implementation of a light-weight embedded metastore. This class is a trimmed-down version of <a
- * href="https://github.com/apache/hive/blob/master
- * /hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatBaseTest.java">
- * https://github.com/apache/hive/blob/master/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce
- * /HCatBaseTest.java </a>
- */
-final class EmbeddedMetastoreService implements AutoCloseable {
-  private final Driver driver;
-  private final HiveConf hiveConf;
-  private final SessionState sessionState;
-
-  EmbeddedMetastoreService(String baseDirPath) throws IOException {
-    FileUtils.forceDeleteOnExit(new File(baseDirPath));
-
-    String hiveDirPath = makePathASafeFileName(baseDirPath + "/hive");
-    String testDataDirPath =
-        makePathASafeFileName(
-            hiveDirPath
-                + "/data/"
-                + EmbeddedMetastoreService.class.getCanonicalName()
-                + System.currentTimeMillis());
-    String testWarehouseDirPath = makePathASafeFileName(testDataDirPath + "/warehouse");
-
-    hiveConf = new HiveConf(getClass());
-    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
-    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, testWarehouseDirPath);
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, true);
-    hiveConf.setVar(
-        HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd."
-            + "SQLStdHiveAuthorizerFactory");
-    hiveConf.set("test.tmp.dir", hiveDirPath);
-
-    System.setProperty("derby.stream.error.file", "/dev/null");
-    driver = new Driver(hiveConf);
-    sessionState = SessionState.start(new CliSessionState(hiveConf));
-  }
-
-  /** Executes the passed query on the embedded metastore service. */
-  void executeQuery(String query) throws CommandNeedRetryException {
-    driver.run(query);
-  }
-
-  /** Returns the HiveConf object for the embedded metastore. */
-  HiveConf getHiveConf() {
-    return hiveConf;
-  }
-
-  @Override
-  public void close() throws Exception {
-    driver.close();
-    sessionState.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
deleted file mode 100644
index 91671a5..0000000
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hcatalog;
-
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_DATABASE;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_FILTER;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.TEST_TABLE;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getConfigPropertiesAsMap;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getExpectedRecords;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getHCatRecords;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.getReaderContext;
-import static org.apache.beam.sdk.io.hcatalog.HCatalogIOTestUtils.insertTestData;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.hcatalog.HCatalogIO.BoundedHCatalogSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.transfer.ReaderContext;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestRule;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-import org.junit.runners.model.Statement;
-
-/** Test for HCatalogIO. */
-public class HCatalogIOTest implements Serializable {
-  private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
-
-  @ClassRule
-  public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
-
-  @Rule public final transient TestPipeline defaultPipeline = TestPipeline.create();
-
-  @Rule public final transient TestPipeline readAfterWritePipeline = TestPipeline.create();
-
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-
-  @Rule
-  public final transient TestRule testDataSetupRule =
-      new TestWatcher() {
-        public Statement apply(final Statement base, final Description description) {
-          return new Statement() {
-            @Override
-            public void evaluate() throws Throwable {
-              if (description.getAnnotation(NeedsTestData.class) != null) {
-                prepareTestData();
-              } else if (description.getAnnotation(NeedsEmptyTestTables.class) != null) {
-                reCreateTestTable();
-              }
-              base.evaluate();
-            }
-          };
-        }
-      };
-
-  private static EmbeddedMetastoreService service;
-
-  /** Use this annotation to setup complete test data(table populated with records). */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ElementType.METHOD})
-  private @interface NeedsTestData {}
-
-  /** Use this annotation to setup test tables alone(empty tables, no records are populated). */
-  @Retention(RetentionPolicy.RUNTIME)
-  @Target({ElementType.METHOD})
-  private @interface NeedsEmptyTestTables {}
-
-  @BeforeClass
-  public static void setupEmbeddedMetastoreService () throws IOException {
-    service = new EmbeddedMetastoreService(TMP_FOLDER.getRoot().getAbsolutePath());
-  }
-
-  @AfterClass
-  public static void shutdownEmbeddedMetastoreService () throws Exception {
-    service.executeQuery("drop table " + TEST_TABLE);
-    service.close();
-  }
-
-  /** Perform end-to-end test of Write-then-Read operation. */
-  @Test
-  @NeedsEmptyTestTables
-  public void testWriteThenReadSuccess() throws Exception {
-    defaultPipeline
-        .apply(Create.of(getHCatRecords(TEST_RECORDS_COUNT)))
-        .apply(
-            HCatalogIO.write()
-                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-                .withDatabase(TEST_DATABASE)
-                .withTable(TEST_TABLE)
-                .withPartition(new java.util.HashMap<String, String>())
-                .withBatchSize(512L));
-    defaultPipeline.run();
-
-    PCollection<String> output = readAfterWritePipeline
-        .apply(
-            HCatalogIO.read()
-                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-                .withDatabase(TEST_DATABASE)
-                .withTable(TEST_TABLE)
-                .withFilter(TEST_FILTER))
-        .apply(
-            ParDo.of(
-                new DoFn<HCatRecord, String>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext c) {
-                    c.output(c.element().get(0).toString());
-                  }
-                }));
-    PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT));
-    readAfterWritePipeline.run();
-  }
-
-  /** Test of Write to a non-existent table. */
-  @Test
-  public void testWriteFailureTableDoesNotExist() throws Exception {
-    thrown.expectCause(isA(UserCodeException.class));
-    thrown.expectMessage(containsString("org.apache.hive.hcatalog.common.HCatException"));
-    thrown.expectMessage(containsString("NoSuchObjectException"));
-    defaultPipeline
-        .apply(Create.of(getHCatRecords(TEST_RECORDS_COUNT)))
-        .apply(
-            HCatalogIO.write()
-                .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-                .withTable("myowntable"));
-    defaultPipeline.run();
-  }
-
-  /** Test of Write without specifying a table. */
-  @Test
-  public void testWriteFailureValidationTable() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("table"));
-    HCatalogIO.write()
-        .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-        .validate(null);
-  }
-
-  /** Test of Write without specifying configuration properties. */
-  @Test
-  public void testWriteFailureValidationConfigProp() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("configProperties"));
-    HCatalogIO.write().withTable("myowntable").validate(null);
-  }
-
-  /** Test of Read from a non-existent table. */
-  @Test
-  public void testReadFailureTableDoesNotExist() throws Exception {
-    defaultPipeline.apply(
-        HCatalogIO.read()
-            .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-            .withTable("myowntable"));
-    thrown.expectCause(isA(NoSuchObjectException.class));
-    defaultPipeline.run();
-  }
-
-  /** Test of Read without specifying configuration properties. */
-  @Test
-  public void testReadFailureValidationConfig() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("configProperties"));
-    HCatalogIO.read().withTable("myowntable").validate(null);
-  }
-
-  /** Test of Read without specifying a table. */
-  @Test
-  public void testReadFailureValidationTable() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("table"));
-    HCatalogIO.read()
-        .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-        .validate(null);
-  }
-
-  /** Test of Read using SourceTestUtils.readFromSource(..). */
-  @Test
-  @NeedsTestData
-  public void testReadFromSource() throws Exception {
-    ReaderContext context = getReaderContext(getConfigPropertiesAsMap(service.getHiveConf()));
-    HCatalogIO.Read spec =
-        HCatalogIO.read()
-            .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-            .withContext(context)
-            .withTable(TEST_TABLE);
-
-    List<String> records = new ArrayList<>();
-    for (int i = 0; i < context.numSplits(); i++) {
-      BoundedHCatalogSource source = new BoundedHCatalogSource(spec.withSplitId(i));
-      for (HCatRecord record : SourceTestUtils.readFromSource(source, OPTIONS)) {
-        records.add(record.get(0).toString());
-      }
-    }
-    assertThat(records, containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT).toArray()));
-  }
-
-  /** Test of Read using SourceTestUtils.assertSourcesEqualReferenceSource(..). */
-  @Test
-  @NeedsTestData
-  public void testSourceEqualsSplits() throws Exception {
-    final int numRows = 1500;
-    final int numSamples = 10;
-    final long bytesPerRow = 15;
-    ReaderContext context = getReaderContext(getConfigPropertiesAsMap(service.getHiveConf()));
-    HCatalogIO.Read spec =
-        HCatalogIO.read()
-            .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-            .withContext(context)
-            .withTable(TEST_TABLE);
-
-    BoundedHCatalogSource source = new BoundedHCatalogSource(spec);
-    List<BoundedSource<HCatRecord>> unSplitSource = source.split(-1, OPTIONS);
-    assertEquals(1, unSplitSource.size());
-
-    List<BoundedSource<HCatRecord>> splits =
-        source.split(numRows * bytesPerRow / numSamples, OPTIONS);
-    assertTrue(splits.size() >= 1);
-
-    SourceTestUtils.assertSourcesEqualReferenceSource(unSplitSource.get(0), splits, OPTIONS);
-  }
-
-  private void reCreateTestTable() throws CommandNeedRetryException {
-    service.executeQuery("drop table " + TEST_TABLE);
-    service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)");
-  }
-
-  private void prepareTestData() throws Exception {
-    reCreateTestTable();
-    insertTestData(getConfigPropertiesAsMap(service.getHiveConf()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
deleted file mode 100644
index ae1eb50..0000000
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTestUtils.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.hcatalog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.DefaultHCatRecord;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
-import org.apache.hive.hcatalog.data.transfer.ReadEntity;
-import org.apache.hive.hcatalog.data.transfer.ReaderContext;
-import org.apache.hive.hcatalog.data.transfer.WriteEntity;
-import org.apache.hive.hcatalog.data.transfer.WriterContext;
-
-/** Utility class for HCatalogIOTest. */
-class HCatalogIOTestUtils {
-  static final String TEST_DATABASE = "default";
-  static final String TEST_TABLE = "mytable";
-  static final String TEST_FILTER = "myfilter";
-  static final int TEST_RECORDS_COUNT = 1000;
-
-  private static final ReadEntity READ_ENTITY =
-      new ReadEntity.Builder().withTable(TEST_TABLE).build();
-  private static final WriteEntity WRITE_ENTITY =
-      new WriteEntity.Builder().withTable(TEST_TABLE).build();
-
-  /** Returns a ReaderContext instance for the passed datastore config params. */
-  static ReaderContext getReaderContext(Map<String, String> config) throws HCatException {
-    return DataTransferFactory.getHCatReader(READ_ENTITY, config).prepareRead();
-  }
-
-  /** Returns a WriterContext instance for the passed datastore config params. */
-  private static WriterContext getWriterContext(Map<String, String> config) throws HCatException {
-    return DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).prepareWrite();
-  }
-
-  /** Writes records to the table using the passed WriterContext. */
-  private static void writeRecords(WriterContext context) throws HCatException {
-    DataTransferFactory.getHCatWriter(context).write(getHCatRecords(TEST_RECORDS_COUNT).iterator());
-  }
-
-  /** Commits the pending writes to the database. */
-  private static void commitRecords(Map<String, String> config, WriterContext context)
-      throws IOException {
-    DataTransferFactory.getHCatWriter(WRITE_ENTITY, config).commit(context);
-  }
-
-  /** Returns a list of strings containing 'expected' test data for verification. */
-  static List<String> getExpectedRecords(int count) {
-    List<String> expected = new ArrayList<>();
-    for (int i = 0; i < count; i++) {
-      expected.add("record " + i);
-    }
-    return expected;
-  }
-
-  /** Returns a list of HCatRecords of passed size. */
-  static List<HCatRecord> getHCatRecords(int size) {
-    List<HCatRecord> expected = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      expected.add(toHCatRecord(i));
-    }
-    return expected;
-  }
-
-  /** Inserts data into test datastore. */
-  static void insertTestData(Map<String, String> configMap) throws Exception {
-    WriterContext cntxt = getWriterContext(configMap);
-    writeRecords(cntxt);
-    commitRecords(configMap, cntxt);
-  }
-
-  /** Returns config params for the test datastore as a Map. */
-  static Map<String, String> getConfigPropertiesAsMap(HiveConf hiveConf) {
-    Map<String, String> map = new HashMap<>();
-    for (Entry<String, String> kv : hiveConf) {
-      map.put(kv.getKey(), kv.getValue());
-    }
-    return map;
-  }
-
-  /** returns a DefaultHCatRecord instance for passed value. */
-  private static DefaultHCatRecord toHCatRecord(int value) {
-    return new DefaultHCatRecord(Arrays.<Object>asList("record " + value, value));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hcatalog/src/test/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/resources/hive-site.xml b/sdks/java/io/hcatalog/src/test/resources/hive-site.xml
deleted file mode 100644
index 5bb1496..0000000
--- a/sdks/java/io/hcatalog/src/test/resources/hive-site.xml
+++ /dev/null
@@ -1,301 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-<!-- This file is a copy of https://github.com/apache/hive/blob/master/data/conf/hive-site.xml used to support embedded Hive metastore-->
-<configuration>
-
-<property>
-  <name>hive.in.test</name>
-  <value>true</value>
-  <description>Internal marker for test. Used for masking env-dependent values</description>
-</property>
-
-<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files  -->
-<!-- that are implied by Hadoop setup variables.                                                -->
-<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive    -->
-<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
-<!-- resource).                                                                                 -->
-
-<!-- Hive Execution Parameters -->
-<property>
-  <name>hadoop.tmp.dir</name>
-  <value>${test.tmp.dir}/hadoop-tmp</value>
-  <description>A base for other temporary directories.</description>
-</property>
-
-<!--
-<property>
-  <name>hive.exec.reducers.max</name>
-  <value>1</value>
-  <description>maximum number of reducers</description>
-</property>
--->
-
-<property>
-  <name>hive.exec.scratchdir</name>
-  <value>${test.tmp.dir}/scratchdir</value>
-  <description>Scratch space for Hive jobs</description>
-</property>
-
-<property>
-  <name>hive.exec.local.scratchdir</name>
-  <value>${test.tmp.dir}/localscratchdir/</value>
-  <description>Local scratch space for Hive jobs</description>
-</property>
-
-<property>
-  <name>datanucleus.schema.autoCreateAll</name>
-  <value>true</value>
-</property>
-
-<property>
-  <name>javax.jdo.option.ConnectionURL</name>
-  <value>jdbc:derby:;databaseName=${test.tmp.dir}/junit_metastore_db;create=true</value>
-</property>
-
-<property>
-  <name>javax.jdo.option.ConnectionDriverName</name>
-  <value>org.apache.derby.jdbc.EmbeddedDriver</value>
-</property>
-
-<property>
-  <name>javax.jdo.option.ConnectionUserName</name>
-  <value>APP</value>
-</property>
-
-<property>
-  <name>javax.jdo.option.ConnectionPassword</name>
-  <value>mine</value>
-</property>
-
-<property>
-  <!--  this should eventually be deprecated since the metastore should supply this -->
-  <name>hive.metastore.warehouse.dir</name>
-  <value>${test.warehouse.dir}</value>
-  <description></description>
-</property>
-
-<property>
-  <name>hive.metastore.metadb.dir</name>
-  <value>file://${test.tmp.dir}/metadb/</value>
-  <description>
-  Required by metastore server or if the uris argument below is not supplied
-  </description>
-</property>
-
-<property>
-  <name>test.log.dir</name>
-  <value>${test.tmp.dir}/log/</value>
-  <description></description>
-</property>
-
-<property>
-  <name>test.data.files</name>
-  <value>${hive.root}/data/files</value>
-  <description></description>
-</property>
-
-<property>
-  <name>test.data.scripts</name>
-  <value>${hive.root}/data/scripts</value>
-  <description></description>
-</property>
-
-<property>
-  <name>hive.jar.path</name>
-  <value>${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar</value>
-  <description></description>
-</property>
-
-<property>
-  <name>hive.metastore.rawstore.impl</name>
-  <value>org.apache.hadoop.hive.metastore.ObjectStore</value>
-  <description>Name of the class that implements org.apache.hadoop.hive.metastore.rawstore interface. This class is used to store and retrieval of raw metadata objects such as table, database</description>
-</property>
-
-<property>
-  <name>hive.querylog.location</name>
-  <value>${test.tmp.dir}/tmp</value>
-  <description>Location of the structured hive logs</description>
-</property>
-
-<property>
-  <name>hive.exec.pre.hooks</name>
-  <value>org.apache.hadoop.hive.ql.hooks.PreExecutePrinter, org.apache.hadoop.hive.ql.hooks.EnforceReadOnlyTables</value>
-  <description>Pre Execute Hook for Tests</description>
-</property>
-
-<property>
-  <name>hive.exec.post.hooks</name>
-  <value>org.apache.hadoop.hive.ql.hooks.PostExecutePrinter</value>
-  <description>Post Execute Hook for Tests</description>
-</property>
-
-<property>
-  <name>hive.support.concurrency</name>
-  <value>true</value>
-  <description>Whether hive supports concurrency or not. A zookeeper instance must be up and running for the default hive lock manager to support read-write locks.</description>
-</property>
-
-<property>
-  <key>hive.unlock.numretries</key>
-  <value>2</value>
-  <description>The number of times you want to retry to do one unlock</description>
-</property>
-
-<property>
-  <key>hive.lock.sleep.between.retries</key>
-  <value>2</value>
-  <description>The sleep time (in seconds) between various retries</description>
-</property>
-
-
-<property>
-  <name>fs.pfile.impl</name>
-  <value>org.apache.hadoop.fs.ProxyLocalFileSystem</value>
-  <description>A proxy for local file system used for cross file system testing</description>
-</property>
-
-<property>
-  <name>hive.exec.mode.local.auto</name>
-  <value>false</value>
-  <description>
-    Let hive determine whether to run in local mode automatically
-    Disabling this for tests so that minimr is not affected
-  </description>
-</property>
-
-<property>
-  <name>hive.auto.convert.join</name>
-  <value>false</value>
-  <description>Whether Hive enable the optimization about converting common join into mapjoin based on the input file size</description>
-</property>
-
-<property>
-  <name>hive.ignore.mapjoin.hint</name>
-  <value>false</value>
-  <description>Whether Hive ignores the mapjoin hint</description>
-</property>
-
-<property>
-  <name>hive.input.format</name>
-  <value>org.apache.hadoop.hive.ql.io.CombineHiveInputFormat</value>
-  <description>The default input format, if it is not specified, the system assigns it. It is set to HiveInputFormat for hadoop versions 17, 18 and 19, whereas it is set to CombineHiveInputFormat for hadoop 20. The user can always overwrite it - if there is a bug in CombineHiveInputFormat, it can always be manually set to HiveInputFormat. </description>
-</property>
-
-<property>
-  <name>hive.default.rcfile.serde</name>
-  <value>org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe</value>
-  <description>The default SerDe hive will use for the rcfile format</description>
-</property>
-
-<property>
-  <name>hive.stats.key.prefix.reserve.length</name>
-  <value>0</value>
-</property>
-
-<property>
-  <name>hive.conf.restricted.list</name>
-  <value>dummy.config.value</value>
-  <description>Using dummy config value above because you cannot override config with empty value</description>
-</property>
-
-<property>
-  <name>hive.exec.submit.local.task.via.child</name>
-  <value>false</value>
-</property>
-
-
-<property>
-  <name>hive.dummyparam.test.server.specific.config.override</name>
-  <value>from.hive-site.xml</value>
-  <description>Using dummy param to test server specific configuration</description>
-</property>
-
-<property>
-  <name>hive.dummyparam.test.server.specific.config.hivesite</name>
-  <value>from.hive-site.xml</value>
-  <description>Using dummy param to test server specific configuration</description>
-</property>
-
-<property>
-  <name>test.var.hiveconf.property</name>
-  <value>${hive.exec.default.partition.name}</value>
-  <description>Test hiveconf property substitution</description>
-</property>
-
-<property>
-  <name>test.property1</name>
-  <value>value1</value>
-  <description>Test property defined in hive-site.xml only</description>
-</property>
-
-<property>
-  <name>hive.test.dummystats.aggregator</name>
-  <value>value2</value>
-</property>
-
-<property>
-  <name>hive.fetch.task.conversion</name>
-  <value>minimal</value>
-</property>
-
-<property>
-  <name>hive.users.in.admin.role</name>
-  <value>hive_admin_user</value>
-</property>
-
-<property>
-  <name>hive.llap.io.cache.orc.size</name>
-  <value>8388608</value>
-</property>
-
-<property>
-  <name>hive.llap.io.cache.orc.arena.size</name>
-  <value>8388608</value>
-</property>
-
-<property>
-  <name>hive.llap.io.cache.orc.alloc.max</name>
-  <value>2097152</value>
-</property>
-
-
-<property>
-  <name>hive.llap.io.cache.orc.alloc.min</name>
-  <value>32768</value>
-</property>
-
-<property>
-  <name>hive.llap.cache.allow.synthetic.fileid</name>
-  <value>true</value>
-</property>
-
-<property>
-  <name>hive.llap.io.use.lrfu</name>
-  <value>true</value>
-</property>
-
-
-<property>
-  <name>hive.llap.io.allocator.direct</name>
-  <value>false</value>
-</property>
-
-
-</configuration>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 050fc6a..17c26a0 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -49,11 +49,13 @@
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-streaming_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
         </dependency>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.10</artifactId>
+          <version>${spark.version}</version>
           <scope>runtime</scope>
           <exclusions>
             <exclusion>

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index bf73dbe..8092da6 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -133,7 +133,7 @@ import org.apache.commons.dbcp2.BasicDataSource;
  * Consider using <a href="https://en.wikipedia.org/wiki/Merge_(SQL)">MERGE ("upsert")
  * statements</a> supported by your database instead.
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class JdbcIO {
   /**
    * Read data from a JDBC datasource.

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/jms/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/pom.xml b/sdks/java/io/jms/pom.xml
index c2074af..58009a1 100644
--- a/sdks/java/io/jms/pom.xml
+++ b/sdks/java/io/jms/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index f8cba5e..c5e5150 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -98,7 +98,7 @@ import org.slf4j.LoggerFactory;
  *
  * }</pre>
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class JmsIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 1256c46..29350cc 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-sdks-java-io-parent</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 026313a..4d2a358 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,11 +49,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -237,7 +235,7 @@ import org.slf4j.LoggerFactory;
  * Note that {@link KafkaRecord#getTimestamp()} reflects timestamp provided by Kafka if any,
  * otherwise it is set to processing time.
  */
-@Experimental(Experimental.Kind.SOURCE_SINK)
+@Experimental
 public class KafkaIO {
 
   /**
@@ -906,22 +904,6 @@ public class KafkaIO {
       return name;
     }
 
-    // Maintains approximate average over last 1000 elements
-    private static class MovingAvg {
-      private static final int MOVING_AVG_WINDOW = 1000;
-      private double avg = 0;
-      private long numUpdates = 0;
-
-      void update(double quantity) {
-        numUpdates++;
-        avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
-      }
-
-      double get() {
-        return avg;
-      }
-    }
-
     // maintains state of each assigned partition (buffered records, consumed offset, etc)
     private static class PartitionState {
       private final TopicPartition topicPartition;
@@ -929,8 +911,9 @@ public class KafkaIO {
       private long latestOffset;
       private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-      private MovingAvg avgRecordSize = new MovingAvg();
-      private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled.
+      // simple moving average for size of each record in bytes
+      private double avgRecordSize = 0;
+      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
 
       PartitionState(TopicPartition partition, long nextOffset) {
         this.topicPartition = partition;
@@ -938,13 +921,17 @@ public class KafkaIO {
         this.latestOffset = UNINITIALIZED_OFFSET;
       }
 
-      // Update consumedOffset, avgRecordSize, and avgOffsetGap
-      void recordConsumed(long offset, int size, long offsetGap) {
+      // update consumedOffset and avgRecordSize
+      void recordConsumed(long offset, int size) {
         nextOffset = offset + 1;
 
-        // This is always updated from single thread. Probably not worth making atomic.
-        avgRecordSize.update(size);
-        avgOffsetGap.update(offsetGap);
+        // this is always updated from single thread. probably not worth making it an AtomicDouble
+        if (avgRecordSize <= 0) {
+          avgRecordSize = size;
+        } else {
+          // initially, first record heavily contributes to average.
+          avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
+        }
       }
 
       synchronized void setLatestOffset(long latestOffset) {
@@ -957,15 +944,14 @@ public class KafkaIO {
         if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        return (long) (backlogMessageCount * avgRecordSize.get());
+        return (long) (backlogMessageCount * avgRecordSize);
       }
 
       synchronized long backlogMessageCount() {
         if (latestOffset < 0 || nextOffset < 0) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
-        return Math.max(0, (long) Math.ceil(remaining));
+        return Math.max(0, (latestOffset - nextOffset));
       }
     }
 
@@ -1063,32 +1049,8 @@ public class KafkaIO {
       curBatch = Iterators.cycle(nonEmpty);
     }
 
-    private void setupInitialOffset(PartitionState pState) {
-      Read<K, V> spec = source.spec;
-
-      if (pState.nextOffset != UNINITIALIZED_OFFSET) {
-        consumer.seek(pState.topicPartition, pState.nextOffset);
-      } else {
-        // nextOffset is unininitialized here, meaning start reading from latest record as of now
-        // ('latest' is the default, and is configurable) or 'look up offset by startReadTime.
-        // Remember the current position without waiting until the first record is read. This
-        // ensures checkpoint is accurate even if the reader is closed before reading any records.
-        Instant startReadTime = spec.getStartReadTime();
-        if (startReadTime != null) {
-          pState.nextOffset =
-              consumerSpEL.offsetForTime(consumer, pState.topicPartition, spec.getStartReadTime());
-          consumer.seek(pState.topicPartition, pState.nextOffset);
-        } else {
-          pState.nextOffset = consumer.position(pState.topicPartition);
-        }
-      }
-    }
-
     @Override
     public boolean start() throws IOException {
-      final int defaultPartitionInitTimeout = 60 * 1000;
-      final int kafkaRequestTimeoutMultiple = 2;
-
       Read<K, V> spec = source.spec;
       consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
       consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -1103,38 +1065,25 @@ public class KafkaIO {
       keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
       valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
 
-      // Seek to start offset for each partition. This is the first interaction with the server.
-      // Unfortunately it can block forever in case of network issues like incorrect ACLs.
-      // Initialize partition in a separate thread and cancel it if takes longer than a minute.
-      for (final PartitionState pState : partitionStates) {
-        Future<?> future =  consumerPollThread.submit(new Runnable() {
-          public void run() {
-            setupInitialOffset(pState);
+      for (PartitionState p : partitionStates) {
+        if (p.nextOffset != UNINITIALIZED_OFFSET) {
+          consumer.seek(p.topicPartition, p.nextOffset);
+        } else {
+          // nextOffset is uninitialized here, meaning start reading from latest record as of now
+          // ('latest' is the default, and is configurable) or look up offset by startReadTime.
+          // Remember the current position without waiting until the first record is read. This
+          // ensures checkpoint is accurate even if the reader is closed before reading any records.
+          Instant startReadTime = spec.getStartReadTime();
+          if (startReadTime != null) {
+            p.nextOffset =
+                consumerSpEL.offsetForTime(consumer, p.topicPartition, spec.getStartReadTime());
+            consumer.seek(p.topicPartition, p.nextOffset);
+          } else {
+            p.nextOffset = consumer.position(p.topicPartition);
           }
-        });
-
-        try {
-          // Timeout : 1 minute OR 2 * Kafka consumer request timeout if it is set.
-          Integer reqTimeout = (Integer) source.spec.getConsumerConfig().get(
-              ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-          future.get(reqTimeout != null ? kafkaRequestTimeoutMultiple * reqTimeout
-                         : defaultPartitionInitTimeout,
-                     TimeUnit.MILLISECONDS);
-        } catch (TimeoutException e) {
-          consumer.wakeup(); // This unblocks consumer stuck on network I/O.
-          // Likely reason : Kafka servers are configured to advertise internal ips, but
-          // those ips are not accessible from workers outside.
-          String msg = String.format(
-              "%s: Timeout while initializing partition '%s'. "
-                  + "Kafka client may not be able to connect to servers.",
-              this, pState.topicPartition);
-          LOG.error("{}", msg);
-          throw new IOException(msg);
-        } catch (Exception e) {
-          throw new IOException(e);
         }
-        LOG.info("{}: reading from {} starting at offset {}",
-                 name, pState.topicPartition, pState.nextOffset);
+
+        LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
       }
 
       // Start consumer read loop.
@@ -1205,11 +1154,14 @@ public class KafkaIO {
             continue;
           }
 
-          long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.
+          // sanity check
+          if (offset != expected) {
+            LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
+                this, pState.topicPartition, expected, offset - expected);
+          }
 
           if (curRecord == null) {
             LOG.info("{}: first record offset {}", name, offset);
-            offsetGap = 0;
           }
 
           curRecord = null; // user coders below might throw.
@@ -1230,7 +1182,7 @@ public class KafkaIO {
 
           int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
               + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          pState.recordConsumed(offset, recordSize, offsetGap);
+          pState.recordConsumed(offset, recordSize);
           bytesRead.inc(recordSize);
           bytesReadBySplit.inc(recordSize);
           return true;
@@ -1368,12 +1320,8 @@ public class KafkaIO {
       // might block to enqueue right after availableRecordsQueue.poll() below.
       while (!isShutdown) {
 
-        if (consumer != null) {
-          consumer.wakeup();
-        }
-        if (offsetConsumer != null) {
-          offsetConsumer.wakeup();
-        }
+        consumer.wakeup();
+        offsetConsumer.wakeup();
         availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
         try {
           isShutdown = consumerPollThread.awaitTermination(10, TimeUnit.SECONDS)


Mime
View raw message