gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/4] incubator-gobblin git commit: [GOBBLIN-203] Add Postgresql source and extractor
Date Fri, 08 Sep 2017 00:01:05 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8a374f207 -> 05bf034e3


[GOBBLIN-203] Add Postgresql source and extractor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e12fa76d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e12fa76d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e12fa76d

Branch: refs/heads/master
Commit: e12fa76dc6e4948c1d8ecdc8bf6d4ceabdbfc222
Parents: ce60d2c
Author: tilakpatidar <tilakpatidar@gmail.com>
Authored: Thu Aug 10 11:53:58 2017 +0530
Committer: tilakpatidar <tilakpatidar@gmail.com>
Committed: Fri Aug 11 10:08:51 2017 +0530

----------------------------------------------------------------------
 .../extract/jdbc/PostgresqlSource.java          |  38 +++
 .../source/jdbc/PostgresqlExtractor.java        | 265 +++++++++++++++++++
 .../source/jdbc/PostgresqlExtractorTest.java    | 147 ++++++++++
 3 files changed, 450 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e12fa76d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java
b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java
new file mode 100644
index 0000000..e263a7e
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java
@@ -0,0 +1,38 @@
+package org.apache.gobblin.source.extractor.extract.jdbc;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
+import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
+import org.apache.gobblin.source.jdbc.PostgresqlExtractor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
+
+/**
+ * An implementation of postgresql source to get work units
+ *
+ * @author tilakpatidar
+ */
+
+public class PostgresqlSource extends QueryBasedSource<JsonArray, JsonElement> {
+  private static final Logger LOG = LoggerFactory.getLogger(PostgresqlSource.class);
+
+  @Override
+  public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state)
+      throws IOException {
+    Extractor<JsonArray, JsonElement> extractor;
+    try {
+      extractor = new PostgresqlExtractor(state).build();
+    } catch (ExtractPrepareException e) {
+      LOG.error("Failed to prepare extractor: error - " + e.getMessage());
+      throw new IOException(e);
+    }
+    return extractor;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e12fa76d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java
b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java
new file mode 100644
index 0000000..3fcd411
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java
@@ -0,0 +1,265 @@
+package org.apache.gobblin.source.jdbc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.exception.HighWatermarkException;
+import org.apache.gobblin.source.extractor.exception.RecordCountException;
+import org.apache.gobblin.source.extractor.exception.SchemaException;
+import org.apache.gobblin.source.extractor.extract.Command;
+import org.apache.gobblin.source.extractor.utils.Utils;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
+import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonElement;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class PostgresqlExtractor extends JdbcExtractor {
+  private static final String POSTGRES_TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
+  private static final String POSTGRES_DATE_FORMAT = "yyyy-MM-dd";
+  private static final String POSTGRES_HOUR_FORMAT = "HH";
+  private static final long SAMPLERECORDCOUNT = -1;
+
+  /**
+   * Creates an extractor for one work unit.
+   *
+   * @param workUnitState work unit state, passed unchanged to the {@code JdbcExtractor} constructor
+   */
+  public PostgresqlExtractor(WorkUnitState workUnitState) {
+    super(workUnitState);
+  }
+
+  /**
+   * Builds a predicate that compares {@code column} with the hour of a watermark value.
+   *
+   * @param column column name(s); run through {@code Utils.getCoalesceColumnNames}
+   * @param value watermark value, converted to its decimal string before reformatting
+   * @param valueFormat pattern that {@code value} is written in
+   * @param operator comparison operator placed between the column and the literal
+   * @return text of the form {@code <column> <operator> '<HH>'}
+   */
+  @Override
+  public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
+    log.debug("Getting hour predicate for Postgres");
+    String hourLiteral = Utils.toDateTimeFormat(String.valueOf(value), valueFormat, POSTGRES_HOUR_FORMAT);
+    String columnExpression = Utils.getCoalesceColumnNames(column);
+    return columnExpression + " " + operator + " '" + hourLiteral + "'";
+  }
+
+  /**
+   * Builds a predicate that compares {@code column} with the date of a watermark value.
+   *
+   * @param column column name(s); run through {@code Utils.getCoalesceColumnNames}
+   * @param value watermark value, converted to its decimal string before reformatting
+   * @param valueFormat pattern that {@code value} is written in
+   * @param operator comparison operator placed between the column and the literal
+   * @return text of the form {@code <column> <operator> '<yyyy-MM-dd>'}
+   */
+  @Override
+  public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
+    log.debug("Getting date predicate for Postgres");
+    String dateLiteral = Utils.toDateTimeFormat(String.valueOf(value), valueFormat, POSTGRES_DATE_FORMAT);
+    String columnExpression = Utils.getCoalesceColumnNames(column);
+    return columnExpression + " " + operator + " '" + dateLiteral + "'";
+  }
+
+  /**
+   * Builds a predicate that compares {@code column} with the timestamp of a watermark value.
+   *
+   * @param column column name(s); run through {@code Utils.getCoalesceColumnNames}
+   * @param value watermark value, converted to its decimal string before reformatting
+   * @param valueFormat pattern that {@code value} is written in
+   * @param operator comparison operator placed between the column and the literal
+   * @return text of the form {@code <column> <operator> '<yyyy-MM-dd HH:mm:ss>'}
+   */
+  @Override
+  public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
+    log.debug("Getting timestamp predicate for Postgres");
+    String timestampLiteral =
+        Utils.toDateTimeFormat(String.valueOf(value), valueFormat, POSTGRES_TIMESTAMP_FORMAT);
+    String columnExpression = Utils.getCoalesceColumnNames(column);
+    return columnExpression + " " + operator + " '" + timestampLiteral + "'";
+  }
+
+  /**
+   * Builds the commands that read a table's column metadata from {@code information_schema.COLUMNS}.
+   *
+   * @param schema schema name; bound to the second {@code ?} placeholder and matched case-insensitively
+   * @param entity table name; bound to the first {@code ?} placeholder and matched case-insensitively
+   * @return a QUERY command holding the SQL, followed by a QUERYPARAMS command holding {@code [entity, schema]}
+   * @throws SchemaException declared by the overridden signature
+   */
+  @Override
+  public List<Command> getSchemaMetadata(String schema, String entity)
+      throws SchemaException {
+    log.debug("Build query to get schema");
+
+    // NOTE(review): both branches of the first CASE yield 0, so "length" is always 0 - confirm this is intended.
+    String columnMetadataSql = "select col.column_name, col.data_type, "
+        + "case when CHARACTER_OCTET_LENGTH is null then 0 else 0 end as length, "
+        + "case when NUMERIC_PRECISION is null then 0 else NUMERIC_PRECISION end as precesion, "
+        + "case when NUMERIC_SCALE is null then 0 else NUMERIC_SCALE end as scale, "
+        + "case when is_nullable='NO' then 'false' else 'true' end as nullable, "
+        + "'' as format, '' as comment "
+        + "from information_schema.COLUMNS col "
+        + "WHERE upper(col.table_name)=upper(?) AND upper(col.table_schema)=upper(?) "
+        + "order by col.ORDINAL_POSITION";
+
+    // Order follows the placeholders in the WHERE clause: table name first, then schema.
+    List<String> bindValues = Arrays.asList(entity, schema);
+
+    List<Command> result = new ArrayList<>();
+    result.add(getCommand(columnMetadataSql, JdbcCommand.JdbcCommandType.QUERY));
+    result.add(getCommand(bindValues, JdbcCommand.JdbcCommandType.QUERYPARAMS));
+    return result;
+  }
+
+  /**
+   * Builds the query that reads the maximum value of the watermark column.
+   *
+   * <p>The extract SQL is reused: its output column projection is replaced by
+   * {@code max(<watermark column>)} and the watermark predicate symbol by the joined predicates,
+   * or by {@code 1=1} when there are none.
+   *
+   * @param schema not used by this implementation
+   * @param entity not used by this implementation
+   * @param watermarkColumn column(s) whose maximum is selected
+   * @param predicateList predicates joined into the watermark filter
+   * @return a single QUERY command
+   * @throws HighWatermarkException declared by the overridden signature
+   */
+  @Override
+  public List<Command> getHighWatermarkMetadata(String schema, String entity, String watermarkColumn,
+      List<Predicate> predicateList)
+      throws HighWatermarkException {
+    log.debug("Build query to get high watermark");
+
+    String maxProjection = "max(" + Utils.getCoalesceColumnNames(watermarkColumn) + ")";
+    String joinedPredicates = this.concatPredicates(predicateList);
+    String baseSql = this.getExtractSql();
+    String filter = StringUtils.isBlank(joinedPredicates) ? "1=1" : joinedPredicates;
+
+    String watermarkSql = baseSql.replace(this.getOutputColumnProjection(), maxProjection)
+        .replace(ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_WATERMARK_PREDICATE_SYMBOL, filter);
+
+    List<Command> result = new ArrayList<>();
+    result.add(getCommand(watermarkSql, JdbcCommand.JdbcCommandType.QUERY));
+    return result;
+  }
+
+  @Override
+  public List<Command> getCountMetadata(String schema, String entity, WorkUnit workUnit,
List<Predicate> predicateList)
+      throws RecordCountException {
+    log.debug("Build query to get source record count");
+    List<Command> commands = new ArrayList<>();
+
+    String columnProjection = "COUNT(1)";
+    String watermarkFilter = this.concatPredicates(predicateList);
+    String query = this.getExtractSql();
+
+    if (StringUtils.isBlank(watermarkFilter)) {
+      watermarkFilter = "1=1";
+    }
+
+    query = query.replace(this.getOutputColumnProjection(), columnProjection)
+        .replace(ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_WATERMARK_PREDICATE_SYMBOL,
watermarkFilter);
+    String sampleFilter = this.constructSampleClause();
+    query = query + sampleFilter;
+
+    if (!StringUtils.isEmpty(sampleFilter)) {
+      query = "SELECT COUNT(1) FROM (" + query.replace(" COUNT(1) ", " 1 ") + ")temp";
+    }
+    commands.add(getCommand(query, JdbcCommand.JdbcCommandType.QUERY));
+    return commands;
+  }
+
+  @Override
+  public List<Command> getDataMetadata(String schema, String entity, WorkUnit workUnit,
List<Predicate> predicateList)
+      throws DataRecordException {
+    log.debug("Build query to extract data");
+    List<Command> commands = new ArrayList<>();
+    int fetchsize = this.workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE,
+        ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE);
+    String watermarkFilter = this.concatPredicates(predicateList);
+    String query = this.getExtractSql();
+    if (StringUtils.isBlank(watermarkFilter)) {
+      watermarkFilter = "1=1";
+    }
+
+    query = query.replace(ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_WATERMARK_PREDICATE_SYMBOL,
watermarkFilter);
+    String sampleFilter = this.constructSampleClause();
+    query = query + sampleFilter;
+
+    commands.add(getCommand(query, JdbcCommand.JdbcCommandType.QUERY));
+    commands.add(getCommand(fetchsize, JdbcCommand.JdbcCommandType.FETCHSIZE));
+    return commands;
+  }
+
+  /**
+   * Assembles the JDBC URL {@code jdbc:postgresql://<host>:<port>/<database>} from the work unit state.
+   *
+   * <p>Host and database are trimmed; the port is used as configured. The database name is read
+   * from {@code SOURCE_QUERYBASED_SCHEMA}.
+   *
+   * @return the connection URL
+   */
+  @Override
+  public String getConnectionUrl() {
+    String host = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
+    String port = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
+    String database = this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
+
+    // NOTE(review): if getProp returns null for host or database, trim() fails here - confirm callers always set them.
+    StringBuilder url = new StringBuilder("jdbc:postgresql://");
+    url.append(host.trim());
+    url.append(":").append(port);
+    url.append("/").append(database.trim());
+    return url.toString();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>This implementation always returns {@code false}.
+   */
+  @Override
+  protected boolean convertBitToBoolean() {
+    return false;
+  }
+
+  /**
+   * Returns the mapping from source column type names to the target type names used by this extractor.
+   *
+   * <p>NOTE(review): several keys (e.g. {@code mediumint}, {@code longtext}, {@code tinyblob}) look like
+   * MySQL type names - confirm they cover the type names PostgreSQL reports.
+   *
+   * @return an immutable map, built anew on each call, keyed by source type name
+   */
+  @Override
+  public Map<String, String> getDataTypeMap() {
+    ImmutableMap.Builder<String, String> mapping = ImmutableMap.builder();
+
+    // Numeric types
+    mapping.put("tinyint", "int");
+    mapping.put("smallint", "int");
+    mapping.put("mediumint", "int");
+    mapping.put("int", "int");
+    mapping.put("bigint", "long");
+    mapping.put("float", "float");
+    mapping.put("double", "double");
+    mapping.put("decimal", "double");
+    mapping.put("numeric", "double");
+
+    // Date and time types
+    mapping.put("date", "date");
+    mapping.put("timestamp", "timestamp");
+    mapping.put("datetime", "timestamp");
+    mapping.put("time", "time");
+
+    // Everything else is carried as a string
+    mapping.put("char", "string");
+    mapping.put("varchar", "string");
+    mapping.put("varbinary", "string");
+    mapping.put("text", "string");
+    mapping.put("tinytext", "string");
+    mapping.put("mediumtext", "string");
+    mapping.put("longtext", "string");
+    mapping.put("blob", "string");
+    mapping.put("tinyblob", "string");
+    mapping.put("mediumblob", "string");
+    mapping.put("longblob", "string");
+    mapping.put("enum", "string");
+
+    return mapping.build();
+  }
+
+  /**
+   * Returns the date-time pattern used for watermark values of the given type.
+   *
+   * @param watermarkType watermark type to look up
+   * @return {@code yyyy-MM-dd HH:mm:ss} for TIMESTAMP, {@code yyyy-MM-dd} for DATE; for any other
+   *         type an error is logged and {@code null} is returned
+   */
+  @Override
+  public String getWatermarkSourceFormat(WatermarkType watermarkType) {
+    switch (watermarkType) {
+      case TIMESTAMP:
+        return "yyyy-MM-dd HH:mm:ss";
+      case DATE:
+        return "yyyy-MM-dd";
+      default:
+        log.error("Watermark type " + watermarkType.toString() + " not recognized");
+        return null;
+    }
+  }
+
+  /**
+   * Reads the row count of a trailing {@code LIMIT} clause from a query.
+   *
+   * <p>The keyword is matched case-insensitively as {@code " limit "}; everything after it is
+   * trimmed and parsed as a {@code long}.
+   *
+   * @param query query text; may be blank
+   * @return the parsed limit, or {@code -1} when the query is blank, has no limit clause, or the
+   *         text after the keyword is not a plain number (an error is logged in that last case)
+   */
+  @Override
+  public long exractSampleRecordCountFromQuery(String query) {
+    if (StringUtils.isBlank(query)) {
+      return SAMPLERECORDCOUNT;
+    }
+
+    // Lower-case with a fixed locale: with a Turkish default locale "LIMIT" lower-cases to a
+    // dotless i and the keyword would never be found.
+    String keyword = " limit ";
+    int limitIndex = query.toLowerCase(java.util.Locale.ROOT).indexOf(keyword);
+    if (limitIndex <= 0) {
+      return SAMPLERECORDCOUNT;
+    }
+
+    String limit = query.substring(limitIndex + keyword.length()).trim();
+    if (StringUtils.isBlank(limit)) {
+      return SAMPLERECORDCOUNT;
+    }
+
+    try {
+      return Long.parseLong(limit);
+    } catch (NumberFormatException e) {
+      // Deliberately best-effort: an unparsable limit means "no sampling", not a failure.
+      log.error("Ignoring incorrct limit value in input query:" + limit);
+      return SAMPLERECORDCOUNT;
+    }
+  }
+
+  /**
+   * Strips a trailing {@code LIMIT} clause from a query, leaving a neutral {@code 1=1} filter in its place.
+   *
+   * <ul>
+   *   <li>query has a WHERE with conditions: the clause is replaced by {@code " AND 1=1"}</li>
+   *   <li>query ends in a bare WHERE before the limit: the clause is replaced by {@code " 1=1"}</li>
+   *   <li>query has no WHERE: the clause is replaced by {@code " where 1=1"}</li>
+   *   <li>query has no limit clause: it is returned unchanged</li>
+   * </ul>
+   *
+   * <p>NOTE(review): the keyword is matched as {@code " limit"} without a trailing space, so an
+   * identifier starting with "limit" after a space is also treated as the clause - confirm
+   * whether that is acceptable.
+   *
+   * @param query query text; may be blank
+   * @return the rewritten query, or {@code null} when {@code query} is blank
+   */
+  @Override
+  public String removeSampleClauseFromQuery(String query) {
+    if (StringUtils.isBlank(query)) {
+      return null;
+    }
+
+    // Fixed locale so keyword detection does not depend on the JVM default (e.g. Turkish dotless i).
+    String inputQuery = query.toLowerCase(java.util.Locale.ROOT);
+    int limitIndex = inputQuery.indexOf(" limit");
+    if (limitIndex <= 0) {
+      // Nothing to strip. Replacing an empty limit string would insert the replacement between
+      // every character of the query, so hand the query back untouched.
+      return query;
+    }
+
+    // Cut at the clause instead of using String.replace, which could hit an earlier identical span.
+    String head = query.substring(0, limitIndex);
+
+    if (!inputQuery.contains(" where ")) {
+      return head + " where 1=1";
+    }
+    if (head.toLowerCase(java.util.Locale.ROOT).endsWith(" where")) {
+      // Dangling WHERE directly before the limit: supply the condition without AND.
+      return head + " 1=1";
+    }
+    return head + " AND 1=1";
+  }
+
+  /**
+   * Builds the {@code LIMIT} clause for the configured sample record count.
+   *
+   * @return {@code " limit <n>"} when the sample record count is zero or greater, otherwise an empty string
+   */
+  @Override
+  public String constructSampleClause() {
+    long sampleRowCount = this.getSampleRecordCount();
+    return sampleRowCount < 0 ? "" : " limit " + sampleRowCount;
+  }
+
+  /**
+   * Returns the opening quote for delimited identifiers.
+   *
+   * <p>PostgreSQL delimits identifiers with double quotes; the backtick is MySQL syntax and is
+   * rejected by PostgreSQL.
+   *
+   * @return {@code "} when delimited identifiers are enabled, otherwise an empty string
+   */
+  @Override
+  public String getLeftDelimitedIdentifier() {
+    return this.enableDelimitedIdentifier ? "\"" : "";
+  }
+
+  /**
+   * Returns the closing quote for delimited identifiers; it is the same character as the opening quote.
+   *
+   * @return {@code "} when delimited identifiers are enabled, otherwise an empty string
+   */
+  @Override
+  public String getRightDelimitedIdentifier() {
+    return this.enableDelimitedIdentifier ? "\"" : "";
+  }
+
+  /**
+   * Source-API extraction hook; this implementation does no work.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit,
+      List<Predicate> predicateList)
+      throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e12fa76d/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java
b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java
new file mode 100644
index 0000000..4b3ffbd
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.jdbc;
+
+import java.sql.ResultSet;
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.mockrunner.mock.jdbc.MockResultSet;
+
+import static org.testng.Assert.assertEquals;
+
+
+@Test(groups = {"gobblin.source.jdbc"})
+public class PostgresqlExtractorTest {
+
+  private final static List<MockJdbcColumn> COLUMNS = ImmutableList
+      .of(new MockJdbcColumn("id", "1", Types.INTEGER), new MockJdbcColumn("name", "name_1",
Types.VARCHAR),
+          new MockJdbcColumn("age", "20", Types.INTEGER));
+
+  private static final String QUERY_1 = "SELECT * FROM x WHERE LIMIT 532";
+  private static final String QUERY_2 = "SELECT * FROM x WHERE x.a < 10 LIMIT 50";
+  private static final String QUERY_3 = "SELECT * FROM x WHERE x.a < 10 AND x.b = 20 LIMIT
50";
+  private static final String QUERY_EMPTY = "";
+  private static final String QUERY_REG = "SELECT * FROM x WHERE x.a < 10";
+
+  private CommandOutput<JdbcCommand, ResultSet> output;
+  private State state;
+  private PostgresqlExtractor postgresqlExtractor;
+
+  @BeforeClass
+  public void setup() {
+    output = new JdbcCommandOutput();
+    try {
+      output.put(new JdbcCommand(), buildMockResultSet());
+    } catch (Exception e) {
+      // hack for test failure
+      assertEquals("PostgresqlExtractorTest: error initializing mock result set", "false");
+    }
+    state = new WorkUnitState();
+    state.setId("id");
+    postgresqlExtractor = new PostgresqlExtractor((WorkUnitState) state);
+  }
+
+  /** The sample clause must be {@code limit <n>} for the extractor's current sample record count. */
+  @Test
+  public void testConstructSampleClause()
+      throws Exception {
+    String actualClause = postgresqlExtractor.constructSampleClause();
+    String expectedClause = " limit " + postgresqlExtractor.getSampleRecordCount();
+    assertEquals(actualClause.trim(), expectedClause.trim());
+  }
+
+  /** A trailing LIMIT is replaced by a neutral {@code 1=1} condition, with AND only when needed. */
+  @Test
+  public void testRemoveSampleClauseFromQuery()
+      throws Exception {
+    String danglingWhere = postgresqlExtractor.removeSampleClauseFromQuery(QUERY_1);
+    String singleCondition = postgresqlExtractor.removeSampleClauseFromQuery(QUERY_2);
+    String twoConditions = postgresqlExtractor.removeSampleClauseFromQuery(QUERY_3);
+
+    assertEquals(danglingWhere, "SELECT * FROM x WHERE 1=1");
+    assertEquals(singleCondition, "SELECT * FROM x WHERE x.a < 10 AND 1=1");
+    assertEquals(twoConditions, "SELECT * FROM x WHERE x.a < 10 AND x.b = 20 AND 1=1");
+  }
+
+  /** The limit value is parsed when present; blank queries and queries without LIMIT yield -1. */
+  @Test
+  public void testExractSampleRecordCountFromQuery()
+      throws Exception {
+    long danglingWhereLimit = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_1);
+    long singleConditionLimit = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_2);
+    long twoConditionsLimit = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_3);
+    long emptyQueryLimit = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_EMPTY);
+    long noLimitClause = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_REG);
+
+    assertEquals(danglingWhereLimit, 532L);
+    assertEquals(singleConditionLimit, 50L);
+    assertEquals(twoConditionsLimit, 50L);
+    assertEquals(emptyQueryLimit, -1L);
+    assertEquals(noLimitClause, -1L);
+  }
+
+  /** Hour values are rendered as two-digit {@code HH} literals. */
+  @Test
+  public void testHourPredicateCondition()
+      throws Exception {
+    String hour24 = postgresqlExtractor.getHourPredicateCondition("my_time", 24L, "h", ">");
+    String hour23 = postgresqlExtractor.getHourPredicateCondition("my_time", 23L, "HH", ">");
+    String hour2 = postgresqlExtractor.getHourPredicateCondition("my_time", 2L, "h", ">");
+
+    assertEquals(hour24, "my_time > '00'");
+    assertEquals(hour23, "my_time > '23'");
+    assertEquals(hour2, "my_time > '02'");
+  }
+
+  /** A {@code ddMMyyyy} value is rendered as a {@code yyyy-MM-dd} literal. */
+  @Test
+  public void testDatePredicateCondition()
+      throws Exception {
+    String predicate = postgresqlExtractor.getDatePredicateCondition("my_date", 12061992L, "ddMMyyyy", ">");
+
+    assertEquals(predicate, "my_date > '1992-06-12'");
+  }
+
+  /** A {@code ddMMyyyyhhmmss} value is rendered as a {@code yyyy-MM-dd HH:mm:ss} literal. */
+  @Test
+  public void testTimePredicateCondition()
+      throws Exception {
+    String predicate =
+        postgresqlExtractor.getTimestampPredicateCondition("my_date", 12061992080809L, "ddMMyyyyhhmmss", ">");
+
+    assertEquals(predicate, "my_date > '1992-06-12 08:08:09'");
+  }
+
+  /**
+   * Builds a mockrunner {@link MockResultSet} with one column per entry of {@code COLUMNS},
+   * each column holding that entry's single value.
+   */
+  private ResultSet buildMockResultSet()
+      throws Exception {
+
+    MockResultSet mrs = new MockResultSet(StringUtils.EMPTY);
+    for (MockJdbcColumn column : COLUMNS) {
+      mrs.addColumn(column.getColumnName(), ImmutableList.of(column.getValue()));
+    }
+    return mrs;
+  }
+}
\ No newline at end of file


Mime
View raw message