Date: Wed, 13 Sep 2017 13:35:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: common-issues@hadoop.apache.org
Subject: [jira] [Commented] (HADOOP-13600) S3a rename() to copy files in a directory in parallel

    [ https://issues.apache.org/jira/browse/HADOOP-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16164660#comment-16164660 ]

ASF GitHub Bot commented on HADOOP-13600:
-----------------------------------------

Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/157#discussion_r138620461

    --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---
    @@ -891,50 +902,123 @@ private boolean innerRename(Path source, Path dest)
           }
           List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
    +      List<DeleteObjectsRequest.KeyVersion> dirKeysToDelete = new ArrayList<>();
           if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
             // delete unnecessary fake directory.
             keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
           }
    -      Path parentPath = keyToPath(srcKey);
    -      RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
    -          parentPath, true);
    -      while (iterator.hasNext()) {
    -        LocatedFileStatus status = iterator.next();
    -        long length = status.getLen();
    -        String key = pathToKey(status.getPath());
    -        if (status.isDirectory() && !key.endsWith("/")) {
    -          key += "/";
    -        }
    -        keysToDelete
    -            .add(new DeleteObjectsRequest.KeyVersion(key));
    -        String newDstKey =
    -            dstKey + key.substring(srcKey.length());
    -        copyFile(key, newDstKey, length);
    -
    -        if (hasMetadataStore()) {
    -          // with a metadata store, the object entries need to be updated,
    -          // including, potentially, the ancestors
    -          Path childSrc = keyToQualifiedPath(key);
    -          Path childDst = keyToQualifiedPath(newDstKey);
    -          if (objectRepresentsDirectory(key, length)) {
    -            S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
    -                childDst, username);
    +      // A blocking queue that tracks all objects that need to be deleted
    +      BlockingQueue<Optional<DeleteObjectsRequest.KeyVersion>> deleteQueue = new ArrayBlockingQueue<>(
    +          (int) Math.round(MAX_ENTRIES_TO_DELETE * 1.5));
    +
    +      // Used to track if the delete thread was gracefully shut down
    +      boolean deleteFutureComplete = false;
    +      FutureTask<Void> deleteFuture = null;
    +
    +      try {
    +        // Launch a thread that will read from the deleteQueue and batch delete any files that have already been copied
    +        deleteFuture = new FutureTask<>(() -> {
    +          while (true) {
    +            while (keysToDelete.size() < MAX_ENTRIES_TO_DELETE) {
    +              Optional<DeleteObjectsRequest.KeyVersion> key = deleteQueue.take();
    +
    +              // The thread runs until it is given an EOF message (an Optional#empty())
    +              if (key.isPresent()) {
    +                keysToDelete.add(key.get());
    +              } else {
    +
    +                // Delete any remaining keys and exit
    +                removeKeys(keysToDelete, true, false);
    +                return null;
    +              }
    +            }
    +            removeKeys(keysToDelete, true, false);
    +          }
    +        });
    +
    +        Thread deleteThread = new Thread(deleteFuture);
    +        deleteThread.setName("s3a-rename-delete-thread");
    +        deleteThread.start();
    +
    +        // Used to abort future copy tasks as soon as one copy task fails
    +        AtomicBoolean copyFailure = new AtomicBoolean(false);
    +        List<CopyContext> copies = new ArrayList<>();
    +
    +        Path parentPath = keyToPath(srcKey);
    +        RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
    +            parentPath, true);
    +        while (iterator.hasNext()) {
    +          LocatedFileStatus status = iterator.next();
    +          long length = status.getLen();
    +          String key = pathToKey(status.getPath());
    +          if (status.isDirectory() && !key.endsWith("/")) {
    +            key += "/";
    +          }
    +          if (status.isDirectory()) {
    +            dirKeysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
    +          }
    +          String newDstKey =
    +              dstKey + key.substring(srcKey.length());
    +
    +          // If no previous file hit a copy failure, copy this file
    +          if (!copyFailure.get()) {
    +            copies.add(new CopyContext(copyFileAsync(key, newDstKey,
    +                new RenameProgressListener(this, srcStatus, status.isDirectory() ? null :
    +                new DeleteObjectsRequest.KeyVersion(key), deleteQueue, copyFailure)),
    +                key, newDstKey, length));
               } else {
    -            S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
    -                childDst, length, getDefaultBlockSize(childDst), username);
    +            // We got a copy failure, so don't bother going through the rest of the files
    +            break;
               }
    -          // Ancestor directories may not be listed, so we explicitly add them
    -          S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
    -              keyToQualifiedPath(srcKey), childSrc, childDst, username);
             }
    -        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
    -          removeKeys(keysToDelete, true, false);
    +        for (CopyContext copyContext : copies) {
    +          try {
    +            copyContext.getCopy().waitForCopyResult();
    +          } catch (InterruptedException e) {
    +            throw new RenameFailedException(copyContext.getSrcKey(), copyContext.getDstKey(), e);
    +          }
    +
    +          if (hasMetadataStore()) {
    +            // with a metadata store, the object entries need to be updated,
    +            // including, potentially, the ancestors
    +            Path childSrc = keyToQualifiedPath(copyContext.getSrcKey());
    +            Path childDst = keyToQualifiedPath(copyContext.getDstKey());
    +            if (objectRepresentsDirectory(copyContext.getSrcKey(), copyContext.getLength())) {
    +              S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
    +                  childDst, username);
    +            } else {
    +              S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
    +                  childDst, copyContext.getLength(), getDefaultBlockSize(childDst), username);
    +            }
    +            // Ancestor directories may not be listed, so we explicitly add them
    +            S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
    +                keyToQualifiedPath(srcKey), childSrc, childDst, username);
    +          }
    +        }
    +
    +        if (copyFailure.get()) {
    +          throw new RenameFailedException(srcKey, dstKey,
    +              new IllegalStateException("Progress listener indicated a copy failure, but no exception was thrown"));
    +        }
    +
    +        try {
    +          for (DeleteObjectsRequest.KeyVersion dirKey : dirKeysToDelete) {
    +            deleteQueue.put(Optional.of(dirKey));
    +          }
    +          deleteQueue.put(Optional.empty());
    +          deleteFuture.get();
    +        } catch (ExecutionException | InterruptedException e) {
    +          throw new RenameFailedException(srcKey, dstKey, e);
    +        }
    +        deleteFutureComplete = true;
    +      } finally {
    +        if (!deleteFutureComplete) {
    +          if (deleteFuture != null && !deleteFuture.isDone() && !deleteFuture.isCancelled()) {
    +            deleteFuture.cancel(true);
    +          }
    --- End diff --

    mmm. I mostly concur, handing in either the S3aFS or the (expanded) WriteOperationsHelper


> S3a rename() to copy files in a directory in parallel
> ------------------------------------------------------
>
>                 Key: HADOOP-13600
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13600
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 2.7.3
>            Reporter: Steve Loughran
>            Assignee: Sahil Takiar
>        Attachments: HADOOP-13600.001.patch
>
>
> Currently a directory rename does a one-by-one copy, making the request O(files * data). If the copy operations were launched in parallel, the duration of the copy may be reducible to the duration of the longest copy. For a directory with many files, this will be significant.
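
As a rough, self-contained illustration of the pattern discussed above -- not the actual S3AFileSystem code -- the sketch below shows the same idea in plain java.util.concurrent terms: copies run in parallel, each completed copy hands its source key to a blocking queue, and a single consumer thread drains the queue and issues batched deletes until it receives an Optional.empty() poison pill. The class name ParallelRenameSketch and the copyObject/deleteBatch methods are hypothetical stand-ins for the real S3 COPY and bulk-delete calls.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelRenameSketch {

      // Batch size for bulk deletes (stands in for MAX_ENTRIES_TO_DELETE).
      private static final int MAX_ENTRIES_TO_DELETE = 3;

      public static void main(String[] args) throws Exception {
        List<String> srcKeys = Arrays.asList("dir/a", "dir/b", "dir/c", "dir/d", "dir/e");

        // Keys whose copy has finished; Optional.empty() is the end-of-stream marker.
        BlockingQueue<Optional<String>> deleteQueue =
            new ArrayBlockingQueue<>(MAX_ENTRIES_TO_DELETE * 2);

        // Single consumer: collect keys and delete them in batches.
        ExecutorService deleter = Executors.newSingleThreadExecutor();
        Callable<Void> drainTask = () -> {
          List<String> batch = new ArrayList<>();
          while (true) {
            Optional<String> key = deleteQueue.take();
            if (!key.isPresent()) {        // poison pill: flush the remainder and stop
              deleteBatch(batch);
              return null;
            }
            batch.add(key.get());
            if (batch.size() >= MAX_ENTRIES_TO_DELETE) {
              deleteBatch(batch);
              batch.clear();
            }
          }
        };
        Future<Void> deleteFuture = deleter.submit(drainTask);

        // Copies run in parallel; each completed copy enqueues its source key.
        ExecutorService copiers = Executors.newFixedThreadPool(4);
        List<Future<Void>> copies = new ArrayList<>();
        for (String srcKey : srcKeys) {
          Callable<Void> copyTask = () -> {
            copyObject(srcKey, "newdir/" + srcKey.substring("dir/".length()));
            deleteQueue.put(Optional.of(srcKey));
            return null;
          };
          copies.add(copiers.submit(copyTask));
        }
        for (Future<Void> copy : copies) {
          copy.get();                      // surfaces any copy failure as ExecutionException
        }

        deleteQueue.put(Optional.empty()); // no more keys are coming
        deleteFuture.get();                // wait for the final delete batch
        copiers.shutdown();
        deleter.shutdown();
      }

      // Hypothetical stand-in for a server-side object copy (e.g. S3 COPY).
      private static void copyObject(String src, String dst) throws InterruptedException {
        Thread.sleep(50);
        System.out.println("copied " + src + " -> " + dst);
      }

      // Hypothetical stand-in for a bulk delete request.
      private static void deleteBatch(List<String> keys) {
        if (!keys.isEmpty()) {
          System.out.println("deleted batch " + keys);
        }
      }
    }

The poison pill is what lets the producer side decide when the delete thread may flush its final, possibly partial batch and exit; Optional.empty() plays the same role in the patch under review.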