zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sy...@apache.org
Subject [zookeeper] branch master updated: ZOOKEEPER-3885: add locking for watchers hashtables
Date Fri, 31 Jul 2020 14:10:17 GMT
This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b776b23  ZOOKEEPER-3885: add locking for watchers hashtables
b776b23 is described below

commit b776b2360ac282fc4eef1e86fcf185d7a6c3eae5
Author: Tudor Bosman <tudor@rockset.com>
AuthorDate: Fri Jul 31 13:42:18 2020 +0000

    ZOOKEEPER-3885: add locking for watchers hashtables
    
    See the comments in the JIRA issue.
    
    Author: Tudor Bosman <tudor@rockset.com>
    
    Reviewers: Mate Szalay-Beko <symat@apache.org>, Damien Diederen <dd@crosstwine.com>,
Enrico Olivelli <eolivelli@apache.org>
    
    Closes #1403 from tudor/htlocking1
---
 .../zookeeper-client-c/src/mt_adaptor.c            | 24 ++++++++++++++++++++--
 .../zookeeper-client-c/src/st_adaptor.c            | 10 +++++++++
 .../zookeeper-client-c/src/zk_adaptor.h            |  5 +++++
 .../zookeeper-client-c/src/zookeeper.c             | 12 +++++++++++
 4 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c b/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
index 73b64c3..7292843 100644
--- a/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
+++ b/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
@@ -256,12 +256,13 @@ int adaptor_init(zhandle_t *zh)
     pthread_mutex_init(&zh->to_process.lock,0);
     pthread_mutex_init(&adaptor_threads->zh_lock,0);
     pthread_mutex_init(&adaptor_threads->reconfig_lock,0);
-    // to_send must be recursive mutex    
+    pthread_mutex_init(&adaptor_threads->watchers_lock,0);
+    // to_send must be recursive mutex
     pthread_mutexattr_init(&recursive_mx_attr);
     pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE);
     pthread_mutex_init(&zh->to_send.lock,&recursive_mx_attr);
     pthread_mutexattr_destroy(&recursive_mx_attr);
-    
+
     pthread_mutex_init(&zh->sent_requests.lock,0);
     pthread_cond_init(&zh->sent_requests.cond,0);
     pthread_mutex_init(&zh->completions_to_process.lock,0);
@@ -531,6 +532,25 @@ int unlock_reconfig(struct _zhandle *zh)
     }
 }
 
+int lock_watchers(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if (adaptor) {
+        return pthread_mutex_lock(&adaptor->watchers_lock);
+    } else {
+        return 0;
+    }
+}
+int unlock_watchers(struct _zhandle *zh)
+{
+    struct adaptor_threads *adaptor = zh->adaptor_priv;
+    if (adaptor) {
+        return pthread_mutex_unlock(&adaptor->watchers_lock);
+    } else {
+        return 0;
+    }
+}
+
 int enter_critical(zhandle_t* zh)
 {
     struct adaptor_threads *adaptor = zh->adaptor_priv;
diff --git a/zookeeper-client/zookeeper-client-c/src/st_adaptor.c b/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
index ddc2582..07540b7 100644
--- a/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
+++ b/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
@@ -94,6 +94,16 @@ int unlock_reconfig(struct _zhandle *zh)
     return 0;
 }
 
+int lock_watchers(struct _zhandle *zh)
+{
+    return 0;
+}
+
+int unlock_watchers(struct _zhandle *zh)
+{
+    return 0;
+}
+
 int enter_critical(zhandle_t* zh)
 {
     return 0;
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
index 46a7e7e..77d70f3 100644
--- a/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
+++ b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
@@ -167,6 +167,7 @@ struct adaptor_threads {
      pthread_mutex_t lock;          // ... and a lock
      pthread_mutex_t zh_lock;       // critical section lock
      pthread_mutex_t reconfig_lock; // lock for reconfiguring cluster's ensemble
+     pthread_mutex_t watchers_lock; // lock for watcher operations
 #ifdef WIN32
      SOCKET self_pipe[2];
 #else
@@ -300,6 +301,10 @@ int zoo_unlock_auth(zhandle_t *zh);
 int lock_reconfig(struct _zhandle *zh);
 int unlock_reconfig(struct _zhandle *zh);
 
+// watchers hashtable lock
+int lock_watchers(struct _zhandle *zh);
+int unlock_watchers(struct _zhandle *zh);
+
 // critical section guards
 int enter_critical(zhandle_t* zh);
 int leave_critical(zhandle_t* zh);
diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
index d33b50c..8497055 100644
--- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c
+++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
@@ -2099,9 +2099,11 @@ static int send_set_watches(zhandle_t *zh)
     int rc;
 
     req.relativeZxid = zh->last_zxid;
+    lock_watchers(zh);
     req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
     req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
     req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);
+    unlock_watchers(zh);
 
     // return if there are no pending watches
     if (!req.dataWatches.count && !req.existWatches.count &&
@@ -3055,7 +3057,9 @@ static int queue_session_event(zhandle_t *zh, int state)
     }
     /* We queued the buffer, so don't free it */
     close_buffer_oarchive(&oa, 0);
+    lock_watchers(zh);
     cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
+    unlock_watchers(zh);
     queue_completion(&zh->completions_to_process, cptr, 0);
     if (process_async(zh->outstanding_sync)) {
         process_completions(zh);
@@ -3361,7 +3365,9 @@ int zookeeper_process(zhandle_t *zh, int events)
             /* We are doing a notification, so there is no pending request */
             c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
             c->buffer = bptr;
+            lock_watchers(zh);
             c->c.watcher_result = collectWatchers(zh, type, path);
+            unlock_watchers(zh);
 
             // We cannot free until now, otherwise path will become invalid
             deallocate_WatcherEvent(&evt);
@@ -3416,8 +3422,10 @@ int zookeeper_process(zhandle_t *zh, int events)
                 // Update last_zxid only when it is a request response
                 zh->last_zxid = hdr.zxid;
             }
+            lock_watchers(zh);
             activateWatcher(zh, cptr->watcher, rc);
             deactivateWatcher(zh, cptr->watcher_deregistration, rc);
+            unlock_watchers(zh);
 
             if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
                 LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
@@ -4708,19 +4716,23 @@ static int aremove_watches(
         goto done;
     }
 
+    lock_watchers(zh);
     if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
         rc = ZNOWATCHER;
+        unlock_watchers(zh);
         goto done;
     }
 
     if (local) {
         removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
+        unlock_watchers(zh);
 #ifdef THREADED
         notify_sync_completion((struct sync_completion *)data);
 #endif
         rc = ZOK;
         goto done;
     }
+    unlock_watchers(zh);
 
     oa = create_buffer_oarchive();
     rc = serialize_RequestHeader(oa, "header", &h);


Mime
View raw message