beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [beam] branch master updated: [BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat (#4332)
Date Thu, 18 Jan 2018 18:50:23 GMT
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a96c1c  [BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat (#4332)
2a96c1c is described below

commit 2a96c1cfeee0fd375ebdba815ab2cc2657c014ec
Author: Łukasz Gajowy <lukasz.gajowy@gmail.com>
AuthorDate: Thu Jan 18 19:50:19 2018 +0100

    [BEAM-3217] add HadoopInputFormatIO integration test using DBInputFormat (#4332)
    
    The kubernetes infrastructure that is needed for the jenkins job
    to run is not available for now. We should add it once
    the infrastructure is there.
---
 sdks/java/io/common/pom.xml                        |   5 +
 .../beam/sdk/io/common/DatabaseTestHelper.java}    |  57 +++---
 .../org/apache/beam/sdk/io/common/TestRow.java     |   1 +
 sdks/java/io/hadoop/input-format/pom.xml           | 212 +++++++++++++++++++++
 .../hadoop/inputformat/HadoopInputFormatIOIT.java  | 147 ++++++++++++++
 .../io/hadoop/inputformat/TestRowDBWritable.java   |  83 ++++++++
 sdks/java/io/jdbc/pom.xml                          |   1 -
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java |  27 +--
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java    |  13 +-
 .../apache/beam/sdk/io/jdbc/JdbcTestHelper.java    |  35 +---
 sdks/java/io/pom.xml                               |  11 ++
 11 files changed, 500 insertions(+), 92 deletions(-)

diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index eb79091..bd143d4 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -48,5 +48,10 @@
         <artifactId>junit</artifactId>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>org.postgresql</groupId>
+        <artifactId>postgresql</artifactId>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
 </project>
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
similarity index 59%
copy from sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
copy to sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
index fedae51..d69654a 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java
@@ -15,32 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.io.jdbc;
+package org.apache.beam.sdk.io.common;
 
 import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import javax.sql.DataSource;
-import org.apache.beam.sdk.io.common.TestRow;
+import org.postgresql.ds.PGSimpleDataSource;
 
 /**
- * Contains Test helper methods used by both Integration and Unit Tests in
- * {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
+ * This class contains helper methods to ease database usage in tests.
  */
-class JdbcTestHelper {
-  static String getTableName(String testIdentifier) throws ParseException {
-    SimpleDateFormat formatter = new SimpleDateFormat();
-    formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
-    return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
+public class DatabaseTestHelper {
+
+  public static PGSimpleDataSource getPostgresDataSource(IOTestPipelineOptions options) {
+    PGSimpleDataSource dataSource = new PGSimpleDataSource();
+    dataSource.setDatabaseName(options.getPostgresDatabaseName());
+    dataSource.setServerName(options.getPostgresServerName());
+    dataSource.setPortNumber(options.getPostgresPort());
+    dataSource.setUser(options.getPostgresUsername());
+    dataSource.setPassword(options.getPostgresPassword());
+    dataSource.setSsl(options.getPostgresSsl());
+    return dataSource;
   }
 
-  static void createDataTable(
-      DataSource dataSource, String tableName)
+  public static void createTable(DataSource dataSource, String tableName)
       throws SQLException {
     try (Connection connection = dataSource.getConnection()) {
       try (Statement statement = connection.createStatement()) {
@@ -50,7 +51,7 @@ class JdbcTestHelper {
     }
   }
 
-  static void cleanUpDataTable(DataSource dataSource, String tableName)
+  public static void deleteTable(DataSource dataSource, String tableName)
       throws SQLException {
     if (tableName != null) {
       try (Connection connection = dataSource.getConnection();
@@ -60,22 +61,18 @@ class JdbcTestHelper {
     }
   }
 
-  static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> {
-    @Override
-    public TestRow mapRow(ResultSet resultSet) throws Exception {
-      return TestRow.create(
-          resultSet.getInt("id"), resultSet.getString("name"));
-    }
+  public static String getTestTableName(String testIdentifier) {
+    SimpleDateFormat formatter = new SimpleDateFormat();
+    formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
+    return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
   }
 
-  static class PrepareStatementFromTestRow
-      implements JdbcIO.PreparedStatementSetter<TestRow> {
-    @Override
-    public void setParameters(TestRow element, PreparedStatement statement)
-        throws SQLException {
-      statement.setLong(1, element.id());
-      statement.setString(2, element.name());
-    }
+  public static String getPostgresDBUrl(IOTestPipelineOptions options) {
+    return String.format(
+        "jdbc:postgresql://%s:%s/%s",
+        options.getPostgresServerName(),
+        options.getPostgresPort(),
+        options.getPostgresDatabaseName()
+    );
   }
-
 }
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index 79a144d..a276869 100644
--- a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -97,6 +97,7 @@ public abstract class TestRow implements Serializable, Comparable<TestRow> {
   private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
       1000, "7d94d63a41164be058a9680002914358",
       100_000, "c7cbddb319209e200f1c5eebef8fe960",
+      600_000, "e2add2f680de9024e9bc46cd3912545e",
       5_000_000, "c44f8a5648cd9207c9c6f77395a998dc"
   );
 
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
index c698b40..931e11e 100644
--- a/sdks/java/io/hadoop/input-format/pom.xml
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -27,6 +27,197 @@
   <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format</name>
   <description>IO to read data from data sources which implement Hadoop Input Format.</description>
 
+  <profiles>
+    <!-- Include the Google Cloud Dataflow runner activated by -DintegrationTestRunner=dataflow -->
+    <profile>
+      <id>dataflow-runner</id>
+      <activation>
+        <property>
+          <name>integrationTestRunner</name>
+          <value>dataflow</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!--
+        This profile invokes PerfKitBenchmarker, which does benchmarking of
+        the IO ITs. The arguments passed to it allow it to invoke mvn again
+        with the desired benchmark.
+
+        To invoke this, run:
+
+        mvn verify -Dio-it-suite -pl sdks/java/io/hadoop/input-format
+            -DpkbLocation="path-to-pkb.py" \
+            -DintegrationTestPipelineOptions='["-tempRoot=gs://bucket/staging"]'
+    -->
+    <profile>
+      <id>io-it-suite</id>
+      <activation>
+        <property><name>io-it-suite</name></property>
+      </activation>
+      <properties>
+        <!-- This is based on the location of the current pom relative to the root
+             See discussion in BEAM-2460 -->
+        <beamRootProjectDir>${project.parent.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.gmaven</groupId>
+            <artifactId>groovy-maven-plugin</artifactId>
+            <version>${groovy-maven-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>find-supported-python-for-compile</id>
+                <phase>initialize</phase>
+                <goals>
+                  <goal>execute</goal>
+                </goals>
+                <configuration>
+                  <source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>${maven-exec-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <executable>${python.interpreter.bin}</executable>
+              <arguments>
+                <argument>${pkbLocation}</argument>
+                <argument>-benchmarks=beam_integration_benchmark</argument>
+                <argument>-beam_it_profile=io-it</argument>
+                <argument>-beam_location=${beamRootProjectDir}</argument>
+                <argument>-beam_prebuilt=true</argument>
+                <argument>-beam_sdk=java</argument>
+                <argument>-kubeconfig=${kubeconfig}</argument>
+                <argument>-kubectl=${kubectl}</argument>
+                <!-- runner overrides, controlled via forceDirectRunner -->
+                <argument>${pkbBeamRunnerProfile}</argument>
+                <argument>${pkbBeamRunnerOption}</argument>
+                <!-- specific to this IO -->
+                <argument>-beam_it_module=sdks/java/io/hadoop/input-format</argument>
+                <argument>-beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT</argument>
+                <argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config.yml</argument>
+                <argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml</argument>
+                <!-- arguments typically defined by user -->
+                <argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+              </arguments>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>${surefire-plugin.version}</version>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!--
+      io-it-suite-local overrides part of io-it-suite, allowing users to run tests
+      when they are on a separate network from the kubernetes cluster by
+      creating a LoadBalancer service.
+    -->
+    <profile>
+      <id>io-it-suite-local</id>
+      <activation><property><name>io-it-suite-local</name></property></activation>
+      <properties>
+        <!-- This is based on the location of the current pom relative to the root
+             See discussion in BEAM-2460 -->
+        <beamRootProjectDir>${project.parent.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.gmaven</groupId>
+            <artifactId>groovy-maven-plugin</artifactId>
+            <version>${groovy-maven-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>find-supported-python-for-compile</id>
+                <phase>initialize</phase>
+                <goals>
+                  <goal>execute</goal>
+                </goals>
+                <configuration>
+                  <source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>${maven-exec-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>verify</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+              </execution>
+            </executions>
+            <configuration>
+              <executable>${python.interpreter.bin}</executable>
+              <arguments>
+                <argument>${pkbLocation}</argument>
+                <argument>-benchmarks=beam_integration_benchmark</argument>
+                <argument>-beam_it_profile=io-it</argument>
+                <argument>-beam_location=${beamRootProjectDir}</argument>
+                <argument>-beam_prebuilt=true</argument>
+                <argument>-beam_sdk=java</argument>
+                <argument>-kubeconfig=${kubeconfig}</argument>
+                <argument>-kubectl=${kubectl}</argument>
+                <!-- runner overrides, controlled via forceDirectRunner -->
+                <argument>${pkbBeamRunnerProfile}</argument>
+                <argument>${pkbBeamRunnerOption}</argument>
+                <!-- specific to this IO -->
+                <argument>-beam_it_module=sdks/java/io/hadoop/input-format</argument>
+                <argument>-beam_it_class=org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT</argument>
+                <argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/postgres/pkb-config-local.yml</argument>
+                <argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres.yml,${beamRootProjectDir}/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml</argument>
+                <!-- arguments typically defined by user -->
+                <argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+              </arguments>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>${surefire-plugin.version}</version>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>com.google.guava</groupId>
@@ -70,9 +261,30 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-jdbc</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
new file mode 100644
index 0000000..397aa3a
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import static org.apache.beam.sdk.io.common.TestRow.DeterministicallyConstructTestRowFn;
+import static org.apache.beam.sdk.io.common.TestRow.SelectNameFn;
+import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+import static org.apache.beam.sdk.io.hadoop.inputformat.TestRowDBWritable.PrepareStatementFromTestRow;
+
+import java.sql.SQLException;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.postgresql.ds.PGSimpleDataSource;
+
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO}
+ * on an independent postgres instance.
+ *
+ * <p>This test requires a running instance of Postgres. Pass in connection information using
+ * PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/hadoop/input-format/ -DintegrationTestPipelineOptions='[
+ *  "--postgresServerName=1.2.3.4",
+ *  "--postgresUsername=postgres",
+ *  "--postgresDatabaseName=myfancydb",
+ *  "--postgresPassword=mypass",
+ *  "--postgresSsl=false",
+ *  "--numberOfRecords=1000" ]'
+ * </pre>
+ */
+public class HadoopInputFormatIOIT {
+
+  private static PGSimpleDataSource dataSource;
+  private static Integer numberOfRows;
+  private static String tableName;
+  private static SerializableConfiguration hadoopConfiguration;
+
+  @Rule
+  public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public TestPipeline readPipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void setUp() throws SQLException {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
+        .as(IOTestPipelineOptions.class);
+
+    dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+    numberOfRows = options.getNumberOfRecords();
+    tableName = DatabaseTestHelper.getTestTableName("HadoopInputFormatIOIT");
+
+    DatabaseTestHelper.createTable(dataSource, tableName);
+    setupHadoopConfiguration(options);
+  }
+
+  private static void setupHadoopConfiguration(IOTestPipelineOptions options) {
+    Configuration conf = new Configuration();
+    DBConfiguration.configureDB(
+        conf,
+        "org.postgresql.Driver",
+        DatabaseTestHelper.getPostgresDBUrl(options),
+        options.getPostgresUsername(),
+        options.getPostgresPassword()
+    );
+    conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
+    conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
+    conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, TestRowDBWritable.class, DBWritable.class);
+
+    conf.setClass("key.class", LongWritable.class, Object.class);
+    conf.setClass("value.class", TestRowDBWritable.class, Object.class);
+    conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);
+
+    hadoopConfiguration = new SerializableConfiguration(conf);
+  }
+
+  @AfterClass
+  public static void tearDown() throws SQLException {
+    DatabaseTestHelper.deleteTable(dataSource, tableName);
+  }
+
+  @Test
+  public void readUsingHadoopInputFormat() {
+    writePipeline.apply("Generate sequence", GenerateSequence.from(0).to(numberOfRows))
+        .apply("Produce db rows", ParDo.of(new DeterministicallyConstructTestRowFn()))
+        .apply("Prevent fusion before writing", Reshuffle.<TestRow>viaRandomKey())
+        .apply("Write using JDBCIO", JdbcIO.<TestRow>write()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+            .withStatement(String.format("insert into %s values(?, ?)", tableName))
+            .withPreparedStatementSetter(new PrepareStatementFromTestRow()));
+
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<String> consolidatedHashcode = readPipeline
+        .apply("Read using HadoopInputFormat", HadoopInputFormatIO
+            .<LongWritable, TestRowDBWritable>read()
+            .withConfiguration(hadoopConfiguration.get()))
+        .apply("Get values only", Values.<TestRowDBWritable>create())
+        .apply("Values as string", ParDo.of(new SelectNameFn()))
+        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    PAssert.thatSingleton(consolidatedHashcode)
+        .isEqualTo(getExpectedHashForRowCount(numberOfRows));
+
+    readPipeline.run().waitUntilFinish();
+  }
+}
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
new file mode 100644
index 0000000..c9b2639
--- /dev/null
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/TestRowDBWritable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hadoop.inputformat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.io.common.TestRow;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * A subclass of {@link org.apache.beam.sdk.io.common.TestRow} to be used with
+ * {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}.
+ */
+@DefaultCoder(AvroCoder.class)
+public class TestRowDBWritable extends TestRow implements DBWritable, Writable {
+
+  private Integer id;
+  private String name;
+
+  @Override public Integer id() {
+    return id;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public void write(PreparedStatement statement) throws SQLException {
+    statement.setInt(1, id);
+    statement.setString(2, name);
+  }
+
+  @Override
+  public void readFields(ResultSet resultSet) throws SQLException {
+    id = resultSet.getInt(1);
+    name = resultSet.getString(2);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+    out.writeChars(name);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+   id = in.readInt();
+   name = in.readUTF();
+  }
+
+  static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter<TestRow> {
+    @Override
+    public void setParameters(TestRow element, PreparedStatement statement)
+        throws SQLException {
+      statement.setLong(1, element.id());
+      statement.setString(2, element.name());
+    }
+  }
+}
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 73fbc52..6264e9e 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -328,7 +328,6 @@
     <dependency>
       <groupId>org.postgresql</groupId>
       <artifactId>postgresql</artifactId>
-      <version>9.4.1212.jre7</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index 941a775..ed169c7 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -18,11 +18,10 @@
 package org.apache.beam.sdk.io.jdbc;
 
 import java.sql.SQLException;
-import java.text.ParseException;
 import java.util.List;
-
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
 import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
 import org.apache.beam.sdk.io.common.TestRow;
@@ -74,34 +73,20 @@ public class JdbcIOIT {
   public TestPipeline pipelineRead = TestPipeline.create();
 
   @BeforeClass
-  public static void setup() throws SQLException, ParseException {
+  public static void setup() throws SQLException {
     PipelineOptionsFactory.register(IOTestPipelineOptions.class);
     IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
         .as(IOTestPipelineOptions.class);
 
     numberOfRows = options.getNumberOfRecords();
-    dataSource = getDataSource(options);
-
-    tableName = JdbcTestHelper.getTableName("IT");
-    JdbcTestHelper.createDataTable(dataSource, tableName);
-  }
-
-  private static PGSimpleDataSource getDataSource(IOTestPipelineOptions options) {
-    PGSimpleDataSource dataSource = new PGSimpleDataSource();
-
-    dataSource.setDatabaseName(options.getPostgresDatabaseName());
-    dataSource.setServerName(options.getPostgresServerName());
-    dataSource.setPortNumber(options.getPostgresPort());
-    dataSource.setUser(options.getPostgresUsername());
-    dataSource.setPassword(options.getPostgresPassword());
-    dataSource.setSsl(options.getPostgresSsl());
-
-    return dataSource;
+    dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+    tableName = DatabaseTestHelper.getTestTableName("IT");
+    DatabaseTestHelper.createTable(dataSource, tableName);
   }
 
   @AfterClass
   public static void tearDown() throws SQLException {
-    JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+    DatabaseTestHelper.deleteTable(dataSource, tableName);
   }
 
   /**
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index f35c8b1..4871f20 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
 import org.apache.beam.sdk.io.common.TestRow;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -111,16 +112,16 @@ public class JdbcIOTest implements Serializable {
     dataSource.setServerName("localhost");
     dataSource.setPortNumber(port);
 
-    readTableName = JdbcTestHelper.getTableName("UT_READ");
+    readTableName = DatabaseTestHelper.getTestTableName("UT_READ");
 
-    JdbcTestHelper.createDataTable(dataSource, readTableName);
+    DatabaseTestHelper.createTable(dataSource, readTableName);
     addInitialData(dataSource, readTableName);
   }
 
   @AfterClass
   public static void shutDownDatabase() throws Exception {
     try {
-      JdbcTestHelper.cleanUpDataTable(dataSource, readTableName);
+      DatabaseTestHelper.deleteTable(dataSource, readTableName);
     } finally {
       if (derbyServer != null) {
         derbyServer.shutdown();
@@ -253,8 +254,8 @@ public class JdbcIOTest implements Serializable {
   public void testWrite() throws Exception {
     final long rowsToAdd = 1000L;
 
-    String tableName = JdbcTestHelper.getTableName("UT_WRITE");
-    JdbcTestHelper.createDataTable(dataSource, tableName);
+    String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+    DatabaseTestHelper.createTable(dataSource, tableName);
     try {
       ArrayList<KV<Integer, String>> data = new ArrayList<>();
       for (int i = 0; i < rowsToAdd; i++) {
@@ -290,7 +291,7 @@ public class JdbcIOTest implements Serializable {
         }
       }
     } finally {
-      JdbcTestHelper.cleanUpDataTable(dataSource, tableName);
+      DatabaseTestHelper.deleteTable(dataSource, tableName);
     }
   }
 
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
index fedae51..1824989 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestHelper.java
@@ -17,15 +17,9 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import javax.sql.DataSource;
 import org.apache.beam.sdk.io.common.TestRow;
 
 /**
@@ -33,32 +27,6 @@ import org.apache.beam.sdk.io.common.TestRow;
  * {@link org.apache.beam.sdk.io.jdbc.JdbcIO}.
  */
 class JdbcTestHelper {
-  static String getTableName(String testIdentifier) throws ParseException {
-    SimpleDateFormat formatter = new SimpleDateFormat();
-    formatter.applyPattern("yyyy_MM_dd_HH_mm_ss_S");
-    return String.format("BEAMTEST_%s_%s", testIdentifier, formatter.format(new Date()));
-  }
-
-  static void createDataTable(
-      DataSource dataSource, String tableName)
-      throws SQLException {
-    try (Connection connection = dataSource.getConnection()) {
-      try (Statement statement = connection.createStatement()) {
-        statement.execute(
-            String.format("create table %s (id INT, name VARCHAR(500))", tableName));
-      }
-    }
-  }
-
-  static void cleanUpDataTable(DataSource dataSource, String tableName)
-      throws SQLException {
-    if (tableName != null) {
-      try (Connection connection = dataSource.getConnection();
-          Statement statement = connection.createStatement()) {
-        statement.executeUpdate(String.format("drop table %s", tableName));
-      }
-    }
-  }
 
   static class CreateTestRowOfNameAndId implements JdbcIO.RowMapper<TestRow> {
     @Override
@@ -68,8 +36,7 @@ class JdbcTestHelper {
     }
   }
 
-  static class PrepareStatementFromTestRow
-      implements JdbcIO.PreparedStatementSetter<TestRow> {
+  static class PrepareStatementFromTestRow implements JdbcIO.PreparedStatementSetter<TestRow> {
     @Override
     public void setParameters(TestRow element, PreparedStatement statement)
         throws SQLException {
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 0710df0..d643775 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -127,4 +127,15 @@
       </properties>
     </profile>
   </profiles>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.postgresql</groupId>
+        <artifactId>postgresql</artifactId>
+        <version>9.4.1212.jre7</version>
+        <scope>test</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
 </project>

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <commits@beam.apache.org>'].

Mime
View raw message