curator-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CURATOR-287) PersistentEphemeralNode should be generalized to accept all create modes
Date Tue, 19 Jan 2016 00:03:39 GMT

    [ https://issues.apache.org/jira/browse/CURATOR-287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15106000#comment-15106000
] 

ASF GitHub Bot commented on CURATOR-287:
----------------------------------------

Github user Randgalt commented on a diff in the pull request:

    https://github.com/apache/curator/pull/123#discussion_r50058605
  
    --- Diff: curator-recipes/src/main/java/org/apache/curator/framework/recipes/nodes/PersistentNode.java
---
    @@ -0,0 +1,382 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.curator.framework.recipes.nodes;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Preconditions;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.framework.api.CreateBuilder;
    +import org.apache.curator.framework.api.CreateModable;
    +import org.apache.curator.framework.api.CuratorEvent;
    +import org.apache.curator.framework.api.CuratorWatcher;
    +import org.apache.curator.framework.state.ConnectionState;
    +import org.apache.curator.framework.state.ConnectionStateListener;
    +import org.apache.curator.utils.PathUtils;
    +import org.apache.zookeeper.CreateMode;
    +import org.apache.zookeeper.KeeperException;
    +import org.apache.zookeeper.WatchedEvent;
    +import org.apache.zookeeper.Watcher.Event.EventType;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * <p>
    + * A persistent node is a node that attempts to stay present in
    + * ZooKeeper, even through connection and session interruptions.
    + * </p>
    + * <p>
    + * Thanks to bbeck (https://github.com/bbeck) for the initial coding and design
    + * </p>
    + */
    +public class PersistentNode implements Closeable
    +{
    +    private final AtomicReference<CountDownLatch> initialCreateLatch = new AtomicReference<CountDownLatch>(new
CountDownLatch(1));
    +    private final Logger log = LoggerFactory.getLogger(getClass());
    +    private final CuratorFramework client;
    +    private final CreateModable<ACLBackgroundPathAndBytesable<String>> createMethod;
    +    private final AtomicReference<String> nodePath = new AtomicReference<String>(null);
    +    private final String basePath;
    +    private final CreateMode mode;
    +    private final AtomicReference<byte[]> data = new AtomicReference<byte[]>();
    +    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    +    private final AtomicBoolean authFailure = new AtomicBoolean(false);
    +    private final BackgroundCallback backgroundCallback;
    +    private final boolean useProtection;
    +    private final CuratorWatcher watcher = new CuratorWatcher()
    +    {
    +        @Override
    +        public void process(WatchedEvent event) throws Exception
    +        {
    +            if ( event.getType() == EventType.NodeDeleted )
    +            {
    +                createNode();
    +            }
    +            else if ( event.getType() == EventType.NodeDataChanged )
    +            {
    +                watchNode();
    +            }
    +        }
    +    };
    +    private final BackgroundCallback checkExistsCallback = new BackgroundCallback()
    +    {
    +        @Override
    +        public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
    +        {
    +            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
    +            {
    +                createNode();
    +            }
    +            else
    +            {
    +                boolean isEphemeral = event.getStat().getEphemeralOwner() != 0;
    +                if ( isEphemeral != mode.isEphemeral() )
    +                {
    +                    log.warn("Existing node ephemeral state doesn't match requested state.
Maybe the node was created outside of PersistentNode? " + basePath);
    +                }
    +            }
    +        }
    +    };
    +    private final BackgroundCallback setDataCallback = new BackgroundCallback()
    +    {
    +
    +        @Override
    +        public void processResult(CuratorFramework client, CuratorEvent event)
    +            throws Exception
    +        {
    +            //If the result is ok then initialisation is complete (if we're still initialising)
    +            //Don't retry on other errors as the only recoverable cases will be connection
loss
    +            //and the node not existing, both of which are already handled by other watches.
    +            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
    +            {
    +                //Update is ok, mark initialisation as complete if required.
    +                initialisationComplete();
    +            }
    +        }
    +    };
    +    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    +    {
    +        @Override
    +        public void stateChanged(CuratorFramework client, ConnectionState newState)
    +        {
    +            if ( newState == ConnectionState.RECONNECTED )
    +            {
    +                createNode();
    +            }
    +        }
    +    };
    +
    +    private enum State
    +    {
    +        LATENT,
    +        STARTED,
    +        CLOSED
    +    }
    +
    +    /**
    +     * @param client        client instance
    +     * @param mode          creation mode
    +     * @param useProtection if true, call {@link CreateBuilder#withProtection()}
    +     * @param basePath the base path for the node
    +     * @param initData data for the node
    +     */
    +    public PersistentNode(CuratorFramework client, final CreateMode mode, boolean useProtection,
final String basePath, byte[] initData)
    +    {
    +        this.useProtection = useProtection;
    +        this.client = Preconditions.checkNotNull(client, "client cannot be null");
    +        this.basePath = PathUtils.validatePath(basePath);
    +        this.mode = Preconditions.checkNotNull(mode, "mode cannot be null");
    +        final byte[] data = Preconditions.checkNotNull(initData, "data cannot be null");
    +
    +        backgroundCallback = new BackgroundCallback()
    +        {
    +            @Override
    +            public void processResult(CuratorFramework client, CuratorEvent event) throws
Exception
    +            {
    +                String path = null;
    +                boolean nodeExists = false;
    +                if ( event.getResultCode() == KeeperException.Code.NODEEXISTS.intValue()
)
    +                {
    +                    path = event.getPath();
    +                    nodeExists = true;
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.OK.intValue()
)
    +                {
    +                    path = event.getName();
    +                }
    +                else if ( event.getResultCode() == KeeperException.Code.NOAUTH.intValue()
)
    +                {
    +                    log.warn("Client does not have authorisation to write ephemeral node
at path {}", event.getPath());
    +                    authFailure.set(true);
    +                    return;
    +                }
    +                if ( path != null )
    +                {
    +                    authFailure.set(false);
    +                    nodePath.set(path);
    +                    watchNode();
    +
    +                    if ( nodeExists )
    +                    {
    +                        client.setData().inBackground(setDataCallback).forPath(getActualPath(),
getData());
    +                    }
    +                    else
    +                    {
    +                        initialisationComplete();
    +                    }
    +                }
    +                else
    +                {
    +                    createNode();
    +                }
    +            }
    +        };
    +
    +        createMethod = useProtection ? client.create().creatingParentContainersIfNeeded().withProtection()
: client.create().creatingParentContainersIfNeeded();
    +        this.data.set(Arrays.copyOf(data, data.length));
    +    }
    +
    +    private void initialisationComplete()
    +    {
    +        CountDownLatch localLatch = initialCreateLatch.getAndSet(null);
    +        if ( localLatch != null )
    +        {
    +            localLatch.countDown();
    +        }
    +    }
    +
    +    /**
    +     * You must call start() to initiate the persistent ephemeral node. An attempt to
create the node
    +     * in the background will be started
    +     */
    +    public void start()
    +    {
    +        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already
started");
    +
    +        client.getConnectionStateListenable().addListener(connectionStateListener);
    +        createNode();
    +    }
    +
    +    /**
    +     * Block until the either initial node creation initiated by {@link #start()} succeeds
or
    +     * the timeout elapses.
    +     *
    +     * @param timeout the maximum time to wait
    +     * @param unit    time unit
    +     * @return if the node was created before timeout
    +     * @throws InterruptedException if the thread is interrupted
    +     */
    +    public boolean waitForInitialCreate(long timeout, TimeUnit unit) throws InterruptedException
    +    {
    +        Preconditions.checkState(state.get() == State.STARTED, "Not started");
    +
    +        CountDownLatch localLatch = initialCreateLatch.get();
    +        return (localLatch == null) || localLatch.await(timeout, unit);
    +    }
    +
    +    @Override
    +    public void close() throws IOException
    +    {
    +        if ( !state.compareAndSet(State.STARTED, State.CLOSED) )
    +        {
    +            return;
    +        }
    +
    +        client.getConnectionStateListenable().removeListener(connectionStateListener);
    +
    +        try
    +        {
    +            deleteNode();
    --- End diff --
    
    I think it makes sense still. The idea is that the node persists until the instance is
closed.


> PersistentEphemeralNode should be generalized to accept all create modes
> ------------------------------------------------------------------------
>
>                 Key: CURATOR-287
>                 URL: https://issues.apache.org/jira/browse/CURATOR-287
>             Project: Apache Curator
>          Issue Type: New Feature
>          Components: Recipes
>    Affects Versions: 3.0.0, 2.9.1
>            Reporter: Jordan Zimmerman
>            Assignee: Jordan Zimmerman
>            Priority: Minor
>
> With very little change, PersistentEphemeralNode could work with non-ephemeral nodes.
There is a good use case for this: permanent nodes that must always exist with some data.
It's actually a pain to do this manually with ZK.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message