From dev-return-5875-archive-asf-public=cust-asf.ponee.io@gobblin.incubator.apache.org Fri Sep 13 22:13:41 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id C9AE1180652 for ; Sat, 14 Sep 2019 00:13:40 +0200 (CEST) Received: (qmail 67406 invoked by uid 500); 13 Sep 2019 22:13:40 -0000 Mailing-List: contact dev-help@gobblin.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gobblin.incubator.apache.org Delivered-To: mailing list dev@gobblin.incubator.apache.org Received: (qmail 67394 invoked by uid 99); 13 Sep 2019 22:13:40 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Sep 2019 22:13:40 +0000 From: GitBox To: dev@gobblin.apache.org Subject: [GitHub] [incubator-gobblin] arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition Message-ID: <156841282009.32076.11511838316372791817.gitbox@gitbox.apache.org> Date: Fri, 13 Sep 2019 22:13:40 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit arekusuri commented on a change in pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324385070 ########## File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ########## @@ -588,11 +576,41 @@ public String getTimestampPredicateCondition(String column, long value, String v return dataTypeMap; } + + private String partitionPkChunkingJobId = null; + private Iterator partitionPkChunkingBatchIdResultIterator = null; + + private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException { + if (partitionPkChunkingBatchIdResultIterator == null) { + partitionPkChunkingJobId = workUnit.getProp(PK_CHUNKING_JOB_ID); + partitionPkChunkingBatchIdResultIterator = Arrays.stream(workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS).split(",")).iterator(); + } + if (!partitionPkChunkingBatchIdResultIterator.hasNext()) { + return null; + } + try { + if (!bulkApiLogin()) { + throw new IllegalArgumentException("Invalid Login"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":"); + String batchId = batchIdResultIdArray[0]; + String resultId = batchIdResultIdArray[1]; + List rs = fetchPkChunkingResultSetWithRetry(bulkConnection, partitionPkChunkingJobId, batchId, resultId, fetchRetryLimit); + return rs.iterator(); + } + @Override public Iterator getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List predicateList) throws IOException { log.debug("Getting salesforce data using bulk api"); - RecordSet rs = null; + + // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource + if (!workUnit.getProp(PK_CHUNKING_JOB_ID, "").equals("")) { Review comment: Thanks! will do. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services