hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lium...@apache.org
Subject hadoop git commit: HADOOP-14215. DynamoDB client should waitForActive on existing tables. Contributed by Mingliang Liu and Sean Mackrory
Date Fri, 07 Apr 2017 01:40:49 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-13345 be03eb6a4 -> 872e10610


HADOOP-14215. DynamoDB client should waitForActive on existing tables. Contributed by Mingliang
Liu and Sean Mackrory


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/872e1061
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/872e1061
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/872e1061

Branch: refs/heads/HADOOP-13345
Commit: 872e10610cd53265e5b44d32aab4d257f40935f5
Parents: be03eb6
Author: Mingliang Liu <liuml07@apache.org>
Authored: Thu Apr 6 17:42:49 2017 -0700
Committer: Mingliang Liu <liuml07@apache.org>
Committed: Thu Apr 6 18:40:19 2017 -0700

----------------------------------------------------------------------
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   |  83 +++++++++--
 .../s3a/s3guard/ITestS3GuardConcurrentOps.java  | 139 +++++++++++++++++++
 2 files changed, 207 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/872e1061/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index b2f011c..e0a171d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -698,7 +698,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
    * after this method returns successfully.
    *
    * @throws IOException if table does not exist and auto-creation is disabled;
-   * or any other I/O exception occurred.
+   * or table is being deleted, or any other I/O exception occurred.
    */
   @VisibleForTesting
   void initTable() throws IOException {
@@ -706,15 +706,31 @@ public class DynamoDBMetadataStore implements MetadataStore {
     try {
       try {
         LOG.debug("Binding to table {}", tableName);
-        table.describe();
-        final Item versionMarker = table.getItem(
-            createVersionMarkerPrimaryKey(VERSION_MARKER));
+        final String status = table.describe().getTableStatus();
+        switch (status) {
+          case "CREATING":
+          case "UPDATING":
+            LOG.debug("Table {} in region {} is being created/updated. This may "
+                    + "indicate that the table is being operated by another "
+                    + "concurrent thread or process. Waiting for active...",
+                tableName, region);
+            waitForTableActive(table);
+            break;
+          case "DELETING":
+            throw new IOException("DynamoDB table '" + tableName + "' is being "
+                + "deleted in region " + region);
+          case "ACTIVE":
+            break;
+          default:
+            throw new IOException("Unknown DynamoDB table status " + status
+                + ": tableName='" + tableName + "', region=" + region);
+        }
+
+        final Item versionMarker = getVersionMarkerItem();
         verifyVersionCompatibility(tableName, versionMarker);
         Long created = extractCreationTimeFromMarker(versionMarker);
         LOG.debug("Using existing DynamoDB table {} in region {} created {}",
-            tableName, region,
-            created != null ? new Date(created) : null);
-
+            tableName, region, (created != null) ? new Date(created) : null);
       } catch (ResourceNotFoundException rnfe) {
         if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
           final ProvisionedThroughput capacity = new ProvisionedThroughput(
@@ -736,6 +752,35 @@ public class DynamoDBMetadataStore implements MetadataStore {
   }
 
   /**
+   * Get the version mark item in the existing DynamoDB table.
+   *
+   * As the version marker item may be created by another concurrent thread or
+   * process, we retry a limited times before we fail to get it.
+   */
+  private Item getVersionMarkerItem() throws IOException {
+    final PrimaryKey versionMarkerKey =
+        createVersionMarkerPrimaryKey(VERSION_MARKER);
+    int retryCount = 0;
+    Item versionMarker = table.getItem(versionMarkerKey);
+    while (versionMarker == null) {
+      try {
+        RetryPolicy.RetryAction action = batchRetryPolicy.shouldRetry(null,
+            retryCount, 0, true);
+        if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+          break;
+        } else {
+          LOG.debug("Sleeping {} ms before next retry", action.delayMillis);
+          Thread.sleep(action.delayMillis);
+        }
+      } catch (Exception e) {
+        throw new IOException("initTable: Unexpected exception", e);
+      }
+      versionMarker = table.getItem(versionMarkerKey);
+    }
+    return versionMarker;
+  }
+
+  /**
    * Verify that a table version is compatible with this S3Guard client.
    * @param tableName name of the table (for error messages)
    * @param versionMarker the version marker retrieved from the table
@@ -762,6 +807,21 @@ public class DynamoDBMetadataStore implements MetadataStore {
   }
 
   /**
+   * Wait for table being active.
+   */
+  private void waitForTableActive(Table table) throws IOException {
+    try {
+      table.waitForActive();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for table {} in region {} active",
+          tableName, region, e);
+      Thread.currentThread().interrupt();
+      throw (IOException) new InterruptedIOException("DynamoDB table '"
+          + tableName + "' is not active yet in region " + region).initCause(e);
+    }
+  }
+
+  /**
    * Create a table, wait for it to become active, then add the version
    * marker.
    * @param capacity capacity to provision
@@ -777,20 +837,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
           .withAttributeDefinitions(attributeDefinitions())
           .withProvisionedThroughput(capacity));
       LOG.debug("Awaiting table becoming active");
-      table.waitForActive();
     } catch (ResourceInUseException e) {
       LOG.warn("ResourceInUseException while creating DynamoDB table {} "
               + "in region {}.  This may indicate that the table was "
               + "created by another concurrent thread or process.",
           tableName, region);
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting for DynamoDB table {} active",
-          tableName, e);
-      Thread.currentThread().interrupt();
-      throw (IOException) new InterruptedIOException(
-          "DynamoDB table '" + tableName + "' "
-              + "is not active yet in region " + region).initCause(e);
     }
+    waitForTableActive(table);
     final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
         System.currentTimeMillis());
     putItem(marker);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/872e1061/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
new file mode 100644
index 0000000..bfad328
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.Constants;
+
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Tests concurrent operations on S3Guard.
+ */
+public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase {
+
+  @Rule
+  public final Timeout timeout = new Timeout(5 * 60 * 1000);
+
+  private void failIfTableExists(DynamoDB db, String tableName) {
+    boolean tableExists = true;
+    try {
+      Table table = db.getTable(tableName);
+      table.describe();
+    } catch (ResourceNotFoundException e) {
+      tableExists = false;
+    }
+    if (tableExists) {
+      fail("Table already exists: " + tableName);
+    }
+  }
+
+  private void deleteTable(DynamoDB db, String tableName) throws
+      InterruptedException {
+    Table table = db.getTable(tableName);
+    table.waitForActive();
+    table.delete();
+    table.waitForDelete();
+  }
+
+  @Test
+  public void testConcurrentTableCreations() throws Exception {
+    Configuration conf = getConfiguration();
+    Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard",
+        conf.get(Constants.S3_METADATA_STORE_IMPL).equals(
+            Constants.S3GUARD_METASTORE_DYNAMO));
+
+    DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+    ms.initialize(conf);
+    DynamoDB db = ms.getDynamoDB();
+
+    String tableName = "testConcurrentTableCreations" + new Random().nextInt();
+    conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
+    conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+    int concurrentOps = 16;
+    int iterations = 4;
+
+    failIfTableExists(db, tableName);
+
+    for (int i = 0; i < iterations; i++) {
+      ExecutorService executor = Executors.newFixedThreadPool(
+          concurrentOps, new ThreadFactory() {
+            private AtomicInteger count = new AtomicInteger(0);
+
+            public Thread newThread(Runnable r) {
+              return new Thread(r,
+                  "testConcurrentTableCreations" + count.getAndIncrement());
+            }
+          });
+      ((ThreadPoolExecutor) executor).prestartAllCoreThreads();
+      Future<Boolean>[] futures = new Future[concurrentOps];
+      int exceptionsThrown = 0;
+      for (int f = 0; f < concurrentOps; f++) {
+        final int index = f;
+        futures[f] = executor.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+
+            boolean result = false;
+            try {
+              new DynamoDBMetadataStore().initialize(conf);
+            } catch (Exception e) {
+              LOG.error(e.getClass() + ": " + e.getMessage());
+              result = true;
+            }
+
+            timer.end("parallel DynamoDB client creation %d", index);
+            LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
+                index, timer.getStartTime(), timer.getEndTime());
+            return result;
+          }
+        });
+      }
+      for (int f = 0; f < concurrentOps; f++) {
+        if (futures[f].get()) {
+          exceptionsThrown++;
+        }
+      }
+      deleteTable(db, tableName);
+      if (exceptionsThrown > 0) {
+        fail(exceptionsThrown + "/" + concurrentOps +
+            " threads threw exceptions while initializing on iteration " + i);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message