helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject git commit: [HELIX-514] ZkBaseDataAccessor#set() should throw BadVersionException instead of return false in case of version mismatch, rb=25453
Date Tue, 09 Sep 2014 01:05:53 GMT
Repository: helix
Updated Branches:
  refs/heads/master 947d7745c -> ef2fac3b9


[HELIX-514] ZkBaseDataAccessor#set() should throw BadVersionException instead of return false
in case of version mismatch, rb=25453


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ef2fac3b
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ef2fac3b
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ef2fac3b

Branch: refs/heads/master
Commit: ef2fac3b939daa38a2f9f8199265fa51538ec1b6
Parents: 947d774
Author: zzhang <zzhang@apache.org>
Authored: Mon Sep 8 18:05:41 2014 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Mon Sep 8 18:05:41 2014 -0700

----------------------------------------------------------------------
 .../helix/manager/zk/HelixGroupCommit.java      | 33 ++++++++++++--------
 .../helix/manager/zk/ZkBaseDataAccessor.java    |  8 ++---
 2 files changed, 22 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/ef2fac3b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
index 48f0647..d51e40b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixGroupCommit.java
@@ -42,10 +42,12 @@ public class HelixGroupCommit<T> {
     final String _key;
     final DataUpdater<T> _updater;
     AtomicBoolean _sent = new AtomicBoolean(false);
+    boolean _isSuccess;
 
     Entry(String key, DataUpdater<T> updater) {
       _key = key;
       _updater = updater;
+      _isSuccess = false;
     }
   }
 
@@ -69,6 +71,7 @@ public class HelixGroupCommit<T> {
 
     queue._pending.add(entry);
 
+    boolean success = false;
     while (!entry._sent.get()) {
       if (queue._running.compareAndSet(null, Thread.currentThread())) {
         ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
@@ -78,10 +81,6 @@ public class HelixGroupCommit<T> {
             return true;
           }
 
-          // remove from queue
-          // Entry first = queue._pending.poll();
-          // processed.add(first);
-
           String mergedKey = first._key;
 
           boolean retry;
@@ -102,9 +101,6 @@ public class HelixGroupCommit<T> {
                 // OK
               }
 
-              // updater should handler merged == null
-              // merged = first._updater.update(merged);
-
               // iterate over processed if we are retrying
               Iterator<Entry<T>> it = processed.iterator();
               while (it.hasNext()) {
@@ -113,7 +109,10 @@ public class HelixGroupCommit<T> {
                   continue;
                 }
                 merged = ent._updater.update(merged);
-                // System.out.println("After merging:" + merged);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("After merging processed entry. path:" + mergedKey + ", value: "
+                      + merged);
+                }
               }
 
               // iterate over queue._pending for newly coming requests
@@ -125,11 +124,17 @@ public class HelixGroupCommit<T> {
                 }
                 processed.add(ent);
                 merged = ent._updater.update(merged);
-                // System.out.println("After merging:" + merged);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("After merging pending entry. path:" + mergedKey + ", value: " + merged);
+                }
+
                 it.remove();
               }
-              // System.out.println("size:"+ processed.size());
-              accessor.set(mergedKey, merged, readStat.getVersion(), options);
+              success = accessor.set(mergedKey, merged, readStat.getVersion(), options);
+              if (!success) {
+                LOG.error("Fail to group commit. path: " + mergedKey + ", value: " + merged
+                    + ", version: " + readStat.getVersion());
+              }
             } catch (ZkBadVersionException e) {
               retry = true;
             }
@@ -139,6 +144,7 @@ public class HelixGroupCommit<T> {
           for (Entry<T> e : processed) {
             synchronized (e) {
               e._sent.set(true);
+              e._isSuccess = success;
               e.notify();
             }
           }
@@ -148,12 +154,13 @@ public class HelixGroupCommit<T> {
           try {
             entry.wait(10);
           } catch (InterruptedException e) {
-            e.printStackTrace();
+            LOG.error("Interruped while committing change, key: " + key, e);
+            Thread.currentThread().interrupt();
             return false;
           }
         }
       }
     }
-    return true;
+    return entry._isSuccess;
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/ef2fac3b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index e978a23..8086bd8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -168,12 +168,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    */
   @Override
   public boolean set(String path, T record, int expectVersion, int options) {
-    try {
-      AccessResult result = doSet(path, record, expectVersion, options);
-      return result._retCode == RetCode.OK;
-    } catch (ZkBadVersionException e) {
-      return false;
-    }
+    AccessResult result = doSet(path, record, expectVersion, options);
+    return result._retCode == RetCode.OK;
   }
 
   /**


Mime
View raw message