From: hutran@apache.org
To: commits@gobblin.apache.org
Subject: incubator-gobblin git commit: [GOBBLIN-265] Add support for PK chunking to gobblin-salesforce
Date: Thu, 28 Sep 2017 00:59:59 +0000 (UTC)

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 06bcc948c -> 859fadcc3


[GOBBLIN-265] Add support for PK chunking to gobblin-salesforce

Closes #2120 from htran1/salesforce_pk_chunking

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/859fadcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/859fadcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/859fadcc

Branch: refs/heads/master
Commit: 859fadcc3ace98789ccab1e965da0ef7818f20b7
Parents: 06bcc94
Author: Hung Tran
Authored: Wed Sep 27 17:59:52 2017 -0700
Committer: Hung Tran
Committed: Wed Sep 27 17:59:52 2017 -0700

----------------------------------------------------------------------
 .../gobblin/salesforce/SalesforceExtractor.java | 153 ++++++++++++++++---
 1 file changed, 133 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/859fadcc/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 22a0850..c0c340d 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -47,6 +47,7 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.sforce.async.AsyncApiException;
 import com.sforce.async.BatchInfo;
+import com.sforce.async.BatchInfoList;
 import com.sforce.async.BatchStateEnum;
 import com.sforce.async.BulkConnection;
 import com.sforce.async.ConcurrencyMode;
@@ -68,6 +69,7 @@ import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
 import org.apache.gobblin.source.extractor.exception.SchemaException;
 import org.apache.gobblin.source.extractor.extract.Command;
 import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.source.jdbc.SqlQueryUtils;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
@@ -81,6 +83,8 @@ import org.apache.gobblin.source.extractor.utils.Utils;
 import org.apache.gobblin.source.extractor.watermark.Predicate;
 import org.apache.gobblin.source.extractor.watermark.WatermarkType;
 import org.apache.gobblin.source.workunit.WorkUnit;
+
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -96,6 +100,14 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final String SALESFORCE_HOUR_FORMAT = "HH";
   private static final String SALESFORCE_SOAP_AUTH_SERVICE = "/services/Soap/u";
   private static final Gson GSON = new Gson();
+  private static final int MAX_PK_CHUNKING_SIZE = 250000;
+  private static final int MIN_PK_CHUNKING_SIZE = 100000;
+  private static final int DEFAULT_PK_CHUNKING_SIZE = 200000;
+  private static final String ENABLE_PK_CHUNKING_KEY = "salesforce.enablePkChunking";
+  private static final String PK_CHUNKING_SIZE_KEY = "salesforce.pkChunkingSize";
+  private static final int MAX_RETRY_INTERVAL_SECS = 600;
+  // avoid using too many bulk API calls by allowing PK chunking only if max partitions is configured <= this
+  private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;

   private boolean pullStatus = true;
   private String nextUrl;
@@ -103,21 +115,40 @@
   private BulkConnection bulkConnection = null;
   private boolean bulkApiInitialRun = true;
   private JobInfo bulkJob = new JobInfo();
-  private BatchInfo bulkBatchInfo = null;
   private BufferedReader bulkBufferedReader = null;
-  private List<String> bulkResultIdList = Lists.newArrayList();
+  private List<BatchIdAndResultId> bulkResultIdList = Lists.newArrayList();
   private int bulkResultIdCount = 0;
   private boolean bulkJobFinished = true;
   private List<String> bulkRecordHeader;
   private int bulkResultColumCount;
   private boolean newBulkResultSet = true;
   private int bulkRecordCount = 0;
+  private int prevBulkRecordCount = 0;
+  private final boolean pkChunking;
+  private final int pkChunkingSize;

   private final SalesforceConnector sfConnector;

   public SalesforceExtractor(WorkUnitState state) {
     super(state);
     this.sfConnector = (SalesforceConnector) this.connector;
+
+    // don't allow pk chunking if max partitions is too high or the user specified partitions
+    if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false)
+        || state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
+            ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > PK_CHUNKING_MAX_PARTITIONS_LIMIT) {
+      if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) {
+        log.warn("Max partitions too high, so PK chunking is not enabled");
+      }
+
+      this.pkChunking = false;
+    } else {
+      this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false);
+    }
+
+    this.pkChunkingSize =
+        Math.max(MIN_PK_CHUNKING_SIZE,
+            Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
   }

   @Override
@@ -640,22 +671,30 @@

   /**
    * Get record set using Salesforce-specific API (Bulk API)
-   * @param schema/databasename
    * @param entity/tablename
-   * @param list of all predicate conditions
+   * @param predicateList of all predicate conditions
    * @return iterator with batch of records
    */
-  private List<String> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
+  private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
     if (!bulkApiLogin()) {
       throw new IllegalArgumentException("Invalid Login");
     }

     try {
+      boolean usingPkChunking = false;
+
       // Set bulk job attributes
       this.bulkJob.setObject(entity);
       this.bulkJob.setOperation(OperationEnum.query);
       this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);

+      // use pk chunking if it is configured and the expected record count is larger than the pk chunking size
+      if (this.pkChunking && getExpectedRecordCount() > this.pkChunkingSize) {
+        log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
+        this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize);
+        usingPkChunking = true;
+      }
+
       // Result type as CSV
       this.bulkJob.setContentType(ContentType.CSV);
@@ -680,32 +719,54 @@
       log.info("QUERY:" + query);
       ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));

-      this.bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
+      BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);

-      int retryInterval = 30 + (int) Math.ceil((float) this.getExpectedRecordCount() / 10000) * 2;
+      long expectedSizePerBatch = usingPkChunking ?
+          this.pkChunkingSize : this.getExpectedRecordCount();
+
+      int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS,
+          30 + (int) Math.ceil((float) expectedSizePerBatch / 10000) * 2);
       log.info("Salesforce bulk api retry interval in seconds:" + retryInterval);

       // Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset)
-      this.bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), this.bulkBatchInfo.getId());
+      bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());

-      while ((this.bulkBatchInfo.getState() != BatchStateEnum.Completed)
-          && (this.bulkBatchInfo.getState() != BatchStateEnum.Failed)) {
+      // wait for completion, failure, or formation of PK chunking batches
+      while ((bulkBatchInfo.getState() != BatchStateEnum.Completed)
+          && (bulkBatchInfo.getState() != BatchStateEnum.Failed)
+          && (!usingPkChunking || bulkBatchInfo.getState() != BatchStateEnum.NotProcessed)) {
         Thread.sleep(retryInterval * 1000);
-        this.bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), this.bulkBatchInfo.getId());
-        log.debug("Bulk Api Batch Info:" + this.bulkBatchInfo);
+        bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
+        log.debug("Bulk Api Batch Info:" + bulkBatchInfo);
         log.info("Waiting for bulk resultSetIds");
       }

-      if (this.bulkBatchInfo.getState() == BatchStateEnum.Failed) {
+      // Wait for pk chunking batches
+      BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
+
+      if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) {
+        bulkBatchInfo = waitForPkBatches(batchInfoList, retryInterval);
+      }
+
+      if (bulkBatchInfo.getState() == BatchStateEnum.Failed) {
         log.error("Bulk batch failed: " + bulkBatchInfo.toString());
-        throw new RuntimeException("Failed to get bulk batch info for jobId " + this.bulkBatchInfo.getJobId()
-            + " error - " + this.bulkBatchInfo.getStateMessage());
+        throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId()
+            + " error - " + bulkBatchInfo.getStateMessage());
+      }
+
+      // Get resultset ids of all the batches from the batch info list
+      List<BatchIdAndResultId> batchIdAndResultIdList = Lists.newArrayList();
+
+      for (BatchInfo bi : batchInfoList.getBatchInfo()) {
+        QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), bi.getId());
+
+        for (String result : list.getResult()) {
+          batchIdAndResultIdList.add(new BatchIdAndResultId(bi.getId(), result));
+        }
       }

-      // Get resultset ids from the batch info
-      QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), this.bulkBatchInfo.getId());
+      log.info("QueryResultList: " + batchIdAndResultIdList);

-      return Arrays.asList(list.getResult());
+      return batchIdAndResultIdList;
     } catch (RuntimeException | AsyncApiException | InterruptedException e) {
       throw new RuntimeException(
@@ -725,6 +786,12 @@

     // if Buffer is empty then get stream for the new resultset id
     if (this.bulkBufferedReader == null || !this.bulkBufferedReader.ready()) {
+      // log the number of records from each result set after it is processed (bulkResultIdCount > 0)
+      if (this.bulkResultIdCount > 0) {
+        log.info("Result set {} had {} records", this.bulkResultIdCount,
+            this.bulkRecordCount - this.prevBulkRecordCount);
+      }
+
       // if there is unprocessed resultset id then get result stream for that id
       if (this.bulkResultIdCount <
           this.bulkResultIdList.size()) {
         log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
@@ -732,11 +799,13 @@

         this.bulkBufferedReader = new BufferedReader(
             new InputStreamReader(
-                this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkBatchInfo.getId(),
-                    this.bulkResultIdList.get(this.bulkResultIdCount)),
+                this.bulkConnection.getQueryResultStream(this.bulkJob.getId(),
+                    this.bulkResultIdList.get(this.bulkResultIdCount).getBatchId(),
+                    this.bulkResultIdList.get(this.bulkResultIdCount).getResultId()),
                 ConfigurationKeys.DEFAULT_CHARSET_ENCODING));

         this.bulkResultIdCount++;
+        this.prevBulkRecordCount = bulkRecordCount;
       } else {
         // if result stream processed for all resultset ids then finish the bulk job
         log.info("Bulk job is finished");
@@ -801,4 +870,48 @@
     return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
   }

+  /**
+   * Waits for the PK batches to complete. The wait will stop after all batches are complete or on the first failed batch
+   * @param batchInfoList list of batch info
+   * @param retryInterval the polling interval
+   * @return the last {@link BatchInfo} processed
+   * @throws InterruptedException
+   * @throws AsyncApiException
+   */
+  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval)
+      throws InterruptedException, AsyncApiException {
+    BatchInfo batchInfo = null;
+    BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
+
+    // Wait for all batches other than the first one. The first one is not processed in PK chunking mode
+    for (int i = 1; i < batchInfos.length; i++) {
+      BatchInfo bi = batchInfos[i];
+
+      // get refreshed job status
+      bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
+
+      while ((bi.getState() != BatchStateEnum.Completed)
+          && (bi.getState() != BatchStateEnum.Failed)) {
+        Thread.sleep(retryInterval * 1000);
+        bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
+        log.debug("Bulk Api Batch Info:" + bi);
+        log.info("Waiting for bulk resultSetIds");
+      }
+
+      batchInfo = bi;
+
+      // exit if there was a failure
+      if (batchInfo.getState() == BatchStateEnum.Failed) {
+        break;
+      }
+    }
+
+    return batchInfo;
+  }
+
+  @Data
+  private static class BatchIdAndResultId {
+    private final String batchId;
+    private final String resultId;
+  }
 }
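
For reference, the feature is driven entirely by job configuration. Below is a minimal, hypothetical job-config sketch, not part of this commit: the two salesforce.* keys come from the constants added above, source.max.number.of.partitions is assumed here to be the property behind ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, and the values are illustrative only.

    # PK chunking is honored only when the job has no user-specified
    # partitions and max partitions is <= PK_CHUNKING_MAX_PARTITIONS_LIMIT (3)
    source.max.number.of.partitions=3

    # off by default; even when enabled, chunking is used only if the
    # expected record count exceeds the chunk size
    salesforce.enablePkChunking=true

    # requested chunk size; clamped into [100000, 250000], default 200000
    salesforce.pkChunkingSize=200000

With a 200000-record chunk size, the retry interval above works out to min(600, 30 + ceil(200000 / 10000) * 2) = 70 seconds between batch status polls.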