helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-466] Speed up zkcopy by using asyn read/write, rb=23241
Date Wed, 02 Jul 2014 22:01:11 GMT
Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release 79a6b92aa -> 450d9fea9


[HELIX-466] Speed up zkcopy by using asyn read/write, rb=23241


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/450d9fea
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/450d9fea
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/450d9fea

Branch: refs/heads/helix-0.6.2-release
Commit: 450d9fea902b1b28806038d068d6b7ce1d2be3e1
Parents: 79a6b92
Author: zzhang <zzhang5@uci.edu>
Authored: Wed Jul 2 14:58:10 2014 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Wed Jul 2 14:59:44 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/tools/ZkCopy.java     | 110 ++++++++++++++-----
 1 file changed, 81 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/450d9fea/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java b/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java
index 3991c09..dc6d405 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ZkCopy.java
@@ -20,10 +20,11 @@ package org.apache.helix.tools;
  */
 
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
@@ -32,7 +33,10 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.manager.zk.ByteArraySerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.common.PathUtils;
@@ -86,57 +90,105 @@ public class ZkCopy {
     }
   }
 
-  private static void zkCopy(ZkClient srcClient, String srcPath, ZkClient dstClient, String
dstPath) {
+  /**
+   * Copy a list of paths from src to dst
+   * @param srcClient
+   * @param srcRootPath
+   * @param dstClient
+   * @param dstRootPath
+   * @param paths
+   */
+  private static void copy(ZkClient srcClient, String srcRootPath, ZkClient dstClient,
+      String dstRootPath, List<String> paths) {
+    BaseDataAccessor<Object> srcAccessor = new ZkBaseDataAccessor<Object>(srcClient);
+    List<String> readPaths = new ArrayList<String>();
+    for (String path : paths) {
+      readPaths.add(concatenate(srcRootPath, path));
+    }
+    List<Stat> stats = new ArrayList<Stat>();
+    List<Object> readData = srcAccessor.get(readPaths, stats, 0);
+
+    List<String> writePaths = new ArrayList<String>();
+    List<Object> writeData = new ArrayList<Object>();
+    for (int i = 0; i < paths.size(); i++) {
+      if (stats.get(i).getEphemeralOwner() != 0) {
+        logger.warn("Skip copying ephemeral znode: " + readPaths.get(i));
+        continue;
+      }
+
+      writePaths.add(concatenate(dstRootPath, paths.get(i)));
+      writeData.add(readData.get(i));
+    }
+
+    if (writePaths.size() > 0) {
+      BaseDataAccessor<Object> dstAccessor = new ZkBaseDataAccessor<Object>(dstClient);
+      boolean[] success =
+          dstAccessor.createChildren(writePaths, writeData, AccessOption.PERSISTENT);
+      List<String> successPaths = new ArrayList<String>();
+      List<String> failPaths = new ArrayList<String>();
+      for (int i = 0; i < success.length; i++) {
+        if (success[i]) {
+          successPaths.add(writePaths.get(i));
+        } else {
+          failPaths.add(writePaths.get(i));
+        }
+      }
+
+      // Print
+      if (!successPaths.isEmpty()) {
+        System.out.println("Copy " + successPaths);
+      }
+
+      if (!failPaths.isEmpty()) {
+        System.out.println("Skip " + failPaths);
+      }
+    }
+  }
+
+  private static void zkCopy(ZkClient srcClient, String srcRootPath, ZkClient dstClient,
String dstRootPath) {
     // Strip off tailing "/"
-    if (!srcPath.equals("/") && srcPath.endsWith("/")) {
-      srcPath = srcPath.substring(0, srcPath.length() - 1);
+    if (!srcRootPath.equals("/") && srcRootPath.endsWith("/")) {
+      srcRootPath = srcRootPath.substring(0, srcRootPath.length() - 1);
     }
 
-    if (!dstPath.equals("/") && dstPath.endsWith("/")) {
-      dstPath = dstPath.substring(0, dstPath.length() - 1);
+    if (!dstRootPath.equals("/") && dstRootPath.endsWith("/")) {
+      dstRootPath = dstRootPath.substring(0, dstRootPath.length() - 1);
     }
 
     // Validate paths
-    PathUtils.validatePath(srcPath);
-    PathUtils.validatePath(dstPath);
+    PathUtils.validatePath(srcRootPath);
+    PathUtils.validatePath(dstRootPath);
 
-    if (srcPath.equals(dstPath)) {
+    if (srcRootPath.equals(dstRootPath)) {
       logger.info("srcPath == dstPath. Skip copying");
       return;
     }
 
-    if (srcPath.startsWith(dstPath) || dstPath.startsWith(srcPath)) {
+    if (srcRootPath.startsWith(dstRootPath) || dstRootPath.startsWith(srcRootPath)) {
       throw new IllegalArgumentException(
-          "srcPath/dstPath can't be prefix of dstPath/srcPath, was srcPath: " + srcPath
-              + ", dstPath: " + dstPath);
+          "srcPath/dstPath can't be prefix of dstPath/srcPath, was srcPath: " + srcRootPath
+              + ", dstPath: " + dstRootPath);
     }
 
     // Recursive copy using BFS
     List<String> queue = new LinkedList<String>();
-    queue.add("");
+    String root = "";
+    copy(srcClient, srcRootPath, dstClient, dstRootPath, Arrays.asList(root));
+
+    queue.add(root);
     while (!queue.isEmpty()) {
       String path = queue.remove(0);
-      Stat stat = new Stat();
-      String fromPath = concatenate(srcPath, path);
-      Object data = srcClient.readDataAndStat(fromPath, stat, false);
-      if (stat.getEphemeralOwner() != 0) {
-        logger.warn("Skip copying ephemeral znode: " + fromPath);
-        continue;
-      }
-      String toPath = concatenate(dstPath, path);
-      try {
-        dstClient.createPersistent(toPath, data);
-        System.out.println("Copy " + fromPath + " to " + toPath);
-
-      } catch (ZkNodeExistsException e) {
-        logger.warn("Skip copying znode: " + fromPath + ", " + toPath + " already exists");
-      }
+      String fromPath = concatenate(srcRootPath, path);
 
       List<String> children = srcClient.getChildren(fromPath);
+      List<String> paths = new ArrayList<String>();
       if (children != null && children.size() > 0) {
         for (String child : children) {
-          queue.add(concatenate(path, child));
+          String childPath = concatenate(path, child);
+          paths.add(childPath);
         }
+        copy(srcClient, srcRootPath, dstClient, dstRootPath, paths);
+        queue.addAll(paths);
       }
     }
   }


Mime
View raw message