helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-210] Add support to set data with expect version in BaseDataAccessor, rb=13581
Date Fri, 16 Aug 2013 20:53:47 GMT
Updated Branches:
  refs/heads/master 13d19e914 -> 4a46ae055


[HELIX-210] Add support to set data with expect version in BaseDataAccessor, rb=13581


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/4a46ae05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/4a46ae05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/4a46ae05

Branch: refs/heads/master
Commit: 4a46ae055a877737c8eeda63e7c0917d48dc9d2b
Parents: 13d19e9
Author: zzhang <zzhang5@uci.edu>
Authored: Fri Aug 16 13:53:40 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Fri Aug 16 13:53:40 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/BaseDataAccessor.java |  43 +-
 .../helix/manager/zk/HelixGroupCommit.java      |   8 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java    | 173 +++++---
 .../manager/zk/ZkCacheBaseDataAccessor.java     |  38 +-
 .../src/test/java/org/apache/helix/Mocks.java   |   7 +
 .../manager/zk/TestZkBaseDataAccessor.java      | 413 ++++++++++++++-----
 6 files changed, 494 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index 7c65460..9154724 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -36,17 +36,18 @@ public interface BaseDataAccessor<T>
   /**
    * This will always attempt to create the znode, if it exists it will return false. Will
    * create parents if they do not exist. For performance reasons, it may try to create
-   * child first and only if it fails it will try to create parent
+   * child first and only if it fails it will try to create parents
    * 
    * @param path path to the ZNode to create
    * @param record the data to write to the ZNode
+   * @param options Set the type of ZNode see the valid values in {@link AccessOption}
    * @return true if creation succeeded, false otherwise (e.g. if the ZNode exists)
    */
   boolean create(String path, T record, int options);
 
   /**
-   * This will always attempt to set the data on existing node. If the znode does not
-   * exist it will create it.
+   * This will always attempt to set the data on existing node. If the ZNode does not
+   * exist it will create it and all its parents ZNodes if necessary
    * 
    * @param path path to the ZNode to set
    * @param record the data to write to the ZNode
@@ -56,18 +57,31 @@ public interface BaseDataAccessor<T>
   boolean set(String path, T record, int options);
 
   /**
-   * This will attempt to merge with existing data by calling znrecord.merge and if it
-   * does not exist it will create it znode
+   * This will attempt to set the data on existing node only if version matches.
+   * If the ZNode does not exist it will create it and all its parent ZNodes only if expected
version is -1
+   * 
+   * @param path path to the ZNode to set
+   * @param record the data to write to the ZNode
+   * @param options Set the type of ZNode see the valid values in {@link AccessOption}
+   * @param expectVersion the expected version of the data to be overwritten, -1 means match
any version
+   * @return true if data was successfully set, false otherwise (e.g. if the version mismatches)
+   */
+  boolean set(String path, T record, int expectVersion, int options);
+  
+  /**
+   * This will attempt to update the data using the updater. If the ZNode
+   * does not exist it will create it and all its parent ZNodes.
+   * Updater will be invoked with null value if node does not exist.
    * 
    * @param path path to the ZNode to update
    * @param updater an update routine for the data to merge in
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
-   * @return true if data merge succeeded, false otherwise
+   * @return true if data update succeeded, false otherwise
    */
   boolean update(String path, DataUpdater<T> updater, int options);
 
   /**
-   * This will remove znode and all its child nodes if any
+   * This will remove the ZNode and all its descendants if any
    * 
    * @param path path to the root ZNode to remove
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
@@ -79,8 +93,8 @@ public interface BaseDataAccessor<T>
    * Use it when creating children under a parent node. This will use async api for better
    * performance. If the child already exists it will return false.
    * 
-   * @param parentPath paths to the immediate parent ZNodes
-   * @param record List of data to write to each of the children
+   * @param paths the paths to the children ZNodes
+   * @param record List of data to write to each of the path
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
    * @return For each child: true if creation succeeded, false otherwise (e.g. if the child
exists)
    */
