gobblin-commits mailing list archives

From hut...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-265] Add support for PK chunking to gobblin-salesforce
Date Thu, 28 Sep 2017 00:59:59 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 06bcc948c -> 859fadcc3


[GOBBLIN-265] Add support for PK chunking to gobblin-salesforce

Closes #2120 from htran1/salesforce_pk_chunking
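
For context, PK chunking asks the Salesforce Bulk API (via the
Sforce-Enable-PKChunking header set below) to split a query into
multiple batches over primary-key ranges instead of producing one
batch for the entire result set. A minimal job-config sketch enabling
it follows; the property names and bounds are taken from the diff
below, and the values shown are illustrative only:

    # off by default; also forced off when partitions are user-specified
    # or the configured max number of partitions
    # (ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS) exceeds 3
    salesforce.enablePkChunking=true

    # records per chunk; clamped to [100000, 250000], default 200000
    salesforce.pkChunkingSize=200000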


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/859fadcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/859fadcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/859fadcc

Branch: refs/heads/master
Commit: 859fadcc3ace98789ccab1e965da0ef7818f20b7
Parents: 06bcc94
Author: Hung Tran <hutran@linkedin.com>
Authored: Wed Sep 27 17:59:52 2017 -0700
Committer: Hung Tran <hutran@linkedin.com>
Committed: Wed Sep 27 17:59:52 2017 -0700

----------------------------------------------------------------------
 .../gobblin/salesforce/SalesforceExtractor.java | 153 ++++++++++++++++---
 1 file changed, 133 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/859fadcc/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 22a0850..c0c340d 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -47,6 +47,7 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.sforce.async.AsyncApiException;
 import com.sforce.async.BatchInfo;
+import com.sforce.async.BatchInfoList;
 import com.sforce.async.BatchStateEnum;
 import com.sforce.async.BulkConnection;
 import com.sforce.async.ConcurrencyMode;
@@ -68,6 +69,7 @@ import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
 import org.apache.gobblin.source.extractor.exception.SchemaException;
 import org.apache.gobblin.source.extractor.extract.Command;
 import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.source.jdbc.SqlQueryUtils;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
@@ -81,6 +83,8 @@ import org.apache.gobblin.source.extractor.utils.Utils;
 import org.apache.gobblin.source.extractor.watermark.Predicate;
 import org.apache.gobblin.source.extractor.watermark.WatermarkType;
 import org.apache.gobblin.source.workunit.WorkUnit;
+
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -96,6 +100,14 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final String SALESFORCE_HOUR_FORMAT = "HH";
   private static final String SALESFORCE_SOAP_AUTH_SERVICE = "/services/Soap/u";
   private static final Gson GSON = new Gson();
+  private static final int MAX_PK_CHUNKING_SIZE = 250000;
+  private static final int MIN_PK_CHUNKING_SIZE = 100000;
+  private static final int DEFAULT_PK_CHUNKING_SIZE = 200000;
+  private static final String ENABLE_PK_CHUNKING_KEY = "salesforce.enablePkChunking";
+  private static final String PK_CHUNKING_SIZE_KEY = "salesforce.pkChunkingSize";
+  private static final int MAX_RETRY_INTERVAL_SECS = 600;
+  // avoid using too many bulk API calls by allowing PK chunking only if max partitions is configured <= this
+  private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
 
   private boolean pullStatus = true;
   private String nextUrl;
@@ -103,21 +115,40 @@ public class SalesforceExtractor extends RestApiExtractor {
   private BulkConnection bulkConnection = null;
   private boolean bulkApiInitialRun = true;
   private JobInfo bulkJob = new JobInfo();
-  private BatchInfo bulkBatchInfo = null;
   private BufferedReader bulkBufferedReader = null;
-  private List<String> bulkResultIdList = Lists.newArrayList();
+  private List<BatchIdAndResultId> bulkResultIdList = Lists.newArrayList();
   private int bulkResultIdCount = 0;
   private boolean bulkJobFinished = true;
   private List<String> bulkRecordHeader;
   private int bulkResultColumCount;
   private boolean newBulkResultSet = true;
   private int bulkRecordCount = 0;
+  private int prevBulkRecordCount = 0;
 
+  private final boolean pkChunking;
+  private final int pkChunkingSize;
   private final SalesforceConnector sfConnector;
 
   public SalesforceExtractor(WorkUnitState state) {
     super(state);
     this.sfConnector = (SalesforceConnector) this.connector;
+
+    // don't allow pk chunking if max partitions is too high or the user specified partitions
+    if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false)
+        || state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
+        ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > PK_CHUNKING_MAX_PARTITIONS_LIMIT) {
+      if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) {
+        log.warn("PK chunking is disabled because partitions are user-specified or max partitions is too high");
+      }
+
+      this.pkChunking = false;
+    } else {
+      this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false);
+    }
+
+    this.pkChunkingSize =
+        Math.max(MIN_PK_CHUNKING_SIZE,
+            Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
   }
 
   @Override
@@ -640,22 +671,30 @@ public class SalesforceExtractor extends RestApiExtractor {
 
   /**
    * Get Record set using salesforce specific API(Bulk API)
-   * @param schema/databasename
    * @param entity/tablename
-   * @param list of all predicate conditions
+   * @param predicateList of all predicate conditions
   * @return iterator with batch of records
    */
-  private List<String> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
+  private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
     if (!bulkApiLogin()) {
       throw new IllegalArgumentException("Invalid Login");
     }
 
     try {
+      boolean usingPkChunking = false;
+
       // Set bulk job attributes
       this.bulkJob.setObject(entity);
       this.bulkJob.setOperation(OperationEnum.query);
       this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);
 
+      // use pk chunking if it is configured and the expected record count is larger than the pk chunking size
+      if (this.pkChunking && getExpectedRecordCount() > this.pkChunkingSize) {
+        log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
+        this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize);
+        usingPkChunking = true;
+      }
+
       // Result type as CSV
       this.bulkJob.setContentType(ContentType.CSV);
 
@@ -680,32 +719,54 @@ public class SalesforceExtractor extends RestApiExtractor {
       log.info("QUERY:" + query);
       ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
 
-      this.bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
+      BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
 
-      int retryInterval = 30 + (int) Math.ceil((float) this.getExpectedRecordCount() / 10000) * 2;
+      long expectedSizePerBatch = usingPkChunking ? this.pkChunkingSize : this.getExpectedRecordCount();
+
+      int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS,
+          30 + (int) Math.ceil((float) expectedSizePerBatch / 10000) * 2);
       log.info("Salesforce bulk api retry interval in seconds:" + retryInterval);
 
      // Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset)
-      this.bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), this.bulkBatchInfo.getId());
+      bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
 
-      while ((this.bulkBatchInfo.getState() != BatchStateEnum.Completed)
-          && (this.bulkBatchInfo.getState() != BatchStateEnum.Failed)) {
+      // wait for completion, failure, or formation of PK chunking batches
+      while ((bulkBatchInfo.getState() != BatchStateEnum.Completed)
+          && (bulkBatchInfo.getState() != BatchStateEnum.Failed)
+          && (!usingPkChunking || bulkBatchInfo.getState() != BatchStateEnum.NotProcessed)) {
         Thread.sleep(retryInterval * 1000);
-        this.bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), this.bulkBatchInfo.getId());
-        log.debug("Bulk Api Batch Info:" + this.bulkBatchInfo);
+        bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
+        log.debug("Bulk Api Batch Info:" + bulkBatchInfo);
         log.info("Waiting for bulk resultSetIds");
       }
 
-      if (this.bulkBatchInfo.getState() == BatchStateEnum.Failed) {
+      // Wait for pk chunking batches
+      BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
+
+      if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) {
+        bulkBatchInfo = waitForPkBatches(batchInfoList, retryInterval);
+      }
+
+      if (bulkBatchInfo.getState() == BatchStateEnum.Failed) {
         log.error("Bulk batch failed: " + bulkBatchInfo.toString());
-        throw new RuntimeException("Failed to get bulk batch info for jobId " + this.bulkBatchInfo.getJobId()
-            + " error - " + this.bulkBatchInfo.getStateMessage());
+        throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId()
+            + " error - " + bulkBatchInfo.getStateMessage());
+      }
+
+      // Get resultset ids of all the batches from the batch info list
+      List<BatchIdAndResultId> batchIdAndResultIdList = Lists.newArrayList();
+
+      for (BatchInfo bi : batchInfoList.getBatchInfo()) {
+        QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), bi.getId());
+
+        for (String result : list.getResult()) {
+          batchIdAndResultIdList.add(new BatchIdAndResultId(bi.getId(), result));
+        }
       }
 
-      // Get resultset ids from the batch info
-      QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), this.bulkBatchInfo.getId());
+      log.info("QueryResultList: " + batchIdAndResultIdList);
 
-      return Arrays.asList(list.getResult());
+      return batchIdAndResultIdList;
 
     } catch (RuntimeException | AsyncApiException | InterruptedException e) {
       throw new RuntimeException(
@@ -725,6 +786,12 @@ public class SalesforceExtractor extends RestApiExtractor {
       // if Buffer is empty then get stream for the new resultset id
       if (this.bulkBufferedReader == null || !this.bulkBufferedReader.ready()) {
 
+        // log the number of records from each result set after it is processed (bulkResultIdCount > 0)
+        if (this.bulkResultIdCount > 0) {
+          log.info("Result set {} had {} records", this.bulkResultIdCount,
+              this.bulkRecordCount - this.prevBulkRecordCount);
+        }
+
         // if there is unprocessed resultset id then get result stream for that id
         if (this.bulkResultIdCount < this.bulkResultIdList.size()) {
           log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
@@ -732,11 +799,13 @@ public class SalesforceExtractor extends RestApiExtractor {
           this.bulkBufferedReader =
               new BufferedReader(
                   new InputStreamReader(
-                      this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkBatchInfo.getId(),
-                          this.bulkResultIdList.get(this.bulkResultIdCount)),
+                      this.bulkConnection.getQueryResultStream(this.bulkJob.getId(),
+                          this.bulkResultIdList.get(this.bulkResultIdCount).getBatchId(),
+                          this.bulkResultIdList.get(this.bulkResultIdCount).getResultId()),
                       ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
 
           this.bulkResultIdCount++;
+          this.prevBulkRecordCount = bulkRecordCount;
         } else {
           // if result stream processed for all resultset ids then finish the bulk job
           log.info("Bulk job is finished");
@@ -801,4 +870,48 @@ public class SalesforceExtractor extends RestApiExtractor {
     return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
   }
 
+  /**
+   * Waits for the PK batches to complete. The wait will stop after all batches are complete or on the first failed batch.
+   * @param batchInfoList list of batch info
+   * @param retryInterval the polling interval
+   * @return the last {@link BatchInfo} processed
+   * @throws InterruptedException
+   * @throws AsyncApiException
+   */
+  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval)
+      throws InterruptedException, AsyncApiException {
+    BatchInfo batchInfo = null;
+    BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
+
+    // Wait for all batches other than the first one. The first one is not processed in PK chunking mode.
+    for (int i = 1; i < batchInfos.length; i++) {
+      BatchInfo bi = batchInfos[i];
+
+      // get refreshed job status
+      bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
+
+      while ((bi.getState() != BatchStateEnum.Completed)
+          && (bi.getState() != BatchStateEnum.Failed)) {
+        Thread.sleep(retryInterval * 1000);
+        bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
+        log.debug("Bulk Api Batch Info:" + bi);
+        log.info("Waiting for bulk resultSetIds");
+      }
+
+      batchInfo = bi;
+
+      // exit if there was a failure
+      if (batchInfo.getState() == BatchStateEnum.Failed) {
+        break;
+      }
+    }
+
+    return batchInfo;
+  }
+
+  @Data
+  private static class BatchIdAndResultId {
+    private final String batchId;
+    private final String resultId;
+  }
 }
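
For a worked example of the two numeric guards introduced above, here
is a standalone sketch (not part of the patch; the constants are
copied from the diff, while the class name and input value are
hypothetical):

    public class PkChunkingMathSketch {
      private static final int MIN_PK_CHUNKING_SIZE = 100000;
      private static final int MAX_PK_CHUNKING_SIZE = 250000;
      private static final int MAX_RETRY_INTERVAL_SECS = 600;

      public static void main(String[] args) {
        // hypothetical salesforce.pkChunkingSize setting, above the allowed max
        int configured = 300000;
        // clamp to [MIN, MAX] as in the constructor above -> 250000
        int chunkSize = Math.max(MIN_PK_CHUNKING_SIZE,
            Math.min(MAX_PK_CHUNKING_SIZE, configured));
        // polling interval as in getQueryResultIds above:
        // min(600, 30 + ceil(250000 / 10000) * 2) = 80 seconds
        int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS,
            30 + (int) Math.ceil((float) chunkSize / 10000) * 2);
        System.out.println(chunkSize + " " + retryInterval); // 250000 80
      }
    }

Note that @Data on BatchIdAndResultId is Lombok: for the two final
fields it generates the two-argument constructor and the
getBatchId()/getResultId() accessors used when streaming the result
sets above.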

