hadoop-common-commits mailing list archives

From cnaur...@apache.org
Subject hadoop git commit: HADOOP-13934. S3Guard: DynamoDBMetadataStore#move() could be throwing exception due to BatchWriteItem limits. Contributed by Mingliang Liu.
Date Thu, 05 Jan 2017 21:18:12 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-13345 b27317189 -> a412b1020


HADOOP-13934. S3Guard: DynamoDBMetadataStore#move() could be throwing exception due to BatchWriteItem limits. Contributed by Mingliang Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a412b102
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a412b102
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a412b102

Branch: refs/heads/HADOOP-13345
Commit: a412b102078e6b17abcff76b472ec121b15d4e05
Parents: b273171
Author: Chris Nauroth <cnauroth@apache.org>
Authored: Thu Jan 5 13:09:05 2017 -0800
Committer: Chris Nauroth <cnauroth@apache.org>
Committed: Thu Jan 5 13:09:05 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/s3a/Constants.java     |  7 ++
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   | 62 +++++++++++++--
 .../PathMetadataDynamoDBTranslation.java        | 18 +++--
 .../s3a/s3guard/TestDynamoDBMetadataStore.java  | 80 ++++++++++++++++++--
 4 files changed, 150 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
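
DynamoDB's BatchWriteItem API accepts at most 25 put or delete requests per call, so a move() covering more paths than that could fail. The change adds S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT and a processBatchWriteRequest() helper that issues the deletes and puts in chunks of at most 25 requests, re-submitting any unprocessed items after each chunk. A minimal standalone sketch of the chunking idea follows; the generic arrays and the issueBatch() method are hypothetical stand-ins for the PrimaryKey[]/Item[] arguments and the dynamoDB.batchWriteItem() call used in the real helper.

import java.util.Arrays;

public final class BatchChunkingSketch {
  // DynamoDB's BatchWriteItem accepts at most 25 write requests per call.
  private static final int BATCH_LIMIT = 25;

  /** Issue deletes first, then puts, never exceeding BATCH_LIMIT per batch. */
  static <K, V> void writeInBatches(K[] keysToDelete, V[] itemsToPut) {
    final int totalToDelete = keysToDelete == null ? 0 : keysToDelete.length;
    final int totalToPut = itemsToPut == null ? 0 : itemsToPut.length;
    int count = 0;
    while (count < totalToDelete + totalToPut) {
      int numToDelete = 0;
      K[] deleteSlice = null;
      V[] putSlice = null;
      if (count < totalToDelete) {
        numToDelete = Math.min(BATCH_LIMIT, totalToDelete - count);
        deleteSlice = Arrays.copyOfRange(keysToDelete, count, count + numToDelete);
        count += numToDelete;
      }
      if (numToDelete < BATCH_LIMIT && count < totalToDelete + totalToPut) {
        final int numToPut = Math.min(BATCH_LIMIT - numToDelete,
            totalToDelete + totalToPut - count);
        final int index = count - totalToDelete;
        putSlice = Arrays.copyOfRange(itemsToPut, index, index + numToPut);
        count += numToPut;
      }
      issueBatch(deleteSlice, putSlice);  // stand-in for dynamoDB.batchWriteItem()
    }
  }

  private static <K, V> void issueBatch(K[] deletes, V[] puts) {
    System.out.printf("batch: %d deletes, %d puts%n",
        deletes == null ? 0 : deletes.length, puts == null ? 0 : puts.length);
  }
}
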


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a412b102/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 4e7dfd6..c6acaaa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -335,6 +335,13 @@ public final class Constants {
   public static final long S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT = 100;
 
   /**
+   * The maximum put or delete requests per BatchWriteItem request.
+   *
+   * Refer to Amazon API reference for this limit.
+   */
+  public static final int S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT = 25;
+
+  /**
    * V1 committer.
    */
   @InterfaceStability.Unstable

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a412b102/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 439002b..89ce3c4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.s3guard;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
 import com.amazonaws.services.dynamodbv2.document.Item;
 import com.amazonaws.services.dynamodbv2.document.ItemCollection;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
 import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
 import com.amazonaws.services.dynamodbv2.document.Table;
 import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
@@ -344,20 +346,68 @@ public class DynamoDBMetadataStore implements MetadataStore {
   @Override
   public void move(Collection<Path> pathsToDelete,
       Collection<PathMetadata> pathsToCreate) throws IOException {
-    final TableWriteItems writeItems = new TableWriteItems(tableName)
-        .withItemsToPut(pathMetadataToItem(pathsToCreate))
-        .withPrimaryKeysToDelete(pathToKey(pathsToDelete));
+    if (pathsToDelete == null && pathsToCreate == null) {
+      return;
+    }
+
+    LOG.debug("Moving paths of table {} in region {}: {} paths to delete and {}"
+        + " paths to create", tableName, region,
+        pathsToDelete == null ? 0 : pathsToDelete.size(),
+        pathsToCreate == null ? 0 : pathsToCreate.size());
+    LOG.trace("move: pathsToDelete = {}, pathsToCreate = {}",
+        pathsToDelete, pathsToCreate);
     try {
-      BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
+      processBatchWriteRequest(pathToKey(pathsToDelete),
+          pathMetadataToItem(pathsToCreate));
+    } catch (AmazonClientException e) {
+      throw translateException("move", (String) null, e);
+    }
+  }
 
+  /**
+   * Helper method to issue a batch write request to DynamoDB.
+   *
+   * Callers of this method should catch the {@link AmazonClientException} and
+   * translate it for better error report and easier debugging.
+   * @param keysToDelete primary keys to be deleted; can be null
+   * @param itemsToPut new items to be put; can be null
+   */
+  private void processBatchWriteRequest(PrimaryKey[] keysToDelete,
+      Item[] itemsToPut) {
+    final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
+    final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length);
+    int count = 0;
+    while (count < totalToDelete + totalToPut) {
+      final TableWriteItems writeItems = new TableWriteItems(tableName);
+      int numToDelete = 0;
+      if (keysToDelete != null
+          && count < totalToDelete) {
+        numToDelete = Math.min(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT,
+            totalToDelete - count);
+        writeItems.withPrimaryKeysToDelete(
+            Arrays.copyOfRange(keysToDelete, count, count + numToDelete));
+        count += numToDelete;
+      }
+
+      if (numToDelete < S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT
+          && itemsToPut != null
+          && count < totalToDelete + totalToPut) {
+        final int numToPut = Math.min(
+            S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT - numToDelete,
+            totalToDelete + totalToPut - count);
+        final int index = count - totalToDelete;
+        writeItems.withItemsToPut(
+            Arrays.copyOfRange(itemsToPut, index, index + numToPut));
+        count += numToPut;
+      }
+
+      BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
       // Check for unprocessed keys in case of exceeding provisioned throughput
       Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
       while (unprocessed.size() > 0) {
         res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
         unprocessed = res.getUnprocessedItems();
       }
-    } catch (AmazonClientException e) {
-      throw translateException("createTable", (String) null, e);
     }
   }
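
For intuition on batch sizing: with, say, 30 keys to delete and 40 items to put, the loop above issues three BatchWriteItem calls of 25 deletes, then 5 deletes plus 20 puts, then the remaining 20 puts. Unprocessed items returned by any call (for example when provisioned throughput is exceeded) are re-submitted before the next chunk is built.
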
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a412b102/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
index b3e23eb..15b23d2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
@@ -20,10 +20,8 @@ package org.apache.hadoop.fs.s3a.s3guard;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 
 import com.amazonaws.services.dynamodbv2.document.Item;
 import com.amazonaws.services.dynamodbv2.document.KeyAttribute;
@@ -145,10 +143,15 @@ final class PathMetadataDynamoDBTranslation {
    *
    * @see #pathMetadataToItem(PathMetadata)
    */
-  static Collection<Item> pathMetadataToItem(Collection<PathMetadata> metas) {
-    final List<Item> items = new ArrayList<>(metas.size());
+  static Item[] pathMetadataToItem(Collection<PathMetadata> metas) {
+    if (metas == null) {
+      return null;
+    }
+
+    final Item[] items = new Item[metas.size()];
+    int i = 0;
     for (PathMetadata meta : metas) {
-      items.add(pathMetadataToItem(meta));
+      items[i++] = pathMetadataToItem(meta);
     }
     return items;
   }
@@ -186,7 +189,10 @@ final class PathMetadataDynamoDBTranslation {
    * @see #pathToKey(Path)
    */
   static PrimaryKey[] pathToKey(Collection<Path> paths) {
-    Preconditions.checkNotNull(paths);
+    if (paths == null) {
+      return null;
+    }
+
     final PrimaryKey[] keys = new PrimaryKey[paths.size()];
     int i = 0;
     for (Path p : paths) {
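
With this change the translation helpers return null for a null input instead of failing: pathMetadataToItem() now produces an Item[] (matching the array slicing in processBatchWriteRequest()) and pathToKey() no longer requires a non-null collection, so move() can be called with only pathsToDelete or only pathsToCreate.
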

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a412b102/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
index e32c044..72144b7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.s3a.s3guard;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
@@ -30,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
 import com.amazonaws.services.dynamodbv2.model.TableDescription;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -54,8 +57,8 @@ import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 
-import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
-import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
 
 /**
  * Test that {@link DynamoDBMetadataStore} implements {@link MetadataStore}.
@@ -75,6 +78,8 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestDynamoDBMetadataStore.class);
   private static final String BUCKET = "TestDynamoDBMetadataStore";
+  private static final String S3URI =
+      URI.create(Constants.FS_S3A + "://" + BUCKET).toString();
 
   /** The DynamoDBLocal dynamoDBLocalServer instance for testing. */
   private static DynamoDBProxyServer dynamoDBLocalServer;
@@ -143,8 +148,7 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
       // using mocked S3 clients
       conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
           S3ClientFactory.class);
-      conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
-          URI.create(Constants.FS_S3A + "://" + BUCKET).toString());
+      conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, S3URI);
       // setting config for creating a DynamoDBClient against local server
       conf.set(Constants.ACCESS_KEY, "dummy-access-key");
       conf.set(Constants.SECRET_KEY, "dummy-secret-key");
@@ -177,7 +181,7 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
       throws IOException {
     String owner = UserGroupInformation.getCurrentUser().getShortUserName();
     return isDir
-        ? new S3AFileStatus(false, path, owner)
+        ? new S3AFileStatus(true, path, owner)
         : new S3AFileStatus(size, getModTime(), path, BLOCK_SIZE, owner);
   }
 
@@ -222,6 +226,72 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
     }
   }
 
+  /**
+   * Test that for a large batch write request, the limit is handled correctly.
+   */
+  @Test
+  public void testBatchWrite() throws IOException {
+    final int[] numMetasToDeleteOrPut = {
+        -1, // null
+        0, // empty collection
+        1, // one path
+        S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT, // exact limit of a batch request
+        S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT + 1 // limit + 1
+    };
+    for (int numOldMetas : numMetasToDeleteOrPut) {
+      for (int numNewMetas : numMetasToDeleteOrPut) {
+        doTestBatchWrite(numOldMetas, numNewMetas);
+      }
+    }
+  }
+
+  private void doTestBatchWrite(int numDelete, int numPut) throws IOException {
+    final String root = S3URI + "/testBatchWrite_" + numDelete + '_' + numPut;
+    final Path oldDir = new Path(root, "oldDir");
+    final Path newDir = new Path(root, "newDir");
+    LOG.info("doTestBatchWrite: oldDir={}, newDir={}", oldDir, newDir);
+
+    try (DynamoDBMetadataStore ms = createContract().getMetadataStore()) {
+      ms.put(new PathMetadata(basicFileStatus(oldDir, 0, true)));
+      ms.put(new PathMetadata(basicFileStatus(newDir, 0, true)));
+
+      final Collection<PathMetadata> oldMetas =
+          numDelete < 0 ? null : new ArrayList<>(numDelete);
+      for (int i = 0; i < numDelete; i++) {
+        oldMetas.add(new PathMetadata(
+            basicFileStatus(new Path(oldDir, "child" + i), i, true)));
+      }
+      final Collection<PathMetadata> newMetas =
+          numPut < 0 ? null : new ArrayList<>(numPut);
+      for (int i = 0; i < numPut; i++) {
+        newMetas.add(new PathMetadata(
+            basicFileStatus(new Path(newDir, "child" + i), i, false)));
+      }
+
+      Collection<Path> pathsToDelete = null;
+      if (oldMetas != null) {
+        // put all metadata of old paths and verify
+        ms.put(new DirListingMetadata(oldDir, oldMetas, false));
+        assertEquals(0, ms.listChildren(newDir).numEntries());
+        assertTrue(CollectionUtils.isEqualCollection(oldMetas,
+            ms.listChildren(oldDir).getListing()));
+
+        pathsToDelete = new ArrayList<>(oldMetas.size());
+        for (PathMetadata meta : oldMetas) {
+          pathsToDelete.add(meta.getFileStatus().getPath());
+        }
+      }
+
+      // move the old paths to new paths and verify
+      ms.move(pathsToDelete, newMetas);
+      assertEquals(0, ms.listChildren(oldDir).numEntries());
+      if (newMetas != null) {
+        assertTrue(CollectionUtils.isEqualCollection(newMetas,
+            ms.listChildren(newDir).getListing()));
+      }
+    }
+  }
+
   @Test
   public void testCreateExistingTable() throws IOException {
     final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
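
The new testBatchWrite case sweeps both the number of old entries to delete and the number of new entries to create over {-1 (null), 0, 1, 25, 26}, covering the null, empty, single-item, exact-limit and limit-plus-one cases of processBatchWriteRequest() against the in-process DynamoDBLocal server this test class already starts.
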


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