@@ -90,7 +104,7 @@ public interface BaseDataAccessor<T>
    * can set multiple children under a parent node. This will use async api for better
    * performance. If this child does not exist it will create it.
    * 
-   * @param parentPath paths to the immediate parent ZNodes
+   * @param paths the paths to the children ZNodes
    * @param record List of data with which to overwrite the corresponding ZNodes
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
    * @return For each child: true if the data was set, false otherwise
@@ -101,10 +115,10 @@ public interface BaseDataAccessor<T>
    * Can update multiple nodes using async api for better performance. If a child does not
    * exist it will create it.
    * 
-   * @param parentPath paths to the immediate parent ZNodes
-   * @param updaters List of update routines for records to merge in
+   * @param the paths to the children ZNodes
+   * @param updaters List of update routines for records to update
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
-   * @return For each child, true if the data was merged in, false otherwise
+   * @return For each child, true if the data is updated successfully, false otherwise
    */
   boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters,
int options);
 
@@ -121,6 +135,7 @@ public interface BaseDataAccessor<T>
    * Get the {@link T} corresponding to the path
    * 
    * @param path path to the ZNode
+   * @param stat retrieve the stat of the ZNode
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
    * @return the record data stored at the ZNode
    */
@@ -130,6 +145,7 @@ public interface BaseDataAccessor<T>
    * Get List of {@link T} corresponding to the paths using async api
    * 
    * @param paths paths to the ZNodes
+   * @param stats retrieve a list of stats for the ZNodes
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
    * @return List of record data stored at each ZNode
    */
@@ -224,6 +240,7 @@ public interface BaseDataAccessor<T>
   void unsubscribeChildChanges(String path, IZkChildListener listener);
 
   /**
+   * TODO refactor this. reset() should not be in data accessor
    * reset the cache if any, when session expiry happens
    */
   void reset();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
index 6de5d82..e95c2bf 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.AccessOption;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
@@ -108,6 +109,9 @@ public class HelixGroupCommit<T>
               T merged = null;
 
               Stat readStat = new Stat();
+              
+              // to create a new znode, we need set version to -1
+              readStat.setVersion(-1);
               try
               {
                 // accessor will fallback to zk if not found in cache
@@ -115,7 +119,7 @@ public class HelixGroupCommit<T>
               }
               catch (ZkNoNodeException e)
               {
-                // OK.
+                // OK
               }
 
               // updater should handler merged == null
@@ -149,7 +153,7 @@ public class HelixGroupCommit<T>
                 it.remove();
               }
               // System.out.println("size:"+ processed.size());
