zookeeper-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Patrick Hunt <ph...@apache.org>
Subject Re: Watch not sent immediately?
Date Wed, 09 May 2012 20:42:17 GMT
Good point Jordan, I added a jira for this:
https://issues.apache.org/jira/browse/ZOOKEEPER-1464

Patrick

On Wed, May 9, 2012 at 1:27 PM, Jordan Zimmerman <jzimmerman@netflix.com> wrote:
> Interesting - this issue has come up several times with Curator users. I
> ended up writing a Tech Note on it.
>
> https://github.com/Netflix/curator/wiki/Tech-Note-1
>
>
> -JZ
>
> On 5/9/12 1:23 PM, "Patrick Hunt" <phunt@apache.org> wrote:
>
>>I believe the issue is that there is a single thread updating
>>watchers. If you block that thread then the event can't be delivered.
>>
>>Patrick
>>
>>On Fri, May 4, 2012 at 4:06 AM, guru singh <grsingh750@gmail.com> wrote:
>>> Hi,
>>>
>>> Sorry if the subject is not appropriately titled.
>>>
>>> I'm trying to implement a redis-failover solution using zookeeper.
>>> I've been working with the python binding for zk
>>> Basically, I have a znode called /master; a watch is set on this so
>>> that, whenever master changes, self.master is updated.
>>> There is another znode called /errors, a watch is set on this via
>>> get_children to errors_watcher function.
>>> My code is supposed to continuously loop and create a child znode on
>>> /errors whenever an error is detected.
>>> The function errors_watcher counts the number of children for znode
>>> /errors, if it exceeds a certain length, it writes a new master
>>> 'ip:port' to the znode /master, this calls the master watcher and
>>> updates self.master. I use python's threading.Condition() to block for
>>> certain operations, for instance initially when znode /master is
>>> created, I wait() for master_watcher to be called which updates
>>> self.master and releases the lock. This works as expected; however, the
>>> problem is that when znode /master is changed from within
>>> errors_watcher and I wait() for master_watcher to be called (updating
>>> self.master and then releasing the lock), the code just keeps waiting;
>>> master_watcher is never called. However, if I don't wait after
>>> setting znode /master from within errors_watcher, master_watcher is
>>> called and it updates self.master.
>>>
>>> It would be really helpful if somebody could point out what's wrong. Is
>>> it zk or is my understanding of threading.Condition() incorrect?
>>> Or both :)
>>> Thanks for your help
>>>
>>> The code snippet below simulates the problem.
>>>
>>> class ZKtest:
>>>
>>>    def __init__(self,zk_server):
>>>        zk.set_log_stream(open('zk.log','w'))
>>>        self.master = None
>>>        self.zk_server = zk_server
>>>        self.connected = False
>>>        self.conn_cv = threading.Condition()
>>>
>>>    def global_watcher(self,handle,event,state,path):
>>>        self.conn_cv.acquire()
>>>        print 'global watcher called'
>>>        self.connected = True
>>>        self.conn_cv.notifyAll()
>>>        self.conn_cv.release()
>>>
>>>    def master_watcher(self,handle,event,state,path):
>>>        self.conn_cv.acquire()
>>>        print 'master watcher called'
>>>        master = zk.get(self.handle,path,self.master_watcher)[0]
>>>        self.master = master
>>>        print 'Master is %s' %(master)
>>>        self.conn_cv.notifyAll()
>>>        self.conn_cv.release()
>>>
>>>    def errors_watcher(self,handle,event,state,path):
>>>        self.conn_cv.acquire()
>>>        print 'error watcher called'
>>>        errors =
>>>len(zk.get_children(self.handle,'/errors',self.errors_watcher))
>>>        print 'Current errors %d' %(errors)
>>>        if errors > 5 :
>>>            print 'Set new master, update znode /master'
>>>            zk.set(self.handle,'/master','127.0.0.1:6380')
>>>            #self.conn_cv.wait() <-- Why doesn't this return??
>>>        self.conn_cv.notifyAll()
>>>        self.conn_cv.release()
>>>
>>>
>>>    def create_znodes(self):
>>>        self.conn_cv.acquire()
>>>        master = zk.exists(self.handle,'/master',self.master_watcher)
>>>        if not master:
>>>            print 'Creating znode /master'
>>>            zk.create(self.handle,'/master','127.0.0.1:6379',
>>>                      [ZOO_OPEN_ACL_UNSAFE])
>>>        else :
>>>            print 'Updating znode /master'
>>>
>>>zk.set(self.handle,'/master','127.0.0.1:6379',master['version'])
>>>        self.conn_cv.wait() # wait until master_watcher has updated
>>> self.master, this returns after master_watcher is called
>>>        print self.master # should not be None, since master_watcher
>>>updates it
>>>        errors = zk.exists(self.handle,'/errors')
>>>        if not errors:
>>>            print 'Creating znode /errors'
>>>            zk.create(self.handle,'/errors','Errors follow',
>>>                      [ZOO_OPEN_ACL_UNSAFE])
>>>        else :
>>>            print 'Purge previous errors'
>>>            for err in zk.get_children(self.handle,'/errors'):
>>>                zk.delete(self.handle,'/errors/'+err)
>>>        err = zk.get_children(self.handle,'/errors',self.errors_watcher)
>>>    # set a watch for children of znode /errors
>>>        self.conn_cv.release()
>>>
>>>
>>>   def run(self):
>>>        self.conn_cv.acquire()
>>>        self.handle = zk.init(self.zk_server,self.global_watcher)
>>>        if not self.connected:
>>>            while not self.connected :
>>>                print 'Not Connected, retry in 5'
>>>                self.conn_cv.wait(5)
>>>                self.handle = zk.init(self.zk_server)
>>>        self.create_znodes()
>>>        while self.master != '127.0.0.1:6380':
>>>            print 'Current Master %s' %(self.master)
>>>            # simulate errors, until master is not 127.0.0.1:6380
>>>
>>>zk.create(self.handle,'/errors/','Error!',[ZOO_OPEN_ACL_UNSAFE],
>>>                      zk.SEQUENCE)
>>>            self.conn_cv.wait()
>>>        self.conn_cv.release()
>>>
>>>
>>> if __name__ == '__main__' :
>>>    zkt = ZKtest('127.0.0.1:2181')
>>>    zkt.run()
>>
>

Mime
View raw message