Date: Fri, 13 Sep 2019 22:14:00 +0000 (UTC)
From: "ASF GitHub Bot (Jira)"
To: dev@gobblin.incubator.apache.org
Subject: [jira] [Work logged] (GOBBLIN-865) Add feature that enables PK-chunking in partition

[ https://issues.apache.org/jira/browse/GOBBLIN-865?focusedWorklogId=312400&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312400 ]

ASF GitHub Bot logged work on GOBBLIN-865:
------------------------------------------

Author: ASF GitHub Bot
Created on: 13/Sep/19 22:13
Start Date: 13/Sep/19 22:13
Worklog Time Spent: 10m
Work Description: arekusuri commented on pull request #2722: GOBBLIN-865: Add feature that enables PK-chunking in partition
URL: https://github.com/apache/incubator-gobblin/pull/2722#discussion_r324385070

##########
File path: gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
##########
@@ -588,11 +576,41 @@ public String getTimestampPredicateCondition(String column, long value, String v
     return dataTypeMap;
   }
 
+
+  private String partitionPkChunkingJobId = null;
+  private Iterator partitionPkChunkingBatchIdResultIterator = null;
+
+  private Iterator getRecordSetPkchunking(WorkUnit workUnit) throws RuntimeException {
+    if (partitionPkChunkingBatchIdResultIterator == null) {
+      partitionPkChunkingJobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+      partitionPkChunkingBatchIdResultIterator = Arrays.stream(workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS).split(",")).iterator();
+    }
+    if (!partitionPkChunkingBatchIdResultIterator.hasNext()) {
+      return null;
+    }
+    try {
+      if (!bulkApiLogin()) {
+        throw new IllegalArgumentException("Invalid Login");
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    String[] batchIdResultIdArray = partitionPkChunkingBatchIdResultIterator.next().split(":");
+    String batchId = batchIdResultIdArray[0];
+    String resultId = batchIdResultIdArray[1];
+    List rs = fetchPkChunkingResultSetWithRetry(bulkConnection, partitionPkChunkingJobId, batchId, resultId, fetchRetryLimit);
+    return rs.iterator();
+  }
+
   @Override
   public Iterator getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List predicateList) throws IOException {
     log.debug("Getting salesforce data using bulk api");
-    RecordSet rs = null;
+
+    // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource
+    if (!workUnit.getProp(PK_CHUNKING_JOB_ID, "").equals("")) {

Review comment:
   Thanks! will do.
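In the diff, `getRecordSetPkchunking` reads `PK_CHUNKING_BATCH_RESULT_IDS` as a comma-separated list of `batchId:resultId` pairs and indexes `[1]` directly, so an entry without a colon would raise an `ArrayIndexOutOfBoundsException`. The following is a minimal sketch, assuming only the format visible in the diff, of how that value could be packed on the source side and parsed with explicit validation on the extractor side. `BatchResultIds`, `Pair`, `encode`, and `decode` are hypothetical names, not Gobblin APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Gobblin) for the
 * "batchId:resultId,batchId:resultId" format that getRecordSetPkchunking
 * reads from the PK_CHUNKING_BATCH_RESULT_IDS work unit property.
 */
final class BatchResultIds {

  /** One Bulk API batch result: the batch id and the result id within that batch. */
  public static final class Pair {
    public final String batchId;
    public final String resultId;

    public Pair(String batchId, String resultId) {
      this.batchId = batchId;
      this.resultId = resultId;
    }
  }

  private BatchResultIds() {}

  /** Joins pairs into the comma-separated property value. */
  public static String encode(List<Pair> pairs) {
    StringBuilder sb = new StringBuilder();
    for (Pair p : pairs) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(p.batchId).append(':').append(p.resultId);
    }
    return sb.toString();
  }

  /**
   * Parses the property value. Entries that do not have exactly one non-empty
   * batch id and one non-empty result id are rejected with a descriptive error
   * instead of failing on an out-of-range array index.
   */
  public static List<Pair> decode(String value) {
    List<Pair> pairs = new ArrayList<>();
    if (value == null || value.isEmpty()) {
      return pairs;
    }
    for (String entry : value.split(",")) {
      String[] parts = entry.split(":");
      if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
        throw new IllegalArgumentException("Malformed batchId:resultId entry: '" + entry + "'");
      }
      pairs.add(new Pair(parts[0], parts[1]));
    }
    return pairs;
  }
}
```

Because `decode` parses every entry up front, a malformed value would surface when the work unit property is first read rather than partway through extraction; the trade-off is that the extractor would iterate over `Pair` objects instead of raw strings.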
Issue Time Tracking
-------------------

Worklog Id: (was: 312400)
Time Spent: 4h 20m (was: 4h 10m)

> Add feature that enables PK-chunking in partition
> -------------------------------------------------
>
>                 Key: GOBBLIN-865
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-865
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Alex Li
>            Priority: Major
>              Labels: salesforce
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> In the SFDC (Salesforce) connector, we have partitioning mechanisms that split a giant query into multiple sub-queries. There are three mechanisms:
> * simple partition (split equally by time)
> * dynamic pre-partition (generate a histogram and split by row counts)
> * user-specified partition (set up the time range in the job file)
> However, some tables, such as Task and Contract, fail from time to time to fetch the full data.
> We may want to use PK-chunking to partition the query.
>
> The PK-chunking doc from SFDC: [https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/async_api_headers_enable_pk_chunking.htm]

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
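The PK-chunking referenced in the issue is enabled per Bulk API query job through the `Sforce-Enable-PKChunking` request header described in the linked Salesforce page: Salesforce splits the query into chunks by record ID (primary key) and processes each chunk as a separate batch. The sketch below only builds that header's value, assuming the documented default of 100,000 records per chunk and maximum of 250,000 (verify against the current Salesforce page). `PkChunkingHeader` and its methods are hypothetical names, not a Gobblin or Salesforce SDK class.

```java
/**
 * Hypothetical helper (not a Gobblin or Salesforce SDK class) that builds the
 * value of the Bulk API "Sforce-Enable-PKChunking" request header.
 */
final class PkChunkingHeader {
  static final String NAME = "Sforce-Enable-PKChunking";

  // Maximum records per chunk as stated in Salesforce's PK-chunking documentation
  // (assumption: check the current documentation before relying on it).
  static final int MAX_CHUNK_SIZE = 250_000;

  private PkChunkingHeader() {}

  /** Enables PK-chunking with Salesforce's default chunk size (documented as 100,000). */
  static String enabledWithDefaults() {
    return "TRUE";
  }

  /** Enables PK-chunking with an explicit number of records per chunk. */
  static String withChunkSize(int chunkSize) {
    // Reject non-positive sizes and sizes above the documented maximum.
    if (chunkSize < 1 || chunkSize > MAX_CHUNK_SIZE) {
      throw new IllegalArgumentException(
          "chunkSize must be in [1, " + MAX_CHUNK_SIZE + "], got " + chunkSize);
    }
    return "chunkSize=" + chunkSize;
  }
}
```

With the Force.com WSC client, one way to attach it would be `bulkConnection.addHeader(PkChunkingHeader.NAME, PkChunkingHeader.withChunkSize(50_000))` before creating the query job, assuming the extractor's `bulkConnection` is a `com.sforce.async.BulkConnection`; the thread does not show how the source actually sets the header.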