-              accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options);
+              accessor.set(mergedKey, merged, readStat.getVersion(), options);
             }
             catch (ZkBadVersionException e)
             {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 77e2b5a..6f4e65c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -56,6 +55,31 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
   {
     OK, NODE_EXISTS, ERROR
   }
+  
+  /**
+   * struct holding return information
+   *
+   */
+  public class AccessResult
+  {
+    RetCode _retCode;
+    List<String> _pathCreated;
+    
+    Stat _stat;
+    
+    /**
+     * used by update only
+     */
+    T _updatedValue;
+    
+    public AccessResult()
+    {
+      _retCode = RetCode.ERROR;
+      _pathCreated = new ArrayList<String>();
+      _stat = new Stat();
+      _updatedValue = null;
+    }
+  }
 
   private static Logger  LOG = Logger.getLogger(ZkBaseDataAccessor.class);
 
@@ -72,19 +96,22 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
   @Override
   public boolean create(String path, T record, int options)
   {
-    return create(path, record, null, options) == RetCode.OK;
+    AccessResult result =  doCreate(path, record, options);
+    return result._retCode == RetCode.OK;
   }
 
   /**
    * sync create
    */
-  public RetCode create(String path, T record, List<String> pathCreated, int options)
+  public AccessResult doCreate(String path, T record, int options)
   {
+    AccessResult result = new AccessResult();
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null)
     {
       LOG.error("Invalid create mode. options: " + options);
-      return RetCode.ERROR;
+      result._retCode = RetCode.ERROR;
+      return result;
     }
 
     boolean retry;
@@ -94,10 +121,10 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
       try
       {
         _zkClient.create(path, record, mode);
-        if (pathCreated != null)
-          pathCreated.add(path);
+        result._pathCreated.add(path);
 
-        return RetCode.OK;
+        result._retCode = RetCode.OK;
+        return result;
       }
       catch (ZkNoNodeException e)
       {
@@ -105,7 +132,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
         String parentPath = HelixUtil.getZkParentPath(path);
         try
         {
-          RetCode rc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT);
+          AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT);
+          result._pathCreated.addAll(res._pathCreated);
+          RetCode rc = res._retCode;
           if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
           {
             // if parent node created/exists, retry
@@ -115,23 +144,27 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
         catch (Exception e1)
         {
           LOG.error("Exception while creating path: " + parentPath, e1);
-          return RetCode.ERROR;
+          result._retCode = RetCode.ERROR;
+          return result;
         }
       }
       catch (ZkNodeExistsException e)
       {
         LOG.warn("Node already exists. path: " + path);
-        return RetCode.NODE_EXISTS;
+        result._retCode = RetCode.NODE_EXISTS;
+        return result;
       }
       catch (Exception e)
       {
         LOG.error("Exception while creating path: " + path, e);
-        return RetCode.ERROR;
+        result._retCode = RetCode.ERROR;
+        return result;
       }
     }
     while (retry);
 
-    return RetCode.OK;
+    result._retCode = RetCode.OK;
+    return result;
   }
 
   /**
@@ -140,27 +173,43 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
   @Override
   public boolean set(String path, T record, int options)
   {
-    return set(path, record, null, null, -1, options);
+    return set(path, record, -1, options);
   }
 
   /**
    * sync set
+   */
+  @Override
+  public boolean set(String path, T record, int expectVersion, int options)
+  {
+    try 
+    {
+      AccessResult result = doSet(path, record, expectVersion, options);
+      return result._retCode == RetCode.OK;
+    } 
+    catch (ZkBadVersionException e)
+    {
+      return false;
+    }
+  }
+  
+  /**
+   * sync set
    * 
-   * @param setstat
-   *          : if node is created instead of set, stat will NOT be set
    */
-  public boolean set(String path,
-                     T record,
-                     List<String> pathsCreated,
-                     Stat setstat,
-                     int expectVersion,
-                     int options)
+  public AccessResult doSet(String path,
+                            T record,
+                            int expectVersion,
+                            int options)
   {
+    AccessResult result = new AccessResult();
+
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null)
     {
       LOG.error("Invalid set mode. options: " + options);
-      return false;
+      result._retCode = RetCode.ERROR;
+      return result;
     }
 
     boolean retry;
@@ -169,36 +218,43 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
       retry = false;
       try
       {
-        // _zkClient.writeData(path, record);
-        Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion);
-        if (setstat != null)
-          DataTree.copyStat(setStat, setstat);
+        Stat stat = _zkClient.writeDataGetStat(path, record, expectVersion);
+        DataTree.copyStat(stat, result._stat);
       }
       catch (ZkNoNodeException e)
       {
-        // node not exists, try create. in this case, stat will not be set
+        // node not exists, try create if expectedVersion == -1; in this case, stat will
not be set
+        if (expectVersion != -1) 
+        {
+          LOG.error("Could not create node if expectVersion != -1, was " + expectVersion);
+          result._retCode = RetCode.ERROR;
+          return result;
+        }
         try
         {
-          RetCode rc = create(path, record, pathsCreated, options);
-          // if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
-          // retry = true;
+          // may create recursively
+          AccessResult res = doCreate(path, record, options);
+          result._pathCreated.addAll(res._pathCreated);
+          RetCode rc = res._retCode;
           switch (rc)
           {
           case OK:
             // not set stat if node is created (instead of set)
             break;
-          case NODE_EXISTS:
+          case NODE_EXISTS:            
             retry = true;
             break;
           default:
             LOG.error("Fail to set path by creating: " + path);
-            return false;
+            result._retCode = RetCode.ERROR;
+            return result;
           }
         }
         catch (Exception e1)
         {
           LOG.error("Exception while setting path by creating: " + path, e);
-          return false;
+          result._retCode = RetCode.ERROR;
+          return result;
         }
       }
       catch (ZkBadVersionException e)
