beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] beam git commit: Add JDBC postgres IT, load script and k8 script
Date Tue, 14 Feb 2017 02:12:38 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8e0573ba5 -> 4018c835c


Add JDBC postgres IT, load script and k8 script


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b284fb4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b284fb4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b284fb4d

Branch: refs/heads/master
Commit: b284fb4dedde188e302be01dd9426d09d7ef0021
Parents: 8e0573b
Author: Stephen Sisk <sisk@google.com>
Authored: Tue Jan 24 17:56:35 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Feb 13 18:12:01 2017 -0800

----------------------------------------------------------------------
 sdks/java/io/jdbc/pom.xml                       |  11 ++
 .../org/apache/beam/sdk/io/jdbc/JdbcIOIT.java   | 175 +++++++++++++++++++
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 107 +++++-------
 .../beam/sdk/io/jdbc/JdbcTestDataSet.java       | 127 ++++++++++++++
 .../beam/sdk/io/jdbc/PostgresTestOptions.java   |  60 +++++++
 .../kubernetes/postgres-pod-no-vol.yml          |  32 ++++
 .../kubernetes/postgres-service-public.yml      |  27 +++
 .../kubernetes/setup-postgres-service.sh        |  20 +++
 8 files changed, 494 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/pom.xml b/sdks/java/io/jdbc/pom.xml
index 92a3761..23feab6 100644
--- a/sdks/java/io/jdbc/pom.xml
+++ b/sdks/java/io/jdbc/pom.xml
@@ -74,6 +74,11 @@
       <version>2.1.1</version>
     </dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
     <!-- compile dependencies -->
     <dependency>
       <groupId>com.google.auto.value</groupId>
@@ -120,6 +125,12 @@
       <artifactId>slf4j-jdk14</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>9.4.1212.jre7</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
