From commits-return-13743-archive-asf-public=cust-asf.ponee.io@hudi.apache.org Fri Mar 20 04:17:51 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id C928E180661 for ; Fri, 20 Mar 2020 05:17:50 +0100 (CET) Received: (qmail 81802 invoked by uid 500); 20 Mar 2020 04:17:50 -0000 Mailing-List: contact commits-help@hudi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hudi.apache.org Delivered-To: mailing list commits@hudi.apache.org Received: (qmail 81791 invoked by uid 99); 20 Mar 2020 04:17:49 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2020 04:17:49 +0000 From: GitBox To: commits@hudi.apache.org Subject: [GitHub] [incubator-hudi] ffcchi commented on a change in pull request #1421: [HUDI-724] Parallelize getSmallFiles for partitions Message-ID: <158467786955.29697.6066659446294210607.gitbox@gitbox.apache.org> References: In-Reply-To: Date: Fri, 20 Mar 2020 04:17:49 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit ffcchi commented on a change in pull request #1421: [HUDI-724] Parallelize getSmallFiles for partitions URL: https://github.com/apache/incubator-hudi/pull/1421#discussion_r395433104 ########## File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java ########## @@ -602,18 +602,39 @@ private int addUpdateBucket(String fileIdHint) { return bucket; } - private void assignInserts(WorkloadProfile profile) { + private void assignInserts(WorkloadProfile profile, JavaSparkContext jsc) { // for new inserts, compute buckets depending on how many records we have for each partition Set partitionPaths = profile.getPartitionPaths(); long averageRecordSize = 
averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config.getCopyOnWriteRecordSizeEstimate()); LOG.info("AvgRecordSize => " + averageRecordSize); + + HashMap<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>(); + if (jsc != null && partitionPaths.size() > 1) { + //Parellelize the GetSmallFile Operation by using RDDs + List<String> partitionPathsList = new ArrayList<>(partitionPaths); + JavaRDD<String> partitionPathRdds = jsc.parallelize(partitionPathsList, partitionPathsList.size()); + List<Tuple2<String, List<SmallFile>>> partitionSmallFileTuples = + partitionPathRdds.map(it -> new Tuple2<String, List<SmallFile>>(it, getSmallFiles(it))).collect(); + + for (Tuple2<String, List<SmallFile>> tuple : partitionSmallFileTuples) { + partitionSmallFilesMap.put(tuple._1, tuple._2); + } + } + for (String partitionPath : partitionPaths) { WorkloadStat pStat = profile.getWorkloadStat(partitionPath); if (pStat.getNumInserts() > 0) { - List<SmallFile> smallFiles = getSmallFiles(partitionPath); + List<SmallFile> smallFiles; + if (partitionSmallFilesMap.isEmpty()) { Review comment: sounds good. will do ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services