@@ -208,12 +264,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
       catch (Exception e)
       {
         LOG.error("Exception while setting path: " + path, e);
-        return false;
+        result._retCode = RetCode.ERROR;
+        return result;
       }
     }
     while (retry);
 
-    return true;
+    result._retCode = RetCode.OK;
+    return result;
   }
 
   /**
@@ -222,25 +280,25 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
   @Override
   public boolean update(String path, DataUpdater<T> updater, int options)
   {
-    return update(path, updater, null, null, options) != null;
+    AccessResult result = doUpdate(path, updater, options);
+    return result._retCode == RetCode.OK;
   }
 
   /**
    * sync update
    * 
-   * @return: updatedData on success, or null on fail
    */
-  public T update(String path,
-                  DataUpdater<T> updater,
-                  List<String> createPaths,
-                  Stat stat,
-                  int options)
+  public AccessResult doUpdate(String path,
+                               DataUpdater<T> updater,
+                               int options)
   {
+    AccessResult result = new AccessResult();
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null)
     {
       LOG.error("Invalid update mode. options: " + options);
-      return null;
+      result._retCode = RetCode.ERROR;
+      return result;
     }
 
     boolean retry;
@@ -254,10 +312,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
         T oldData = (T) _zkClient.readData(path, readStat);
         T newData = updater.update(oldData);
         Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
-        if (stat != null)
-        {
-          DataTree.copyStat(setStat, stat);
-        }
+        DataTree.copyStat(setStat, result._stat);
 
         updatedData = newData;
       }
@@ -267,11 +322,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
       }
       catch (ZkNoNodeException e)
       {
-        // node not exist, try create
+        // node not exist, try create, pass null to updater
         try
         {
           T newData = updater.update(null);
-          RetCode rc = create(path, newData, createPaths, options);
+          AccessResult res = doCreate(path, newData, options);
+          result._pathCreated.addAll(res._pathCreated);
+          RetCode rc = res._retCode;
           switch (rc)
           {
           case OK:
@@ -282,24 +339,29 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
             break;
           default:
             LOG.error("Fail to update path by creating: " + path);
-            return null;
+            result._retCode = RetCode.ERROR;
+            return result;
           }
         }
         catch (Exception e1)
         {
           LOG.error("Exception while updating path by creating: " + path, e1);
-          return null;
+          result._retCode = RetCode.ERROR;
+          return result;
         }
       }
       catch (Exception e)
       {
         LOG.error("Exception while updating path: " + path, e);
-        return null;
+        result._retCode = RetCode.ERROR;
+        return result;
       }
     }
     while (retry);
 