new file mode 100644
index 0000000..15206c7
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.postgresql.ds.PGSimpleDataSource;
+
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
+ *
+ * <p>This test requires a running instance of Postgres, and the test dataset must
exist in the
+ * database. `JdbcTestDataSet` will create the read table.
+ *
+ * <p>You can run just this test by doing the following:
+ * <pre>
+ * mvn test-compile compile failsafe:integration-test -D beamTestPipelineOptions='[
+ * "--postgresServerName=1.2.3.4",
+ * "--postgresUsername=postgres",
+ * "--postgresDatabaseName=myfancydb",
+ * "--postgresPassword=yourpassword",
+ * "--postgresSsl=false"
+ * ]' -DskipITs=false -Dit.test=org.apache.beam.sdk.io.jdbc.JdbcIOIT -DfailIfNoTests=false
+ * </pre>
+ */
+@RunWith(JUnit4.class)
+public class JdbcIOIT {
+  private static PGSimpleDataSource dataSource;
+  private static String writeTableName;
+
+  @BeforeClass
+  public static void setup() throws SQLException {
+    PipelineOptionsFactory.register(PostgresTestOptions.class);
+    PostgresTestOptions options = TestPipeline.testingPipelineOptions()
+        .as(PostgresTestOptions.class);
+
+    // We do dataSource set up in BeforeClass rather than Before since we don't need to create
a new
+    // dataSource for each test.
+    dataSource = JdbcTestDataSet.getDataSource(options);
+  }
+
+  @AfterClass
+  public static void tearDown() throws SQLException {
+    // Only do write table clean up once for the class since we don't want to clean up after
both
+    // read and write tests, only want to do it once after all the tests are done.
+    JdbcTestDataSet.cleanUpDataTable(dataSource, writeTableName);
+  }
+
+  private static class CreateKVOfNameAndId implements JdbcIO.RowMapper<KV<String, Integer>>
{
+    @Override
+    public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
+      KV<String, Integer> kv =
+          KV.of(resultSet.getString("name"), resultSet.getInt("id"));
+      return kv;
+    }
+  }
+
+  private static class PutKeyInColumnOnePutValueInColumnTwo
+      implements JdbcIO.PreparedStatementSetter<KV<Integer, String>> {
+    @Override
+    public void setParameters(KV<Integer, String> element, PreparedStatement statement)
+                    throws SQLException {
+      statement.setInt(1, element.getKey());
+      statement.setString(2, element.getValue());
+    }
+  }
+
+  /**
+   * Does a test read of a few rows from a postgres database.
+   *
+   * <p>Note that IT read tests must not do any data table manipulation (setup/clean
up.)
+   * @throws SQLException
+   */
+  @Test
+  public void testRead() throws SQLException {
+    String tableName = JdbcTestDataSet.READ_TABLE_NAME;
+
+    TestPipeline pipeline = TestPipeline.create();
+
+    PCollection<KV<String, Integer>> output = pipeline.apply(JdbcIO.<KV<String,
Integer>>read()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+            .withQuery("select name,id from " + tableName)
+            .withRowMapper(new CreateKVOfNameAndId())
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
+
+    // TODO: validate actual contents of rows, not just count.
+    PAssert.thatSingleton(
+        output.apply("Count All", Count.<KV<String, Integer>>globally()))
+        .isEqualTo(1000L);
+
+    List<KV<String, Long>> expectedCounts = new ArrayList<>();
+    for (String scientist : JdbcTestDataSet.SCIENTISTS) {
+      expectedCounts.add(KV.of(scientist, 100L));
+    }
+    PAssert.that(output.apply("Count Scientist", Count.<String, Integer>perKey()))
+        .containsInAnyOrder(expectedCounts);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Tests writes to a postgres database.
+   *
+   * <p>Write Tests must clean up their data - in this case, it uses a new table every
test run so
+   * that it won't interfere with read tests/other write tests. It uses finally to attempt
to
+   * clean up data at the end of the test run.
+   * @throws SQLException
+   */
+  @Test
+  public void testWrite() throws SQLException {
+    writeTableName = JdbcTestDataSet.createWriteDataTable(dataSource);
+
+    TestPipeline pipeline = TestPipeline.create();
+
+    ArrayList<KV<Integer, String>> data = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      KV<Integer, String> kv = KV.of(i, "Test");
+      data.add(kv);
+    }
+    pipeline.apply(Create.of(data))
+        .apply(JdbcIO.<KV<Integer, String>>write()
+            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
+            .withStatement(String.format("insert into %s values(?, ?)", writeTableName))
+            .withPreparedStatementSetter(new PutKeyInColumnOnePutValueInColumnTwo()));
+
+    pipeline.run().waitUntilFinish();
+
+    try (Connection connection = dataSource.getConnection();
+         Statement statement = connection.createStatement();
+         ResultSet resultSet = statement.executeQuery("select count(*) from " + writeTableName))
{
+      resultSet.next();
+      int count = resultSet.getInt(1);
+      Assert.assertEquals(2000, count);
+    }
+    // TODO: Actually verify contents of the rows.
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 3dd4df4..32573ea 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -46,7 +46,6 @@ import org.apache.derby.drda.NetworkServerControl;
 import org.apache.derby.jdbc.ClientDataSource;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -109,19 +108,14 @@ public class JdbcIOTest implements Serializable {
     dataSource.setServerName("localhost");
     dataSource.setPortNumber(port);
 
-    try (Connection connection = dataSource.getConnection()) {
-      try (Statement statement = connection.createStatement()) {
-        statement.executeUpdate("create table BEAM(id INT, name VARCHAR(500))");
-      }
-    }
+
+    JdbcTestDataSet.createReadDataTable(dataSource);
   }
 
   @AfterClass
   public static void shutDownDatabase() throws Exception {
-    try (Connection connection = dataSource.getConnection()) {
-      try (Statement statement = connection.createStatement()) {
-        statement.executeUpdate("drop table BEAM");
-      }
+    try {
+      JdbcTestDataSet.cleanUpDataTable(dataSource, JdbcTestDataSet.READ_TABLE_NAME);
     } finally {
       if (derbyServer != null) {
         derbyServer.shutdown();
@@ -129,31 +123,6 @@ public class JdbcIOTest implements Serializable {
     }
   }
 
-  @Before
-  public void initTable() throws Exception {
-    try (Connection connection = dataSource.getConnection()) {
-      try (Statement statement = connection.createStatement()) {
-        statement.executeUpdate("delete from BEAM");
-      }
-
-      String[] scientists = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday",
-          "Newton", "Bohr", "Galilei", "Maxwell"};
-      connection.setAutoCommit(false);
-      try (PreparedStatement preparedStatement =
-               connection.prepareStatement("insert into BEAM " + "values (?,?)")) {
-        for (int i = 0; i < 1000; i++) {
-          int index = i % scientists.length;
-          preparedStatement.clearParameters();
-          preparedStatement.setInt(1, i);
-          preparedStatement.setString(2, scientists[index]);
-          preparedStatement.executeUpdate();
-        }
-      }
-
-      connection.commit();
-    }
-  }
-
   @Test
   public void testDataSourceConfigurationDataSource() throws Exception {
     JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create(dataSource);
@@ -215,7 +184,7 @@ public class JdbcIOTest implements Serializable {
     PCollection<KV<String, Integer>> output = pipeline.apply(
         JdbcIO.<KV<String, Integer>>read()
             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
-            .withQuery("select name,id from BEAM")
+            .withQuery("select name,id from " + JdbcTestDataSet.READ_TABLE_NAME)
             .withRowMapper(new JdbcIO.RowMapper<KV<String, Integer>>() {
               @Override
               public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception
{
@@ -252,7 +221,8 @@ public class JdbcIOTest implements Serializable {
      PCollection<KV<String, Integer>> output = pipeline.apply(
              JdbcIO.<KV<String, Integer>>read()
                      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource))
-                     .withQuery("select name,id from BEAM where name = ?")
+                     .withQuery(String.format("select name,id from %s where name = ?",
+                         JdbcTestDataSet.READ_TABLE_NAME))
                      .withStatementPrepator(new JdbcIO.StatementPreparator() {
                        @Override
                        public void setParameters(PreparedStatement preparedStatement)
@@ -281,36 +251,43 @@ public class JdbcIOTest implements Serializable {
   @Category(NeedsRunner.class)
   public void testWrite() throws Exception {
 
-    ArrayList<KV<Integer, String>> data = new ArrayList<>();
-    for (int i = 0; i < 1000; i++) {
-      KV<Integer, String> kv = KV.of(i, "Test");
-      data.add(kv);
-    }
-    pipeline.apply(Create.of(data))
-        .apply(JdbcIO.<KV<Integer, String>>write()
-            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
-                "org.apache.derby.jdbc.ClientDriver",
-                "jdbc:derby://localhost:" + port + "/target/beam"))
-            .withStatement("insert into BEAM values(?, ?)")
-            .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer,
String>>() {
-              public void setParameters(KV<Integer, String> element, PreparedStatement
statement)
-                  throws Exception {
-                statement.setInt(1, element.getKey());
-                statement.setString(2, element.getValue());
-              }
-            }));
-
-    pipeline.run();
-
-    try (Connection connection = dataSource.getConnection()) {
-      try (Statement statement = connection.createStatement()) {
-        try (ResultSet resultSet = statement.executeQuery("select count(*) from BEAM")) {
-          resultSet.next();
-          int count = resultSet.getInt(1);
-
-          Assert.assertEquals(2000, count);
+    String tableName = JdbcTestDataSet.createWriteDataTable(dataSource);
+    try {
+      ArrayList<KV<Integer, String>> data = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+        KV<Integer, String> kv = KV.of(i, "Test");
+        data.add(kv);
+      }
+      pipeline.apply(Create.of(data))
+          .apply(JdbcIO.<KV<Integer, String>>write()
+              .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
+                  "org.apache.derby.jdbc.ClientDriver",
+                  "jdbc:derby://localhost:" + port + "/target/beam"))
+              .withStatement(String.format("insert into %s values(?, ?)", tableName))
+              .withPreparedStatementSetter(
+                  new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
+                public void setParameters(
+                    KV<Integer, String> element, PreparedStatement statement) throws
Exception {
+                  statement.setInt(1, element.getKey());
+                  statement.setString(2, element.getValue());
+                }
+              }));
+
+      pipeline.run();
+
+      try (Connection connection = dataSource.getConnection()) {
+        try (Statement statement = connection.createStatement()) {
+          try (ResultSet resultSet = statement.executeQuery("select count(*) from "
+                + tableName)) {
+            resultSet.next();
+            int count = resultSet.getInt(1);
+
+            Assert.assertEquals(2000, count);
+          }
         }
       }
+    } finally {
+      JdbcTestDataSet.cleanUpDataTable(dataSource, tableName);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
new file mode 100644
index 0000000..11cc2be
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcTestDataSet.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.postgresql.ds.PGSimpleDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manipulates test data used by the {@link org.apache.beam.sdk.io.jdbc.JdbcIO} tests.
+ *
+ * <p>This is independent from the tests so that for read tests it can be run separately
after data
+ * store creation rather than every time (which can be more fragile.)
+ */
+public class JdbcTestDataSet {
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcTestDataSet.class);
+  public static final String[] SCIENTISTS = {"Einstein", "Darwin", "Copernicus", "Pasteur",
"Curie",
+      "Faraday", "McClintock", "Herschel", "Hopper", "Lovelace"};
+  /**
+   * Use this to create the read tables before IT read tests.
+   *
+   * <p>To invoke this class, you can use this command line:
+   * mvn test-compile exec:java -Dexec.mainClass=org.apache.beam.sdk.io.jdbc.JdbcTestDataSet
\
+   *   -Dexec.args="--postgresIp=1.1.1.1 --postgresUsername=postgres
+   *   --postgresDatabaseName=myfancydb \
+   *   --postgresPassword=yourpassword --postgresSsl=false" \
+   *   -Dexec.classpathScope=test
+   * @param args Please pass options from PostgresTestOptions used for connection to postgres
as
+   * shown above.
+   */
+  public static void main(String[] args) throws SQLException {
+    PipelineOptionsFactory.register(PostgresTestOptions.class);
+    PostgresTestOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(PostgresTestOptions.class);
+
+    createReadDataTable(getDataSource(options));
+  }
+
+  public static PGSimpleDataSource getDataSource(PostgresTestOptions options) throws SQLException
{
+    PGSimpleDataSource dataSource = new PGSimpleDataSource();
+
+    // Tests must receive parameters for connections from PipelineOptions
+    // Parameters should be generic to all tests that use a particular datasource, not
+    // the particular test.
+    dataSource.setDatabaseName(options.getPostgresDatabaseName());
+    dataSource.setServerName(options.getPostgresServerName());
+    dataSource.setPortNumber(options.getPostgresPort());
+    dataSource.setUser(options.getPostgresUsername());
+    dataSource.setPassword(options.getPostgresPassword());
+    dataSource.setSsl(options.getPostgresSsl());
+
+    return dataSource;
+  }
+
+  public static final String READ_TABLE_NAME = "BEAM_TEST_READ";
+
+  public static void createReadDataTable(DataSource dataSource) throws SQLException {
+    createDataTable(dataSource, READ_TABLE_NAME);
+  }
+
+  public static String createWriteDataTable(DataSource dataSource) throws SQLException {
+    String tableName = "BEAMTEST" + org.joda.time.Instant.now().getMillis();
+    createDataTable(dataSource, tableName);
+    return tableName;
+  }
+
+  private static void createDataTable(DataSource dataSource, String tableName) throws SQLException
{
+    try (Connection connection = dataSource.getConnection()) {
+      // something like this will need to happen in tests on a newly created postgres server,
+      // but likely it will happen in perfkit, not here
+      // alternatively, we may have a pipelineoption indicating whether we want to
+      // re-use the database or create a new one
+      try (Statement statement = connection.createStatement()) {
+        statement.execute(
+            String.format("create table %s (id INT, name VARCHAR(500))", tableName));
+      }
+
+      connection.setAutoCommit(false);
+      try (PreparedStatement preparedStatement =
+               connection.prepareStatement(
+                   String.format("insert into %s values (?,?)", tableName))) {
+        for (int i = 0; i < 1000; i++) {
+          int index = i % SCIENTISTS.length;
+          preparedStatement.clearParameters();
+          preparedStatement.setInt(1, i);
+          preparedStatement.setString(2, SCIENTISTS[index]);
+          preparedStatement.executeUpdate();
+        }
+      }
+      connection.commit();
+    }
+
+    LOG.info("Created table {}", tableName);
+  }
+
+  public static void cleanUpDataTable(DataSource dataSource, String tableName)
+      throws SQLException {
+    if (tableName != null) {
+      try (Connection connection = dataSource.getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.executeUpdate(String.format("drop table %s", tableName));
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/PostgresTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/PostgresTestOptions.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/PostgresTestOptions.java
new file mode 100644
index 0000000..5612d19
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/PostgresTestOptions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * These options can be used by a test connecting to a postgres database to configure their
+ * connection.
+ */
+public interface PostgresTestOptions extends TestPipelineOptions {
+    @Description("Server name for postgres server (host name/ip address)")
+    @Default.String("postgres-server-name")
+    String getPostgresServerName();
+    void setPostgresServerName(String value);
+
+    @Description("Username for postgres server")
+    @Default.String("postgres-username")
+    String getPostgresUsername();
+    void setPostgresUsername(String value);
+
+    // Note that passwords are not as secure an authentication as other methods, and used
here for
+    // a test environment only.
+    @Description("Password for postgres server")
+    @Default.String("postgres-password")
+    String getPostgresPassword();
+    void setPostgresPassword(String value);
+
+    @Description("Database name for postgres server")
+    @Default.String("postgres-database-name")
+    String getPostgresDatabaseName();
+    void setPostgresDatabaseName(String value);
+
+    @Description("Port for postgres server")
+    @Default.Integer(0)
+    Integer getPostgresPort();
+    void setPostgresPort(Integer value);
+
+    @Description("Whether the postgres server uses SSL")
+    @Default.Boolean(true)
+    Boolean getPostgresSsl();
+    void setPostgresSsl(Boolean value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-pod-no-vol.yml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-pod-no-vol.yml b/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-pod-no-vol.yml
new file mode 100644
index 0000000..1e0c12a
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-pod-no-vol.yml
@@ -0,0 +1,32 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+apiVersion: v1
+kind: Pod
+metadata:
+  name: postgres-no-pv
+  labels:
+    name: postgres-no-pv
+spec:
+  containers:
+    - name: postgres
+      image: postgres
+      env:
+        - name: POSTGRES_PASS
+          value: sroim3
+        - name: PGDATA
+          value: /var/lib/postgresql/data/pgdata
+      ports:
+        - containerPort: 5432

http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-service-public.yml
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-service-public.yml b/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-service-public.yml
new file mode 100644
index 0000000..4da79f8
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/resources/kubernetes/postgres-service-public.yml
@@ -0,0 +1,27 @@
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: postgres-no-pv
+  labels:
+    name: postgres-no-pv
+spec:
+  ports:
+    - port: 5432
+  selector:
+    name: postgres-no-pv
+  type: LoadBalancer

http://git-wip-us.apache.org/repos/asf/beam/blob/b284fb4d/sdks/java/io/jdbc/src/test/resources/kubernetes/setup-postgres-service.sh
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/test/resources/kubernetes/setup-postgres-service.sh b/sdks/java/io/jdbc/src/test/resources/kubernetes/setup-postgres-service.sh
new file mode 100644
index 0000000..eece773
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/resources/kubernetes/setup-postgres-service.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+kubectl.sh create -f postgres-pod-no-vol.yml
+kubectl.sh create -f postgres-service-public.yml


Mime
View raw message