zookeeper-user mailing list archives

From mdzfirst mdzfirst <mdzfi...@gmail.com>
Subject Re: WriteLock does not work.
Date Tue, 17 Dec 2013 00:54:25 GMT
Here is my code. ZOOKEEPER_HOSTS is defined in the Constants class. This
test simulates a resource manager whose main API is lock(). The natural
approach would be to acquire the mutex in lock() and release it in
revocationRequested(). But since InterProcessMutex seems to require that
the same thread both acquires and releases the lock, I have to use a
background thread to perform both operations.
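
A minimal sketch of the same-thread constraint described above, for
illustration only. It uses java.util.concurrent.locks.ReentrantLock as a
stand-in for InterProcessMutex (an assumption, so that the snippet runs
without ZooKeeper), and the class and method names are hypothetical. It
routes every acquire and release through one dedicated thread, and shows
that a release attempted from any other thread is rejected.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class OwnerThreadDemo {
    // Stand-in for InterProcessMutex (assumption): like the Curator mutex,
    // a ReentrantLock may only be released by the thread that acquired it.
    private final ReentrantLock lock = new ReentrantLock();

    // Every lock operation is funnelled through this single thread,
    // so the acquirer and the releaser are always the same thread.
    private final ExecutorService owner = Executors.newSingleThreadExecutor();

    // Acquire on the owner thread and wait until that has happened.
    public void acquire() throws Exception {
        owner.submit(() -> { lock.lock(); }).get();
    }

    // Release on the owner thread and wait until that has happened.
    public void release() throws Exception {
        owner.submit(() -> { lock.unlock(); }).get();
    }

    // Try to release from the calling thread, which is not the owner.
    // Returns true if the lock rejected the attempt.
    public boolean releaseFromCallerThreadFails() {
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // True if any thread currently holds the lock.
    public boolean isHeld() {
        return lock.isLocked();
    }

    // Stop the owner thread so the JVM can exit.
    public void shutdown() {
        owner.shutdown();
    }

    public static void main(String[] args) throws Exception {
        OwnerThreadDemo demo = new OwnerThreadDemo();
        demo.acquire();
        System.out.println("held after acquire: " + demo.isHeld());
        System.out.println("release from caller thread rejected: "
                + demo.releaseFromCallerThreadFails());
        demo.release();
        System.out.println("held after release: " + demo.isHeld());
        demo.shutdown();
    }
}
```

A single-thread executor keeps lock ownership on one thread without
hand-written wait()/notifyAll() flags. Whether this maps cleanly onto
InterProcessMutex and its revocation callback is not verified here.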

To reproduce my experiment, open two consoles and launch this program in
each. Then type 'lock' alternately in the two consoles. In my runs, one
console stops reacting to revoke requests after a few turns.

Thanks for your patience.

//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

package test;

import java.io.IOException;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.RevocationListener;
import org.apache.curator.framework.recipes.locks.Revoker;
import org.apache.curator.retry.ExponentialBackoffRetry;

import core.conf.Constants;

public class WatcherTest extends Constants
        implements Runnable, RevocationListener<InterProcessMutex> {
    RetryPolicy retryPolicy;
    CuratorFramework client;
    InterProcessMutex mutex;
    boolean locked;

    boolean requestClose;
    boolean requestLock;
    boolean requestUnlock;

    public WatcherTest() throws IOException, InterruptedException {
        retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(ZOOKEEPER_HOSTS, retryPolicy);
        client.start();

        mutex = new InterProcessMutex(client, "/WatcherTest");
        mutex.makeRevocable(this);

        locked = false;

        requestClose = false;
        requestLock = false;
        requestUnlock = false;
    }

    public void lock() throws Exception {
        synchronized (this) {
            while (! locked) {
                requestLock = true;
                this.notifyAll();
                this.wait();
            }

            if (locked)
                System.out.println("Do business here.");
            else
                System.out.println("ERROR");
        }
    }

    public void close() {
        synchronized (this) {
            requestClose = true;
            this.notifyAll();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                synchronized (this) {
                    while (! (requestClose || requestLock || requestUnlock))
                        this.wait();
                    if (requestClose) {
                        client.close();
                        return;
                    } else if (requestLock) {
                        if (! locked) {
                            while (! mutex.acquire(1000, TimeUnit.MILLISECONDS)) {
                                Iterator<String> itr = mutex.getParticipantNodes().iterator();
                                while (itr.hasNext())
                                    Revoker.attemptRevoke(client, itr.next());
                                System.out.println("revoke");
                            }
                            System.out.println("Mutex locked.");
                            locked = true;
                        }
                        requestLock = false;
                        this.notifyAll();
                    } else if (requestUnlock) {
                        if (locked) {
                            mutex.release();
                            System.out.println("Mutex released.");
                            locked = false;
                        }
                        requestUnlock = false;
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void revocationRequested(InterProcessMutex useless) {
        System.out.println("Request received");

        synchronized (this) {
            if (locked) {
                requestUnlock = true;
                this.notifyAll();
            }
        }
    }

    public static void main(String[] args) {
        try {
            WatcherTest obj = new WatcherTest();

            Thread t = new Thread(obj);
            t.start();

            Scanner in = new Scanner(System.in);
            while (true) {
                String com = in.next();
                if (com.equals("lock"))
                    obj.lock();
                else if (com.equals("close")) {
                    obj.close();
                    break;
                }
            }

            t.join();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}


2013/12/17 Jordan Zimmerman <jordan@jordanzimmerman.com>

> Well, there are tests for it. Other than that, can you provide a sample
> that doesn't work?
>
> -Jordan
>
> On Dec 16, 2013, at 5:05 PM, mdzfirst mdzfirst <mdzfirst@gmail.com> wrote:
>
> > Hi Jordan,
> >
> > I have tried InterProcessMutex from the Curator recipes. Sometimes when a
> > process attempts to revoke a locked mutex, the revocationRequested()
> > function of the mutex holder is not called. Any idea why this could
> > happen? Are there any limitations in the Curator implementation, such as
> > on the number or frequency of revocations?
> >
> > Thanks and best wishes,
> > mdzfirst
> >
> >
> > 2013/12/15 Jordan Zimmerman <jordan@jordanzimmerman.com>
> >
> >> Are you using a JVM language? Curator already has a revocable lock recipe.
> >>
> >> http://curator.apache.org/apidocs/org/apache/curator/framework/recipes/locks/InterProcessMutex.html
> >>
> >> -Jordan
> >>
> >> On Dec 15, 2013, at 8:28 AM, mdzfirst mdzfirst <mdzfirst@gmail.com> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> I would like to implement a 'preemptible' mutex with the WriteLock example
> >>> in the recipe. My method is like this.
> >>>
> >>> 1. Acquire the WriteLock and get the resources
> >>> 2. Set a watch on the lock's root node with getChildren() repeatedly, using
> >>> a separate ZooKeeper client
> >>> 3. If only one child (self) is found, sleep
> >>> 4. The callback function only wakes up the main thread
> >>> 5. If the watch is triggered and > 1 children are found, release the
> >>> resources and unlock the WriteLock
> >>>
> >>> In my design, I will execute the jar multiple times. Each time a new one is
> >>> started, the old one should be forced to release the mutex. It works. The
> >>> first execution does stop. However, the second process seems to lose its
> >>> watch on the first one, because the callback function I passed to the
> >>> WriteLock object is never invoked. The callback function is used to
> >>> synchronize the locking request and is thus very important.
> >>>
> >>> Can anyone see any flaws in my design, or provide a different way that
> >>> works? Thanks~
> >>>
> >>> Best regards,
> >>> mdzfirst
> >>
> >>
> >
> >
> > --
> > Best wishes,
> >
> > --------
> >
> > Ma Dongzhe
> > Department of Computer Science and Technology
> > Tsinghua University
> > Beijing 100084, P.R. China
>
>


-- 
Best wishes,

--------

Ma Dongzhe
Department of Computer Science and Technology
Tsinghua University
Beijing 100084, P.R. China