-    return updatedData;
+    result._retCode = RetCode.OK;
+    result._updatedValue = updatedData;
+    return result;
   }
 
   /**
@@ -516,13 +578,12 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
     try
     {
       // optimize on common path
-      _zkClient.delete(path);
+      return _zkClient.delete(path);
     }
     catch (ZkException e)
     {
-      _zkClient.deleteRecursive(path);
+      return _zkClient.deleteRecursive(path);
     }
-    return true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 869871c..e9e0587 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -276,11 +276,10 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
       try
       {
         cache.lockWrite();
-        List<String> pathsCreated = new ArrayList<String>();
-        RetCode rc = _baseAccessor.create(serverPath, data, pathsCreated, options);
-        boolean success = (rc == RetCode.OK);
+        ZkBaseDataAccessor<T>.AccessResult result = _baseAccessor.doCreate(serverPath,
data, options);
+        boolean success = (result._retCode == RetCode.OK);
 
-        updateCache(cache, pathsCreated, success, serverPath, data, ZNode.ZERO_STAT);
+        updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT);
 
         return success;
       }
@@ -297,6 +296,12 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
   @Override
   public boolean set(String path, T data, int options)
   {
+    return set(path, data, -1, options);
+  }
+  
+  @Override
+  public boolean set(String path, T data, int expectVersion, int options)
+  {
     String clientPath = path;
     String serverPath = prependChroot(clientPath);
 
@@ -306,15 +311,17 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
       try
       {
         cache.lockWrite();
-        Stat setStat = new Stat();
-        List<String> pathsCreated = new ArrayList<String>();
-        boolean success =
-            _baseAccessor.set(serverPath, data, pathsCreated, setStat, -1, options);
+        ZkBaseDataAccessor<T>.AccessResult result = _baseAccessor.doSet(serverPath,
data, expectVersion, options);
+        boolean success = result._retCode == RetCode.OK;
 
-        updateCache(cache, pathsCreated, success, serverPath, data, setStat);
+        updateCache(cache, result._pathCreated, success, serverPath, data, result._stat);
 
         return success;
       }
+      catch (Exception e) 
+      {
+        return false;
+      }
       finally
       {
         cache.unlockWrite();
@@ -322,9 +329,9 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
     }
 
     // no cache
-    return _baseAccessor.set(serverPath, data, options);
+    return _baseAccessor.set(serverPath, data, expectVersion, options);
   }
-
+  
   @Override
   public boolean update(String path, DataUpdater<T> updater, int options)
   {
@@ -338,12 +345,9 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
       try
       {
         cache.lockWrite();
-        Stat setStat = new Stat();
-        List<String> pathsCreated = new ArrayList<String>();
-        T updateData =
-            _baseAccessor.update(serverPath, updater, pathsCreated, setStat, options);
-        boolean success = (updateData != null);
-        updateCache(cache, pathsCreated, success, serverPath, updateData, setStat);
+        ZkBaseDataAccessor<T>.AccessResult result = _baseAccessor.doUpdate(serverPath,
updater, options);
+        boolean success = (result._retCode == RetCode.OK);
+        updateCache(cache, result._pathCreated, success, serverPath, result._updatedValue,
result._stat);
 
         return success;
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index e30c09b..d688561 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -195,6 +195,13 @@ public class Mocks {
       
     }
 
+    @Override
+    public boolean set(String path, ZNRecord record, int options, int expectVersion)
+    {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
 //		@Override
 //		public boolean subscribe(String path, IZkListener listener) {
 //			// TODO Auto-generated method stub

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/4a46ae05/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index f97e018..b120105 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -28,11 +28,14 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordUpdater;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor.AccessResult;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -41,128 +44,338 @@ import org.testng.annotations.Test;
 
 public class TestZkBaseDataAccessor extends ZkUnitTestBase
 {
+
   @Test
-  public void testSyncZkBaseDataAccessor()
+  public void testSyncSet()
   {
-    System.out.println("START TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
 
-    String root = "TestZkBaseDataAccessor_syn";
-    ZkClient zkClient = new ZkClient(ZK_ADDR);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
-    zkClient.deleteRecursive("/" + root);
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    boolean success = accessor.set(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+    
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
 
-    BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+  }
+  
+  @Test
+  public void testSyncSetWithVersion()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
 
-    // test sync create
-    for (int i = 0; i < 10; i++)
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    // set persistent
+    boolean success = accessor.set(path, record, 0, AccessOption.PERSISTENT);
+    Assert.assertFalse(success, "Should fail since version not match");
+    try {
+      _gZkClient.readData(path, false);
+      Assert.fail("Should get no node exception");
+    } catch (Exception e)
     {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      boolean success = accessor.create(path, new ZNRecord(msgId), AccessOption.PERSISTENT);
-      Assert.assertTrue(success, "Should succeed in create");
+      // OK
     }
-
-    // test get what we created
-    for (int i = 0; i < 10; i++)
+    
+    success = accessor.set(path, record, -1, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+    
+    // set ephemeral
+    path = String.format("/%s/%s", testName, "msg_1");
+    record = new ZNRecord("msg_1");
+    success = accessor.set(path, record, 0, AccessOption.EPHEMERAL);
+    Assert.assertFalse(success);
+    try {
+      _gZkClient.readData(path, false);
+      Assert.fail("Should get no node exception");
+    } catch (Exception e)
     {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord record = zkClient.readData(path);
-      Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+      // OK
     }
+    
+    success = accessor.set(path, record, -1, AccessOption.EPHEMERAL);
+    Assert.assertTrue(success);
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_1");
+    
+    
+    record.setSimpleField("key0", "value0");
+    success = accessor.set(path, record, 0, AccessOption.PERSISTENT);
+    Assert.assertTrue(success, "Should pass. AccessOption.PERSISTENT is ignored");
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
+    Assert.assertNotNull(getRecord.getSimpleField("key0"));
+    Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
+    
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+  
+  @Test
+  public void testSyncDoSet()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
 
-    // test sync set
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord newRecord = new ZNRecord(msgId);
-      newRecord.setSimpleField("key1", "value1");
-      boolean success = accessor.set(path, newRecord, AccessOption.PERSISTENT);
-      Assert.assertTrue(success, "Should succeed in set");
-    }
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s/%s", testName, "msg_0", "submsg_0");
+    ZNRecord record = new ZNRecord("submsg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
 
-    // test get what we set
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord record = zkClient.readData(path);
-      Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field
set");
-      Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
-    }
+    AccessResult result = accessor.doSet(path, record, -1, AccessOption.PERSISTENT);
+    Assert.assertEquals(result._retCode, RetCode.OK);
+    Assert.assertEquals(result._pathCreated.size(), 3);
+    Assert.assertTrue(result._pathCreated.contains(String.format("/%s", testName)));
+    Assert.assertTrue(result._pathCreated.contains(String.format("/%s/%s", testName, "msg_0")));
+    Assert.assertTrue(result._pathCreated.contains(path));
     
-    // test sync update
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord newRecord = new ZNRecord(msgId);
-      newRecord.setSimpleField("key2", "value2");
-      boolean success = accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
-      Assert.assertTrue(success, "Should succeed in update");
-    }
+    Assert.assertTrue(_gZkClient.exists(String.format("/%s", testName)));
+    Assert.assertTrue(_gZkClient.exists(String.format("/%s/%s", testName, "msg_0")));
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "submsg_0");
+    
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+  
+  @Test
+  public void testSyncCreate()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
 
-    // test get what we updated
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord record = zkClient.readData(path);
-      Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields
set");
-      Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
-    }
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
     
-    // test sync get
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      ZNRecord record = accessor.get(path, null, 0);
-      Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields
set");
-      Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
-      Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
-    }
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    boolean success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
 
-    // test sync exist
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      boolean exists = accessor.exists(path, 0);
-      Assert.assertTrue(exists, "Should exist");
-    }
+    record.setSimpleField("key0", "value0");
+    success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertFalse(success, "Should fail since node already exists");
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+    
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+  
+  @Test
+  public void testSyncUpdate()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
 
-    // test getStat()
-    for (int i = 0; i < 10; i++)
-    {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      Stat stat = accessor.getStat(path, 0);
-      Assert.assertNotNull(stat, "Stat should exist");
-      Assert.assertEquals(stat.getVersion(), 2, "DataVersion should be 2, since we set 1
and update 1");
-    }
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    boolean success = accessor.update(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
 
-    // test sync remove
-    for (int i = 0; i < 10; i++)
+    record.setSimpleField("key0", "value0");
+    success = accessor.update(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
+    Assert.assertNotNull(getRecord.getSimpleField("key0"));
+    Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
+    
+    // test throw exception from updater
+    success = accessor.update(path, new DataUpdater<ZNRecord>()
     {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      boolean success = accessor.remove(path, 0);
-      Assert.assertTrue(success, "Should remove");
-    }
+
+      @Override
+      public ZNRecord update(ZNRecord currentData)
+      {
+        throw new RuntimeException("IGNORABLE: test throw exception from updater");
+      }
+    }, AccessOption.PERSISTENT);
+    Assert.assertFalse(success);
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
     
-    // test get what we removed
-    for (int i = 0; i < 10; i++)
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+  
+  @Test
+  public void testSyncRemove()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    boolean success = accessor.remove(path, 0);
+    Assert.assertFalse(success);
+    
+    success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+
+    success = accessor.remove(path, 0);
+    Assert.assertTrue(success);
+    Assert.assertFalse(_gZkClient.exists(path));
+    
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+  
+  @Test
+  public void testSyncGet()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    Stat stat = new Stat();
+    ZNRecord getRecord = accessor.get(path, stat, 0);
+    Assert.assertNull(getRecord);
+    
+    try {
+      accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      Assert.fail("Should throw exception if not exist");
+    } catch (Exception e)
     {
-      String msgId = "msg_" + i;
-      String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
-      boolean exists = zkClient.exists(path);
-      Assert.assertFalse(exists, "Should be removed");
+      // OK
     }
+    
+    boolean success = accessor.create(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+
+    getRecord = accessor.get(path, stat, 0);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+    Assert.assertEquals(stat.getVersion(), 0);
+    
+    record.setSimpleField("key0", "value0");
+    success = accessor.set(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    
+    getRecord = accessor.get(path, stat, 0);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(record.getSimpleFields().size(), 1);
+    Assert.assertNotNull(getRecord.getSimpleField("key0"));
+    Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
+    Assert.assertEquals(stat.getVersion(), 1);
+    
+    ZNRecord newRecord = new ZNRecord("msg_0");
+    newRecord.setSimpleField("key1", "value1");
+    success = accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
+    Assert.assertTrue(success);
+    
+    getRecord = accessor.get(path, stat, 0);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 2);
+    Assert.assertNotNull(getRecord.getSimpleField("key0"));
+    Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
+    Assert.assertNotNull(getRecord.getSimpleField("key1"));
+    Assert.assertEquals(getRecord.getSimpleField("key1"), "value1");
+    Assert.assertEquals(stat.getVersion(), 2);
+    
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+  
+  @Test
+  public void testSyncExist()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    boolean success = accessor.exists(path, 0);
+    Assert.assertFalse(success);
+    
+    success = accessor.create(path, record, AccessOption.EPHEMERAL);
+    Assert.assertTrue(success);
+    
+    success = accessor.exists(path, 0);
+    Assert.assertTrue(success);
+    
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+  
+  @Test
+  public void testSyncGetStat()
+  {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+    
+    String path = String.format("/%s/%s", testName, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    
+    Stat stat = accessor.getStat(path, 0);
+    Assert.assertNull(stat);
+    
+    boolean success = accessor.create(path, record, AccessOption.EPHEMERAL);
+    Assert.assertTrue(success);
+    
+    stat = accessor.getStat(path, 0);
+    Assert.assertNotNull(stat);
+    Assert.assertEquals(stat.getVersion(), 0);
+    Assert.assertNotSame(stat.getEphemeralOwner(), 0);
+   
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
 
-    zkClient.close();
-    System.out.println("END TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
   }
   
   @Test


Mime
View raw message