drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h.@apache.org
Subject drill git commit: DRILL-3171: handle concurrent Zookeeper node creation gracefully
Date Fri, 20 Nov 2015 02:06:44 GMT
Repository: drill
Updated Branches:
  refs/heads/master a6a0fc377 -> f7a0d38ea


DRILL-3171: handle concurrent Zookeeper node creation gracefully


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f7a0d38e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f7a0d38e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f7a0d38e

Branch: refs/heads/master
Commit: f7a0d38eac0024e745c035eb0f606acbf573418f
Parents: a6a0fc3
Author: Hanifi Gunes <hanifigunes@gmail.com>
Authored: Tue Nov 3 16:24:48 2015 -0800
Committer: Hanifi Gunes <hanifigunes@gmail.com>
Committed: Thu Nov 19 17:36:17 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/store/StoragePluginRegistry.java |  52 ++++----
 .../exec/store/sys/zk/ZkAbstractStore.java      | 133 ++++++++++++++-----
 .../drill/exec/store/sys/zk/ZkEStore.java       |  27 +---
 .../exec/store/sys/zk/ZkEStoreProvider.java     |   7 +-
 .../drill/exec/store/sys/zk/ZkPStore.java       |  17 +--
 .../exec/store/sys/zk/ZkPStoreProvider.java     |   3 +-
 6 files changed, 141 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f7a0d38e/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 70e82da..6e11084 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -236,41 +236,39 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
   }
 
   public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException {
-    StoragePlugin oldPlugin = plugins.get(name);
-
-    boolean ok = true;
-    final StoragePlugin newPlugin = create(name, config);
-    try {
-      if (oldPlugin != null) {
-        if (config.isEnabled()) {
-          ok = plugins.replace(name, oldPlugin, newPlugin);
-          if (ok) {
-            closePlugin(oldPlugin);
+    for (;;) {
+      final StoragePlugin oldPlugin = plugins.get(name);
+      final StoragePlugin newPlugin = create(name, config);
+      boolean done = false;
+      try {
+        if (oldPlugin != null) {
+          if (config.isEnabled()) {
+            done = plugins.replace(name, oldPlugin, newPlugin);
+          } else {
+            done = plugins.remove(name, oldPlugin);
           }
-        } else {
-          ok = plugins.remove(name, oldPlugin);
-          if (ok) {
+          if (done) {
             closePlugin(oldPlugin);
           }
+        } else if (config.isEnabled()) {
+          done = (null == plugins.putIfAbsent(name, newPlugin));
+        } else {
+          done = true;
+        }
+      } finally {
+        if (!done) {
+          closePlugin(newPlugin);
         }
-      } else if (config.isEnabled()) {
-        ok = (null == plugins.putIfAbsent(name, newPlugin));
       }
 
-      if (!ok) {
-        throw new ExecutionSetupException("Two processes tried to change a plugin at the same time.");
-      }
-    } finally {
-      if (!ok) {
-        closePlugin(newPlugin);
-      }
-    }
+      if (done) {
+        if (persist) {
+          pluginSystemTable.put(name, config);
+        }
 
-    if (persist) {
-      pluginSystemTable.put(name, config);
+        return newPlugin;
+      }
     }
-
-    return newPlugin;
   }
 
   public StoragePlugin getPlugin(String name) throws ExecutionSetupException {

http://git-wip-us.apache.org/repos/asf/drill/blob/f7a0d38e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
index 01059a4..0d2fb38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkAbstractStore.java
@@ -24,29 +24,29 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import org.apache.drill.exec.rpc.data.DataTunnel;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.zookeeper.CreateMode;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * This is the abstract class that is shared by ZkPStore (Persistent store) and ZkEStore (Ephemeral Store)
  * @param <V>
  */
 public abstract class ZkAbstractStore<V> implements AutoCloseable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkAbstractStore.class);
 
-  protected CuratorFramework framework;
-  protected PStoreConfig<V> config;
+  protected final CuratorFramework framework;
+  protected final PStoreConfig<V> config;
   private final PathChildrenCache childrenCache;
-  private String prefix;
-  private String parent;
+  private final String prefix;
+  private final String parent;
 
   public ZkAbstractStore(CuratorFramework framework, PStoreConfig<V> config)
       throws IOException {
@@ -54,31 +54,26 @@ public abstract class ZkAbstractStore<V> implements AutoCloseable {
     this.prefix = parent + "/";
     this.framework = framework;
     this.config = config;
+    this.childrenCache = new PathChildrenCache(framework, parent, true);
 
     // make sure the parent node exists.
+    createOrUpdate(parent, null, CreateMode.PERSISTENT);
     try {
-      if (framework.checkExists().forPath(parent) == null) {
-        framework.create().withMode(CreateMode.PERSISTENT).forPath(parent);
-      }
-
-      this.childrenCache = new PathChildrenCache(framework, parent, true);
-      this.childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
-
+      childrenCache.start(StartMode.BUILD_INITIAL_CACHE);
     } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper for PStore: " + e.getMessage(), e);
+      throw new RuntimeException("Failure while initializing Zookeeper for PStore", e);
     }
-
   }
 
   public Iterator<Entry<String, V>> iterator() {
     try {
       return new Iter(childrenCache.getCurrentData());
     } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
+      throw new RuntimeException("Failure while accessing Zookeeper.", e);
     }
   }
 
-  protected String p(String key) {
+  protected String withPrefix(String key) {
     Preconditions.checkArgument(!key.contains("/"),
         "You cannot use keys that have slashes in them when using the Zookeeper SystemTable storage interface.");
     return prefix + key;
@@ -86,7 +81,7 @@ public abstract class ZkAbstractStore<V> implements AutoCloseable {
 
   public V get(String key) {
     try {
-      ChildData d = childrenCache.getCurrentData(p(key));
+      ChildData d = childrenCache.getCurrentData(withPrefix(key));
       if(d == null || d.getData() == null){
         return null;
       }
@@ -100,12 +95,12 @@ public abstract class ZkAbstractStore<V> implements AutoCloseable {
 
   public void put(String key, V value) {
     try {
-      if (childrenCache.getCurrentData(p(key)) != null) {
-        framework.setData().forPath(p(key), config.getSerializer().serialize(value));
+      if (childrenCache.getCurrentData(withPrefix(key)) != null) {
+        framework.setData().forPath(withPrefix(key), config.getSerializer().serialize(value));
       } else {
-        createNodeInZK(key, value);
+        createWithPrefix(key, value);
       }
-      childrenCache.rebuildNode(p(key));
+      childrenCache.rebuildNode(withPrefix(key));
 
     } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
@@ -114,8 +109,10 @@ public abstract class ZkAbstractStore<V> implements AutoCloseable {
 
   public void delete(String key) {
     try {
-        framework.delete().forPath(p(key));
-        childrenCache.rebuildNode(p(key));
+      if (framework.checkExists().forPath(withPrefix(key)) != null) {
+        framework.delete().forPath(withPrefix(key));
+        childrenCache.rebuildNode(withPrefix(key));
+      }
     } catch (Exception e) {
       throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
     }
@@ -123,11 +120,11 @@ public abstract class ZkAbstractStore<V> implements AutoCloseable {
 
   public boolean putIfAbsent(String key, V value) {
     try {
-      if (childrenCache.getCurrentData(p(key)) != null) {
+      if (childrenCache.getCurrentData(withPrefix(key)) != null) {
         return false;
       } else {
-        createNodeInZK(key, value);
-        childrenCache.rebuildNode(p(key));
+        createWithPrefix(key, value);
+        childrenCache.rebuildNode(withPrefix(key));
         return true;
       }
 
@@ -136,7 +133,81 @@ public abstract class ZkAbstractStore<V> implements AutoCloseable {
     }
   }
 
-  public abstract void createNodeInZK (String key, V value);
+  /**
+   * Default {@link CreateMode create mode} that will be used in create operations referred in the see also section.
+   *
+   * @see #createOrUpdate(String, Object)
+   * @see #createWithPrefix(String, Object)
+   */
+  protected abstract CreateMode getCreateMode();
+
+
+  /**
+   * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied.
+   *
+   * @param path    target path
+   * @param value   value to set, null if none available
+   *
+   * @see #getCreateMode()
+   * @see #createOrUpdate(String, Object)
+   * @see #withPrefix(String)
+   */
+  protected void createWithPrefix(String path, V value) {
+    createOrUpdate(withPrefix(path), value);
+  }
+
+  /**
+   * Creates a node in zookeeper with the {@link #getCreateMode() default create mode} and sets its value if supplied
+   * or updates its value if the node already exists.
+   *
+   * Note that if node exists, its mode will not be changed.
+   *
+   * @param path    target path
+   * @param value   value to set, null if none available
+   *
+   * @see #getCreateMode()
+   * @see #createOrUpdate(String, Object, CreateMode)
+   */
+  protected void createOrUpdate(String path, V value) {
+    createOrUpdate(path, value, getCreateMode());
+  }
+
+  /**
+   * Creates a node in zookeeper with the given mode and sets its value if supplied or updates its value if the node
+   * already exists.
+   *
+   * Note that if the node exists, its mode will not be changed.
+   *
+   * Internally, the method suppresses {@link org.apache.zookeeper.KeeperException.NodeExistsException}. It is
+   * safe to do so since the implementation is idempotent.
+   *
+   * @param path    target path
+   * @param value   value to set, null if none available
+   * @param mode    creation mode
+   * @throws RuntimeException  throws a {@link RuntimeException} wrapping the root cause.
+   */
+  protected void createOrUpdate(String path, V value, CreateMode mode) {
+    try {
+      final boolean isUpdate = value != null;
+      final byte[] valueInBytes = isUpdate ? config.getSerializer().serialize(value) : null;
+      final boolean nodeExists = framework.checkExists().forPath(path) != null;
+      if (!nodeExists) {
+        final ACLBackgroundPathAndBytesable<String> creator = framework.create().withMode(mode);
+        if (isUpdate) {
+          creator.forPath(path, valueInBytes);
+        } else {
+          creator.forPath(path);
+        }
+      } else if (isUpdate) {
+        framework.setData().forPath(path, valueInBytes);
+      }
+    } catch (KeeperException.NodeExistsException ex) {
+      logger.warn("Node already exists in Zookeeper. Skipping... -- [path: {}, mode: {}]", path, mode);
+    } catch (Exception e) {
+      final String msg = String.format("Failed to create/update Zookeeper node. [path: %s, mode: %s]", path, mode);
+      throw new RuntimeException(msg, e);
+    }
+  }
 
   private class Iter implements Iterator<Entry<String, V>>{
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f7a0d38e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
index 1abf3a6..4706287 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStore.java
@@ -17,40 +17,25 @@
  */
 package org.apache.drill.exec.store.sys.zk;
 
+import java.io.IOException;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.zookeeper.CreateMode;
 
-import java.io.IOException;
-
 /**
  * Implementation of EStore using Zookeeper's EPHEMERAL node.
  * @param <V>
  */
-public class ZkEStore<V> extends ZkAbstractStore<V> implements EStore<V>{
+public class ZkEStore<V> extends ZkAbstractStore<V> implements EStore<V> {
 
-  public ZkEStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException{
+  public ZkEStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
     super(framework,config);
   }
 
   @Override
-  public void delete(String key) {
-    try {
-      if (framework.checkExists().forPath(p(key)) != null) {
-        framework.delete().forPath(p(key));
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper. " + e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void createNodeInZK(String key, V value) {
-    try {
-      framework.create().withMode(CreateMode.EPHEMERAL).forPath(p(key), config.getSerializer().serialize(value));
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
+  protected CreateMode getCreateMode() {
+    return CreateMode.EPHEMERAL;
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f7a0d38e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
index 7d7d475..60277aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkEStoreProvider.java
@@ -18,16 +18,15 @@
 
 package org.apache.drill.exec.store.sys.zk;
 
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.drill.exec.store.sys.EStore;
 import org.apache.drill.exec.store.sys.EStoreProvider;
 import org.apache.drill.exec.store.sys.PStoreConfig;
 import org.apache.drill.exec.store.sys.PStoreConfig.Mode;
 
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-
 public class ZkEStoreProvider implements EStoreProvider{
   private final CuratorFramework curator;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f7a0d38e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
index 723dbb0..da22996 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java
@@ -28,23 +28,14 @@ import org.apache.zookeeper.CreateMode;
  * Implementation of PStore using Zookeeper's PERSISTENT node.
  * @param <V>
  */
-public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V>{
+public class ZkPStore<V> extends ZkAbstractStore<V> implements PStore<V> {
 
-  ZkPStore(CuratorFramework framework, PStoreConfig<V> config)
-      throws IOException {
+  public ZkPStore(CuratorFramework framework, PStoreConfig<V> config) throws IOException {
     super(framework, config);
   }
 
   @Override
-  public void createNodeInZK(String key, V value) {
-    try {
-      framework.create().withMode(CreateMode.PERSISTENT).forPath(p(key), config.getSerializer().serialize(value));
-    } catch (Exception e) {
-      throw new RuntimeException("Failure while accessing Zookeeper", e);
-    }
+  protected CreateMode getCreateMode() {
+    return CreateMode.PERSISTENT;
   }
-
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f7a0d38e/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
index f0fa120..eb5df43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.sys.zk;
 
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -33,8 +34,6 @@ import org.apache.drill.exec.store.sys.PStoreRegistry;
 import org.apache.drill.exec.store.sys.local.FilePStore;
 import org.apache.hadoop.fs.Path;
 
-import com.google.common.annotations.VisibleForTesting;
-
 public class ZkPStoreProvider implements PStoreProvider {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZkPStoreProvider.class);
 


Mime
View raw message