Subject: svn commit: r1683605 - in /zookeeper/trunk: ./ src/ src/docs/src/documentation/content/xdocs/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/serv...
Date: Thu, 04 Jun 2015 17:51:16 -0000
To: commits@zookeeper.apache.org
From: rgs@apache.org
Message-Id: <20150604175116.A9E80AC0ACD@hades.apache.org>

Author: rgs
Date: Thu Jun 4 17:51:16 2015
New Revision: 1683605

URL: http://svn.apache.org/r1683605
Log:
ZOOKEEPER-2163: Introduce new ZNode type: container (Jordan Zimmerman via rgs)

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ContainerManager.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CreateContainerTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
    zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
    zookeeper/trunk/src/java/main/org/apache/zookeeper/CreateMode.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/MultiTransactionRecord.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CreateModeTest.java
    zookeeper/trunk/src/zookeeper.jute

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Jun 4 17:51:16 2015
@@ -10,6 +10,8 @@ NEW FEATURES:
   ZOOKEEPER-2123 Provide implementation of X509 AuthenticationProvider (Ian Dimayuga via rakeshr)

+  ZOOKEEPER-2163: Introduce new ZNode type: container (Jordan Zimmerman via rgs)
+
 BUGFIXES:
   ZOOKEEPER-1784 wrong check for COMMITANDACTIVATE in observer code, Learner.java (rgs via shralex).

Modified: zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml (original)
+++ zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperAdmin.xml Thu Jun 4 17:51:16 2015
@@ -1383,6 +1383,30 @@
 server.3=zoo3:2888:3888
+
+ znode.container.checkIntervalMs
+
+
+ (Java system property only)
+
+ New in 3.6.0: The
+ time interval in milliseconds for each check of candidate container
+ nodes. Default is "60000".
+
+
+
+
+ znode.container.maxPerMinute
+
+
+ (Java system property only)
+
+ New in 3.6.0: The
+ maximum number of container nodes that can be deleted per
+ minute. This prevents herding during container deletion.
+ Default is "10000".
+
+

Modified: zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml (original)
+++ zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperProgrammers.xml Thu Jun 4 17:51:16 2015
@@ -243,6 +243,23 @@
 overflow when incremented beyond 2147483647 (resulting in a name "<path>-2147483647").
+
+
+ Container Nodes
+
+ Added in 3.6.0
+
+ ZooKeeper has the notion of container nodes. Container nodes are
+ special purpose nodes useful for recipes such as leader, lock, etc.
+ When the last child of a container is deleted, the container becomes
+ a candidate to be deleted by the server at some point in the future.
+
+ Given this property, you should be prepared to get
+ KeeperException.NoNodeException when creating children inside of
+ container nodes. i.e. when creating child nodes inside of container nodes
+ always check for KeeperException.NoNodeException and recreate the container
+ node when it occurs.
+
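
Editor's illustration (not part of the commit): the documentation hunk above asks callers to catch KeeperException.NoNodeException when creating a child under a container and to recreate the container before retrying. The sketch below shows only that control flow. FakeStore, its methods, and the local NoNodeException class are hypothetical in-memory stand-ins so the example runs without a ZooKeeper server; a real client would presumably call ZooKeeper.create(...) with CreateMode.CONTAINER and catch KeeperException.NoNodeException instead.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch of the "recreate the container on NoNodeException" pattern.
 * FakeStore and NoNodeException are hypothetical in-memory stand-ins,
 * not the ZooKeeper client API.
 */
class ContainerRetrySketch {

    /** Stand-in for KeeperException.NoNodeException (assumption). */
    static class NoNodeException extends Exception {
        NoNodeException(String path) {
            super("no node: " + path);
        }
    }

    /** Minimal in-memory tree: just the set of paths that exist. */
    static class FakeStore {
        private final Set<String> paths = new HashSet<>();

        /** Stand-in for creating a node with CreateMode.CONTAINER. */
        void createContainer(String path) {
            paths.add(path);
        }

        /** Creating a child fails when the parent container is missing. */
        void createChild(String parent, String name) throws NoNodeException {
            if (!paths.contains(parent)) {
                throw new NoNodeException(parent);
            }
            paths.add(parent + "/" + name);
        }

        /** Simulates the server deleting an empty container. */
        void reap(String path) {
            paths.remove(path);
        }

        boolean exists(String path) {
            return paths.contains(path);
        }
    }

    /**
     * Creates a child, recreating the container whenever it has vanished.
     * Returns the number of attempts used; rethrows after maxAttempts.
     */
    static int createChildWithRetry(FakeStore store, String container,
                                    String name, int maxAttempts)
            throws NoNodeException {
        for (int attempt = 1; ; attempt++) {
            try {
                store.createChild(container, name);
                return attempt;
            } catch (NoNodeException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                // The container was deleted underneath us: recreate, retry.
                store.createContainer(container);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        FakeStore store = new FakeStore();
        store.createContainer("/locks");
        store.reap("/locks"); // pretend the server cleaned it up
        int attempts = createChildWithRetry(store, "/locks", "lock-1", 3);
        System.out.println("created /locks/lock-1 after " + attempts + " attempt(s)");
    }
}
```

Against a real ensemble, recreating the container can itself fail with a "node exists" error when another client wins the race; treating that case as success and retrying the child create is the usual choice, which is why the loop is bounded rather than a single retry.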

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/CreateMode.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/CreateMode.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/CreateMode.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/CreateMode.java Thu Jun 4 17:51:16 2015
@@ -19,7 +19,6 @@
 package org.apache.zookeeper;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;

 /***
  * CreateMode value determines how the znode is created on ZooKeeper.
@@ -29,32 +28,45 @@ public enum CreateMode {
     /**
      * The znode will not be automatically deleted upon client's disconnect.
      */
-    PERSISTENT (0, false, false),
+    PERSISTENT (0, false, false, false),
     /**
      * The znode will not be automatically deleted upon client's disconnect,
      * and its name will be appended with a monotonically increasing number.
      */
-    PERSISTENT_SEQUENTIAL (2, false, true),
+    PERSISTENT_SEQUENTIAL (2, false, true, false),
     /**
      * The znode will be deleted upon the client's disconnect.
      */
-    EPHEMERAL (1, true, false),
+    EPHEMERAL (1, true, false, false),
     /**
      * The znode will be deleted upon the client's disconnect, and its name
      * will be appended with a monotonically increasing number.
      */
-    EPHEMERAL_SEQUENTIAL (3, true, true);
+    EPHEMERAL_SEQUENTIAL (3, true, true, false),
+    /**
+     * The znode will be a container node. Container
+     * nodes are special purpose nodes useful for recipes such as leader, lock,
+     * etc. When the last child of a container is deleted, the container becomes
+     * a candidate to be deleted by the server at some point in the future.
+     * Given this property, you should be prepared to get
+     * {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * when creating children inside of this container node.
+     */
+    CONTAINER (4, false, false, true);

     private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class);

     private boolean ephemeral;
     private boolean sequential;
+    private final boolean isContainer;
     private int flag;

-    CreateMode(int flag, boolean ephemeral, boolean sequential) {
+    CreateMode(int flag, boolean ephemeral, boolean sequential,
+               boolean isContainer) {
         this.flag = flag;
         this.ephemeral = ephemeral;
         this.sequential = sequential;
+        this.isContainer = isContainer;
     }

     public boolean isEphemeral() {
@@ -65,6 +77,10 @@ public enum CreateMode {
         return sequential;
     }

+    public boolean isContainer() {
+        return isContainer;
+    }
+
     public int toFlag() {
         return flag;
     }
@@ -82,6 +98,8 @@ public enum CreateMode {

         case 3: return CreateMode.EPHEMERAL_SEQUENTIAL ;

+        case 4: return CreateMode.CONTAINER;
+
         default:
             String errMsg = "Received an invalid flag value: " + flag
                     + " to convert to a CreateMode";
@@ -89,4 +107,29 @@ public enum CreateMode {
             throw new KeeperException.BadArgumentsException(errMsg);
         }
     }
+
+    /**
+     * Map an integer value to a CreateMode value
+     */
+    static public CreateMode fromFlag(int flag, CreateMode defaultMode) {
+        switch(flag) {
+        case 0:
+            return CreateMode.PERSISTENT;
+
+        case 1:
+            return CreateMode.EPHEMERAL;
+
+        case 2:
+            return CreateMode.PERSISTENT_SEQUENTIAL;
+
+        case 3:
+            return CreateMode.EPHEMERAL_SEQUENTIAL;
+
+        case 4:
+            return CreateMode.CONTAINER;
+
+        default:
+            return defaultMode;
+        }
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/MultiTransactionRecord.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/MultiTransactionRecord.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/MultiTransactionRecord.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/MultiTransactionRecord.java Thu Jun 4 17:51:16 2015
@@ -68,6 +68,7 @@ public class MultiTransactionRecord impl
             switch (op.getType()) {
                 case ZooDefs.OpCode.create:
                 case ZooDefs.OpCode.create2:
+                case ZooDefs.OpCode.createContainer:
                 case ZooDefs.OpCode.delete:
                 case ZooDefs.OpCode.setData:
                 case ZooDefs.OpCode.check:
@@ -89,8 +90,9 @@ public class MultiTransactionRecord impl

         while (!h.getDone()) {
             switch (h.getType()) {
-                case ZooDefs.OpCode.create:
-                case ZooDefs.OpCode.create2:
+                case ZooDefs.OpCode.create:
+                case ZooDefs.OpCode.create2:
+                case ZooDefs.OpCode.createContainer:
                     CreateRequest cr = new CreateRequest();
                     cr.deserialize(archive, tag);
                     add(Op.create(cr.getPath(), cr.getData(), cr.getAcl(), cr.getFlags()));

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java Thu Jun 4 17:51:16 2015
@@ -183,14 +183,18 @@ public abstract class Op {
         private int flags;

         private Create(String path, byte[] data, List<ACL> acl, int flags) {
-            super(ZooDefs.OpCode.create, path);
+            super(getOpcode(CreateMode.fromFlag(flags, CreateMode.PERSISTENT)), path);
             this.data = data;
             this.acl = acl;
             this.flags = flags;
         }

+        private static int getOpcode(CreateMode createMode) {
+            return createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create;
+        }
+
         private Create(String path, byte[] data, List<ACL> acl, CreateMode createMode) {
-            super(ZooDefs.OpCode.create, path);
+            super(getOpcode(createMode), path);
             this.data = data;
             this.acl = acl;
             this.flags = createMode.toFlag();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java Thu Jun 4 17:51:16 2015
@@ -65,6 +65,10 @@ public class ZooDefs {

         public final int removeWatches = 18;

+        public final int createContainer = 19;
+
+        public final int deleteContainer = 20;
+
         public final int auth = 100;

         public final int setWatches = 101;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Thu Jun 4 17:51:16 2015
@@ -1194,7 +1194,7 @@ public class ZooKeeper {
         final String serverPath = prependChroot(clientPath);

         RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.create);
+        h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
         CreateRequest request = new CreateRequest();
         CreateResponse response = new CreateResponse();
         request.setData(data);
@@ -1282,7 +1282,7 @@ public class ZooKeeper {
         final String serverPath = prependChroot(clientPath);

         RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.create2);
+        h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create2);
         CreateRequest request = new CreateRequest();
         Create2Response response = new Create2Response();
         request.setData(data);
@@ -1321,7 +1321,7 @@ public class ZooKeeper {
         final String serverPath = prependChroot(clientPath);

         RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.create);
+        h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
         CreateRequest request = new CreateRequest();
         CreateResponse response = new CreateResponse();
         ReplyHeader r = new ReplyHeader();
@@ -1347,7 +1347,7 @@ public class ZooKeeper {
         final String serverPath = prependChroot(clientPath);

         RequestHeader h = new RequestHeader();
-        h.setType(ZooDefs.OpCode.create2);
+        h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create2);
         CreateRequest request = new CreateRequest();
         Create2Response response = new Create2Response();
         ReplyHeader r = new ReplyHeader();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java Thu Jun 4 17:51:16 2015
@@ -36,10 +36,11 @@ public class CreateCommand extends CliCo
     {
         options.addOption(new Option("e", false, "ephemeral"));
         options.addOption(new Option("s", false, "sequential"));
+        options.addOption(new Option("c", false, "container"));
     }

     public CreateCommand() {
-        super("create", "[-s] [-e] path [data] [acl]");
+        super("create", "[-s] [-e] [-c] path [data] [acl]");
     }

@@ -58,12 +59,22 @@ public class CreateCommand extends CliCo
     @Override
     public boolean exec() throws KeeperException, InterruptedException {
         CreateMode flags = CreateMode.PERSISTENT;
-        if(cl.hasOption("e") && cl.hasOption("s")) {
+        boolean hasE = cl.hasOption("e");
+        boolean hasS = cl.hasOption("s");
+        boolean hasC = cl.hasOption("c");
+        if (hasC && (hasE || hasS)) {
+            err.println("-c cannot be combined with -s or -e. Containers cannot be ephemeral or sequential.");
+            return false;
+        }
+
+        if(hasE && hasS) {
             flags = CreateMode.EPHEMERAL_SEQUENTIAL;
-        } else if (cl.hasOption("e")) {
+        } else if (hasE) {
             flags = CreateMode.EPHEMERAL;
-        } else if (cl.hasOption("s")) {
+        } else if (hasS) {
             flags = CreateMode.PERSISTENT_SEQUENTIAL;
+        } else if (hasC) {
+            flags = CreateMode.CONTAINER;
         }
         String path = args[1];
         byte[] data = null;

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ContainerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ContainerManager.java?rev=1683605&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ContainerManager.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ContainerManager.java Thu Jun 4 17:51:16 2015
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Manages cleanup of container ZNodes. This class is meant to only
+ * be run from the leader. There's no harm in running from followers/observers
+ * but that will be extra work that's not needed. Once started, it periodically
+ * checks container nodes that have a cversion > 0 and have no children. A
+ * delete is attempted on the node. The result of the delete is unimportant.
+ * If the proposal fails or the container node is not empty there's no harm.
+ */
+public class ContainerManager {
+    private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class);
+    private final ZKDatabase zkDb;
+    private final RequestProcessor requestProcessor;
+    private final int checkIntervalMs;
+    private final int maxPerMinute;
+    private final Timer timer;
+    private final AtomicReference<TimerTask> task = new AtomicReference<TimerTask>(null);
+
+    /**
+     * @param zkDb the ZK database
+     * @param requestProcessor request processer - used to inject delete
+     *                         container requests
+     * @param checkIntervalMs how often to check containers in milliseconds
+     * @param maxPerMinute the max containers to delete per second - avoids
+     *                     herding of container deletions
+     */
+    public ContainerManager(ZKDatabase zkDb, RequestProcessor requestProcessor,
+                            int checkIntervalMs, int maxPerMinute) {
+        this.zkDb = zkDb;
+        this.requestProcessor = requestProcessor;
+        this.checkIntervalMs = checkIntervalMs;
+        this.maxPerMinute = maxPerMinute;
+        timer = new Timer("ContainerManagerTask", true);
+
+        LOG.info(String.format("Using checkIntervalMs=%d maxPerMinute=%d",
+                checkIntervalMs, maxPerMinute));
+    }
+
+    /**
+     * start/restart the timer the runs the check. Can safely be called
+     * multiple times.
+     */
+    public void start() {
+        if (task.get() == null) {
+            TimerTask timerTask = new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        checkContainers();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        LOG.info("interrupted");
+                        cancel();
+                    } catch ( Throwable e ) {
+                        LOG.error("Error checking containers", e);
+                    }
+                }
+            };
+            if (task.compareAndSet(null, timerTask)) {
+                timer.scheduleAtFixedRate(timerTask, checkIntervalMs,
+                        checkIntervalMs);
+            }
+        }
+    }
+
+    /**
+     * stop the timer if necessary. Can safely be called multiple times.
+     */
+    public void stop() {
+        TimerTask timerTask = task.getAndSet(null);
+        if (timerTask != null) {
+            timerTask.cancel();
+        }
+    }
+
+    /**
+     * Manually check the containers. Not normally used directly
+     */
+    public void checkContainers()
+            throws InterruptedException {
+        long minIntervalMs = getMinIntervalMs();
+        for (String containerPath : getCandidates()) {
+            long startMs = Time.currentElapsedTime();
+
+            ByteBuffer path = ByteBuffer.wrap(containerPath.getBytes());
+            Request request = new Request(null, 0, 0,
+                    ZooDefs.OpCode.deleteContainer, path, null);
+            try {
+                LOG.info("Attempting to delete candidate container: %s",
+                        containerPath);
+                requestProcessor.processRequest(request);
+            } catch (Exception e) {
+                LOG.error(String.format("Could not delete container: %s" ,
+                        containerPath), e);
+            }
+
+            long elapsedMs = Time.currentElapsedTime() - startMs;
+            long waitMs = minIntervalMs - elapsedMs;
+            if (waitMs > 0) {
+                Thread.sleep(waitMs);
+            }
+        }
+    }
+
+    // VisibleForTesting
+    protected long getMinIntervalMs() {
+        return TimeUnit.MINUTES.toMillis(1) / maxPerMinute;
+    }
+
+    // VisibleForTesting
+    protected Collection<String> getCandidates() {
+        Set<String> candidates = new HashSet<String>();
+        for (String containerPath : zkDb.getDataTree().getContainers()) {
+            DataNode node = zkDb.getDataTree().getNode(containerPath);
+            /*
+                cversion > 0: keep newly created containers from being deleted
+                before any children have been added. If you were to create the
+                container just before a container cleaning period the container
+                would be immediately be deleted.
+             */
+            if ((node != null) && (node.stat.getCversion() > 0) &&
+                    (node.getChildren().size() == 0)) {
+                candidates.add(containerPath);
+            }
+        }
+        return candidates;
+    }
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java Thu Jun 4 17:51:16 2015
@@ -145,7 +145,7 @@ public class DataNode implements Record
         to.setMzxid(stat.getMzxid());
         to.setPzxid(stat.getPzxid());
         to.setVersion(stat.getVersion());
-        to.setEphemeralOwner(stat.getEphemeralOwner());
+        to.setEphemeralOwner(getClientEphemeralOwner(stat));
         to.setDataLength(data == null ? 0 : data.length);
         int numChildren = 0;
         if (this.children != null) {
@@ -158,6 +158,11 @@ public class DataNode implements Record
         to.setNumChildren(numChildren);
     }

+    private static long getClientEphemeralOwner(StatPersisted stat) {
+        return (stat.getEphemeralOwner() == DataTree.CONTAINER_EPHEMERAL_OWNER)
+            ? 0 : stat.getEphemeralOwner();
+    }
+
     synchronized public void deserialize(InputArchive archive, String tag)
             throws IOException {
         archive.startRecord("node");

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Thu Jun 4 17:51:16 2015
@@ -18,19 +18,6 @@

 package org.apache.zookeeper.server;

-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.jute.Index;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
@@ -43,11 +30,11 @@ import org.apache.zookeeper.Quotas;
 import org.apache.zookeeper.StatsTrack;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.PathTrie;
@@ -55,6 +42,7 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.DeleteTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
@@ -66,6 +54,20 @@ import org.apache.zookeeper.txn.TxnHeade
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * This class maintains the tree data structure. It doesn't have any networking
  * or client connection code in it so that it can be tested in a stand alone
@@ -78,6 +80,8 @@ import org.slf4j.LoggerFactory;
 public class DataTree {
     private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

+    public static final long CONTAINER_EPHEMERAL_OWNER = Long.MIN_VALUE;
+
     /**
      * This hashtable provides a fast lookup to the datanodes. The tree is the
      * source of truth and is where all the locking occurs
@@ -130,6 +134,12 @@ public class DataTree {
         new ConcurrentHashMap<Long, HashSet<String>>();

     /**
+     * This set contains the paths of all container nodes
+     */
+    private final Set<String> containers =
+            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+    /**
      * this is map from longs to acl's. It saves acl's being stored for each
      * datanode.
      */
@@ -160,6 +170,10 @@ public class DataTree {
         return cloned;
     }

+    public Set<String> getContainers() {
+        return new HashSet<String>(containers);
+    }
+
     int getAclSize() {
         return longKeyMap.size();
     }
@@ -507,7 +521,9 @@ public class DataTree {
             DataNode child = new DataNode(data, longval, stat);
             parent.addChild(childName);
             nodes.put(path, child);
-            if (ephemeralOwner != 0) {
+            if (ephemeralOwner == CONTAINER_EPHEMERAL_OWNER) {
+                containers.add(path);
+            } else if (ephemeralOwner != 0) {
                 HashSet<String> list = ephemerals.get(ephemeralOwner);
                 if (list == null) {
                     list = new HashSet<String>();
@@ -573,7 +589,9 @@ public class DataTree {
             parent.removeChild(childName);
             parent.stat.setPzxid(zxid);
             long eowner = node.stat.getEphemeralOwner();
-            if (eowner != 0) {
+            if (eowner == CONTAINER_EPHEMERAL_OWNER) {
+                containers.remove(path);
+            } else if (eowner != 0) {
                 HashSet<String> nodes = ephemerals.get(eowner);
                 if (nodes != null) {
                     synchronized (nodes) {
@@ -824,7 +842,21 @@ public class DataTree {
                             header.getZxid(), header.getTime(), stat);
                     rc.stat = stat;
                     break;
+                case OpCode.createContainer:
+                    CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
+                    rc.path = createContainerTxn.getPath();
+                    stat = new Stat();
+                    createNode(
+                            createContainerTxn.getPath(),
+                            createContainerTxn.getData(),
+                            createContainerTxn.getAcl(),
+                            CONTAINER_EPHEMERAL_OWNER,
+                            createContainerTxn.getParentCVersion(),
+                            header.getZxid(), header.getTime(), stat);
+                    rc.stat = stat;
+                    break;
                 case OpCode.delete:
+                case OpCode.deleteContainer:
                     DeleteTxn deleteTxn = (DeleteTxn) txn;
                     rc.path = deleteTxn.getPath();
                     deleteNode(deleteTxn.getPath(), header.getZxid());
@@ -874,7 +906,11 @@ public class DataTree {
                             case OpCode.create:
                                 record = new CreateTxn();
                                 break;
+                            case OpCode.createContainer:
+                                record = new CreateContainerTxn();
+                                break;
                             case OpCode.delete:
+                            case OpCode.deleteContainer:
                                 record = new DeleteTxn();
                                 break;
                             case OpCode.setData:
@@ -1228,7 +1264,9 @@ public class DataTree {
                 }
                 parent.addChild(path.substring(lastSlash + 1));
                 long eowner = node.stat.getEphemeralOwner();
-                if (eowner != 0) {
+                if (eowner == CONTAINER_EPHEMERAL_OWNER) {
+                    containers.add(path);
+                } else if (eowner != 0) {
                     HashSet<String> list = ephemerals.get(eowner);
                     if (list == null) {
                         list = new HashSet<String>();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Jun 4 17:51:16 2015
@@ -18,22 +18,21 @@

 package org.apache.zookeeper.server;

-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Locale;
-
 import org.apache.jute.Record;
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.KeeperException.SessionMovedException;
 import org.apache.zookeeper.MultiResponse;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.OpResult.CheckResult;
+import org.apache.zookeeper.OpResult.CreateResult;
+import org.apache.zookeeper.OpResult.DeleteResult;
+import org.apache.zookeeper.OpResult.ErrorResult;
+import org.apache.zookeeper.OpResult.SetDataResult;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.KeeperException.SessionMovedException;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.CheckWatchesRequest;
@@ -61,13 +60,13 @@ import org.apache.zookeeper.server.ZooKe
 import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.OpResult.CheckResult;
-import org.apache.zookeeper.OpResult.CreateResult;
-import org.apache.zookeeper.OpResult.DeleteResult;
-import org.apache.zookeeper.OpResult.SetDataResult;
-import org.apache.zookeeper.OpResult.ErrorResult;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;

 /**
  * This Request processor actually applies any transaction associated with a
@@ -216,9 +215,11 @@ public class FinalRequestProcessor imple
                             subResult = new CreateResult(subTxnResult.path);
                             break;
                         case OpCode.create2:
+                        case OpCode.createContainer:
                             subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                             break;
                         case OpCode.delete:
+                        case OpCode.deleteContainer:
                             subResult = new DeleteResult();
                             break;
                         case OpCode.setData:
@@ -242,13 +243,15 @@ public class FinalRequestProcessor imple
                 err = Code.get(rc.err);
                 break;
             }
-            case OpCode.create2: {
+            case OpCode.create2:
+            case OpCode.createContainer: {
                 lastOp = "CREA";
                 rsp = new Create2Response(rc.path, rc.stat);
                 err = Code.get(rc.err);
                 break;
             }
-            case OpCode.delete: {
+            case OpCode.delete:
+            case OpCode.deleteContainer: {
                 lastOp = "DELE";
                 err = Code.get(rc.err);
                 break;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Thu Jun 4 17:51:16 2015
@@ -18,67 +18,67 @@

 package org.apache.zookeeper.server;

-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.StringReader;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Locale;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.jute.Record;
 import org.apache.jute.BinaryOutputArchive;
-
-import org.apache.zookeeper.common.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.jute.Record;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.BadArgumentsException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MultiTransactionRecord;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.common.StringUtils;
+import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.proto.CheckVersionRequest;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.ReconfigRequest;
 import org.apache.zookeeper.proto.SetACLRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.zookeeper.proto.CheckVersionRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.server.auth.AuthenticationProvider;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
 import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
+import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.DeleteTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
-import org.apache.zookeeper.txn.CheckVersionTxn;
 import org.apache.zookeeper.txn.Txn;
-import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;

 /**
  * This request processor is generally at the start of a RequestProcessor
@@ -340,8 +340,8 @@ public class PrepRequestProcessor extend
             throws BadArgumentsException {
         int lastSlash = path.lastIndexOf('/');
         if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
-            LOG.info("Invalid path " + path + " with session 0x" +
-                    Long.toHexString(sessionId));
+            LOG.info("Invalid path {} with session 0x{}",
+                    path, Long.toHexString(sessionId));
             throw new KeeperException.BadArgumentsException(path);
         }
         return path.substring(0, lastSlash);
@@ -365,7 +365,8 @@ public class PrepRequestProcessor extend
         switch (type) {
             case OpCode.create:
-            case OpCode.create2: {
+            case OpCode.create2:
+            case OpCode.createContainer: {
                 CreateRequest createRequest = (CreateRequest)record;
                 if (deserialize) {
                     ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
@@ -397,13 +398,18 @@ public class PrepRequestProcessor extend
                 } catch (KeeperException.NoNodeException e) {
                     // ignore this one
                 }
-                boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
+                boolean ephemeralParent = (parentRecord.stat.getEphemeralOwner() != 0) &&
+                        (parentRecord.stat.getEphemeralOwner() != DataTree.CONTAINER_EPHEMERAL_OWNER);
                 if (ephemeralParent) {
                     throw new KeeperException.NoChildrenForEphemeralsException(path);
                 }
                 int newCversion = parentRecord.stat.getCversion()+1;
-                request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),
-                        newCversion));
+                if (type == OpCode.createContainer) {
+                    request.setTxn(new CreateContainerTxn(path, createRequest.getData(), listACL, newCversion));
+                } else {
+                    request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),
+                            newCversion));
+                }
                 StatPersisted s = new StatPersisted();
                 if (createMode.isEphemeral()) {
                     s.setEphemeralOwner(request.sessionId);
@@ -415,18 +421,31 @@ public class PrepRequestProcessor extend
                 addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
                 break;
             }
+            case OpCode.deleteContainer: {
+                String path = new String(request.request.array());
+                String parentPath = getParentPathAndValidate(path);
+                ChangeRecord parentRecord = getRecordForPath(parentPath);
+                ChangeRecord nodeRecord = getRecordForPath(path);
+                if (nodeRecord.childCount > 0) {
+                    throw new KeeperException.NotEmptyException(path);
+                }
+                if (nodeRecord.stat.getEphemeralOwner() != DataTree.CONTAINER_EPHEMERAL_OWNER) {
+                    throw new KeeperException.BadVersionException(path);
+                }
+                request.setTxn(new DeleteTxn(path));
+                parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
+                parentRecord.childCount--;
+                addChangeRecord(parentRecord);
+                addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
+                break;
+            }
             case OpCode.delete:
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 DeleteRequest deleteRequest = (DeleteRequest)record;
                 if(deserialize)
                     ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
                 String path = deleteRequest.getPath();
-                int lastSlash = path.lastIndexOf('/');
-                if (lastSlash == -1 || path.indexOf('\0') != -1
-                        || zks.getZKDatabase().isSpecialPath(path)) {
-                    throw new KeeperException.BadArgumentsException(path);
-                }
-                String parentPath = path.substring(0, lastSlash);
+                String parentPath = getParentPathAndValidate(path);
                 ChangeRecord parentRecord = getRecordForPath(parentPath);
                 ChangeRecord nodeRecord = getRecordForPath(path);
                 checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);
@@ -642,6 +661,16 @@ public class PrepRequestProcessor extend
         }
     }

+    private String getParentPathAndValidate(String path)
+            throws BadArgumentsException {
+        int lastSlash = path.lastIndexOf('/');
+        if (lastSlash == -1 || path.indexOf('\0') != -1
+                || zks.getZKDatabase().isSpecialPath(path)) {
+            throw new BadArgumentsException(path);
+        }
+        return path.substring(0, lastSlash);
+    }
+
     private static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
             throws KeeperException.BadVersionException {
         if (expectedVersion != -1 && expectedVersion != currentVersion) {
@@ -664,13 +693,15 @@ public class PrepRequestProcessor extend
         try {
             switch (request.type) {
+            case OpCode.createContainer:
             case OpCode.create:
             case OpCode.create2:
                 CreateRequest create2Request = new CreateRequest();
                 pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                 break;
+            case OpCode.deleteContainer:
             case OpCode.delete:
-                DeleteRequest deleteRequest = new DeleteRequest();
+                DeleteRequest deleteRequest = new DeleteRequest();
                 pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                 break;
             case OpCode.setData:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Thu Jun 4 17:51:16 2015
@@ -137,8 +137,10 @@ public class Request {
         case OpCode.closeSession:
         case OpCode.create:
         case OpCode.create2:
+        case OpCode.createContainer:
         case OpCode.createSession:
         case OpCode.delete:
+        case OpCode.deleteContainer:
         case OpCode.exists:
         case OpCode.getACL:
         case OpCode.getChildren:
@@ -169,8 +171,10 @@ public class Request {
             return false;
         case OpCode.create:
         case OpCode.create2:
+        case OpCode.createContainer:
         case OpCode.error:
         case OpCode.delete:
+        case OpCode.deleteContainer:
         case OpCode.setACL:
         case OpCode.setData:
         case OpCode.check:
@@ -193,10 +197,14 @@ public class Request {
             return "create";
         case OpCode.create2:
             return "create2";
+        case OpCode.createContainer:
+            return "createContainer";
         case OpCode.setWatches:
             return "setWatches";
         case OpCode.delete:
             return "delete";
+        case OpCode.deleteContainer:
+            return "deleteContainer";
         case OpCode.exists:
             return "exists";
         case OpCode.getData:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Thu Jun 4 17:51:16 2015
@@ -37,8 +37,12 @@ public class TraceFormatter {
             return "create";
         case OpCode.create2:
             return "create2";
+        case OpCode.createContainer:
+            return "createContainer";
         case OpCode.delete:
             return "delete";
+        case OpCode.deleteContainer:
+            return "deleteContainer";
         case OpCode.exists:
             return "exists";
         case OpCode.getData:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Thu Jun 4 17:51:16 2015
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server;

 import java.io.IOException;
+import java.util.concurrent.TimeUnit;

 import javax.management.JMException;
@@ -45,6 +46,7 @@ public class ZooKeeperServerMain {
     // ZooKeeper server supports two kinds of connection: unencrypted and encrypted.
     private ServerCnxnFactory cnxnFactory;
     private ServerCnxnFactory secureCnxnFactory;
+    private ContainerManager containerManager;
     private AdminServer adminServer;
@@ -138,6 +140,12 @@ public class ZooKeeperServerMain {
                 secureCnxnFactory.startup(zkServer, needStartZKServer);
             }

+            containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
+                    Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
+                    Integer.getInteger("znode.container.maxPerMinute", 10000)
+            );
+            containerManager.start();
+
             if (cnxnFactory != null) {
                 cnxnFactory.join();
             }
@@ -162,6 +170,9 @@ public class ZooKeeperServerMain {
      * Shutdown the serving instance
      */
     protected void shutdown() {
+        if (containerManager != null) {
+            containerManager.stop();
+        }
         if (cnxnFactory != null) {
             cnxnFactory.shutdown();
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Thu Jun 4 17:51:16 2015
@@ -133,7 +133,9 @@ public class CommitProcessor extends Zoo
         switch (request.type) {
             case OpCode.create:
             case OpCode.create2:
+            case OpCode.createContainer:
             case OpCode.delete:
+            case OpCode.deleteContainer:
             case OpCode.setData:
             case OpCode.reconfig:
             case OpCode.multi:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Thu Jun 4 17:51:16 2015
@@ -84,7 +84,9 @@ public class FollowerRequestProcessor ex
                     break;
                 case OpCode.create:
                 case OpCode.create2:
+                case OpCode.createContainer:
                 case OpCode.delete:
+                case OpCode.deleteContainer:
                 case OpCode.setData:
                 case OpCode.reconfig:
                 case OpCode.setACL:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Thu Jun 4 17:51:16 2015
@@ -18,10 +18,9 @@

 package org.apache.zookeeper.server.quorum;

-import java.io.IOException;
-
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.ContainerManager;
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
@@ -31,6 +30,9 @@ import org.apache.zookeeper.server.Serve
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;

+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 /**
  *
  * Just like the standard ZooKeeperServer. We just replace the request
@@ -39,6 +41,8 @@ import org.apache.zookeeper.server.persi
  * FinalRequestProcessor
  */
 public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
+    private ContainerManager containerManager;  // guarded by sync
+
     CommitProcessor commitProcessor;
@@ -71,6 +75,31 @@ public class LeaderZooKeeperServer exten
         prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
         prepRequestProcessor.start();
         firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
+
+        setupContainerManager();
+    }
+
+    private synchronized void setupContainerManager() {
+        containerManager = new ContainerManager(getZKDatabase(), prepRequestProcessor,
+                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
+                Integer.getInteger("znode.container.maxPerMinute", 10000)
+        );
+    }
+
+    @Override
+    public synchronized void startup() {
+        super.startup();
+        if (containerManager != null) {
+            containerManager.start();
+        }
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        if (containerManager != null) {
+            containerManager.stop();
+        }
+        super.shutdown();
     }

     @Override

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Thu Jun 4 17:51:16 2015
@@ -93,7 +93,9 @@ public class ObserverRequestProcessor ex
                     break;
                 case OpCode.create:
                 case OpCode.create2:
+                case OpCode.createContainer:
                 case OpCode.delete:
+                case OpCode.deleteContainer:
                 case OpCode.setData:
                 case OpCode.reconfig:
                 case OpCode.setACL:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java Thu Jun 4 17:51:16 2015
@@ -82,7 +82,9 @@ public class ReadOnlyRequestProcessor ex
                 case OpCode.sync:
                 case OpCode.create:
                 case OpCode.create2:
+                case OpCode.createContainer:
                 case OpCode.delete:
+                case OpCode.deleteContainer:
                 case OpCode.setData:
                 case OpCode.reconfig:
                 case OpCode.setACL:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java Thu Jun 4 17:51:16 2015
@@ -18,16 +18,6 @@

 package org.apache.zookeeper.server.util;

-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
@@ -35,15 +25,25 @@ import org.apache.jute.Record;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.CreateContainerTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.CreateTxnV0;
 import org.apache.zookeeper.txn.DeleteTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
-import org.apache.zookeeper.txn.MultiTxn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;

 public class SerializeUtils {
     private static final Logger LOG = LoggerFactory.getLogger(SerializeUtils.class);
@@ -68,7 +68,11 @@ public class SerializeUtils {
         case OpCode.create2:
             txn = new CreateTxn();
             break;
+        case OpCode.createContainer:
+            txn = new CreateContainerTxn();
+            break;
         case OpCode.delete:
+        case OpCode.deleteContainer:
             txn = new DeleteTxn();
             break;
         case OpCode.reconfig:

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CreateContainerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CreateContainerTest.java?rev=1683605&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CreateContainerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CreateContainerTest.java Thu Jun 4 17:51:16 2015
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.*;
+
+public class CreateContainerTest extends ClientBase {
+    private ZooKeeper zk;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        zk = createClient();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        zk.close();
+    }
+
+    @Test(timeout = 30000)
+    public void testCreate()
+            throws IOException, KeeperException, InterruptedException {
+        createNoStatVerifyResult("/foo");
+        createNoStatVerifyResult("/foo/child");
+    }
+
+    @Test(timeout = 30000)
+    public void testCreateWithStat()
+            throws IOException, KeeperException, InterruptedException {
+        Stat stat = createWithStatVerifyResult("/foo");
+        Stat childStat = createWithStatVerifyResult("/foo/child");
+        // Don't expect to get the same stats for different creates.
+        Assert.assertFalse(stat.equals(childStat));
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    @Test(timeout = 30000)
+    public void testCreateWithNullStat()
+            throws IOException, KeeperException, InterruptedException {
+        final String name = "/foo";
+        Assert.assertNull(zk.exists(name, false));
+
+        Stat stat = null;
+        // If a null Stat object is passed the create should still
+        // succeed, but no Stat info will be returned.
+        zk.create(name, name.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER, stat);
+        Assert.assertNull(stat);
+        Assert.assertNotNull(zk.exists(name, false));
+    }
+
+    @Test(timeout = 30000)
+    public void testSimpleDeletion()
+            throws IOException, KeeperException, InterruptedException {
+        zk.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        zk.create("/foo/bar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.delete("/foo/bar", -1);  // should cause "/foo" to get deleted when checkContainers() is called
+
+        ContainerManager containerManager = new ContainerManager(serverFactory.getZooKeeperServer()
+                .getZKDatabase(), serverFactory.getZooKeeperServer().firstProcessor, 1, 100);
+        containerManager.checkContainers();
+
+        Thread.sleep(1000);
+
+        Assert.assertNull("Container should have been deleted", zk.exists("/foo", false));
+    }
+
+    @Test(timeout = 30000)
+    public void testMultiWithContainerSimple()
+            throws IOException, KeeperException, InterruptedException {
+        Op createContainer = Op.create("/foo", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        zk.multi(Collections.singletonList(createContainer));
+
+        DataTree dataTree = serverFactory.getZooKeeperServer().getZKDatabase().getDataTree();
+        Assert.assertEquals(dataTree.getContainers().size(), 1);
+    }
+
+    @Test(timeout = 30000)
+    public void testMultiWithContainer()
+            throws IOException, KeeperException, InterruptedException {
+        Op createContainer = Op.create("/foo", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        Op createChild = Op.create("/foo/bar", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.multi(Arrays.asList(createContainer, createChild));
+
+        DataTree dataTree = serverFactory.getZooKeeperServer().getZKDatabase().getDataTree();
+        Assert.assertEquals(dataTree.getContainers().size(), 1);
+
+        zk.delete("/foo/bar", -1);  // should cause "/foo" to get deleted when checkContainers() is called
+
+        ContainerManager containerManager = new ContainerManager(serverFactory.getZooKeeperServer()
+                .getZKDatabase(), serverFactory.getZooKeeperServer().firstProcessor, 1, 100);
+        containerManager.checkContainers();
+
+        Thread.sleep(1000);
+
+        Assert.assertNull("Container should have been deleted", zk.exists("/foo", false));
+
+        createContainer = Op.create("/foo", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        createChild = Op.create("/foo/bar", new byte[0],
+                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Op deleteChild = Op.delete("/foo/bar", -1);
+        zk.multi(Arrays.asList(createContainer, createChild, deleteChild));
+
+        containerManager.checkContainers();
+
+        Thread.sleep(1000);
+
+        Assert.assertNull("Container should have been deleted", zk.exists("/foo", false));
+    }
+
+    @Test(timeout = 30000)
+    public void testSimpleDeletionAsync()
+            throws IOException, KeeperException, InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        AsyncCallback.Create2Callback cb = new AsyncCallback.Create2Callback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
+                Assert.assertEquals(ctx, "context");
+                latch.countDown();
+            }
+        };
+        zk.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER, cb, "context");
+        Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+        zk.create("/foo/bar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.delete("/foo/bar", -1);  // should cause "/foo" to get deleted when checkContainers() is called
+
+        ContainerManager containerManager = new ContainerManager(serverFactory.getZooKeeperServer()
+                .getZKDatabase(), serverFactory.getZooKeeperServer().firstProcessor, 1, 100);
+        containerManager.checkContainers();
+
+        Thread.sleep(1000);
+
+        Assert.assertNull("Container should have been deleted", zk.exists("/foo", false));
+    }
+
+    @Test(timeout = 30000)
+    public void testCascadingDeletion()
+            throws IOException, KeeperException, InterruptedException {
+        zk.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        zk.create("/foo/bar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        zk.create("/foo/bar/one", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.delete("/foo/bar/one", -1);  // should cause "/foo/bar" and "/foo" to get deleted when checkContainers() is called
+
+        ContainerManager containerManager = new ContainerManager(serverFactory.getZooKeeperServer()
+                .getZKDatabase(), serverFactory.getZooKeeperServer().firstProcessor, 1, 100);
+        containerManager.checkContainers();
+        Thread.sleep(1000);
+        containerManager
+                .checkContainers();
+        Thread.sleep(1000);
+
+        Assert.assertNull("Container should have been deleted", zk.exists("/foo/bar", false));
+        Assert.assertNull("Container should have been deleted", zk.exists("/foo", false));
+    }
+
+    @Test(timeout = 30000)
+    public void testFalseEmpty()
+            throws IOException, KeeperException, InterruptedException {
+        zk.create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        zk.create("/foo/bar", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        ContainerManager containerManager = new ContainerManager(serverFactory.getZooKeeperServer()
+                .getZKDatabase(), serverFactory.getZooKeeperServer().firstProcessor, 1, 100) {
+            @Override
+            protected Collection getCandidates() {
+                return Collections.singletonList("/foo");
+            }
+        };
+        containerManager.checkContainers();
+        Thread.sleep(1000);
+
+        Assert.assertNotNull("Container should have not been deleted", zk.exists("/foo", false));
+    }
+
+    @Test(timeout = 30000)
+    public void testMaxPerMinute()
+            throws IOException, KeeperException, InterruptedException {
+        final BlockingQueue queue = new LinkedBlockingQueue();
+        RequestProcessor processor = new RequestProcessor() {
+            @Override
+            public void processRequest(Request request) throws RequestProcessorException {
+                queue.add(new String(request.request.array()));
+            }
+
+            @Override
+            public void shutdown() {
+            }
+        };
+        final ContainerManager containerManager = new ContainerManager(serverFactory.getZooKeeperServer()
+                .getZKDatabase(), processor, 1, 2) {
+            @Override
+            protected long getMinIntervalMs() {
+                return 1000;
+            }
+
+            @Override
+            protected Collection getCandidates() {
+                return Arrays.asList("/one", "/two", "/three", "/four");
+            }
+        };
+        Executors.newSingleThreadExecutor().submit(new Callable() {
+            @Override
+            public Void call() throws Exception {
+                containerManager.checkContainers();
+                return null;
+            }
+        });
+        Assert.assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one");
+        Assert.assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two");
+        Assert.assertEquals(queue.size(), 0);
+        Thread.sleep(500);
+        Assert.assertEquals(queue.size(), 0);
+
+        Assert.assertEquals(queue.poll(5, TimeUnit.SECONDS), "/three");
+        Assert.assertEquals(queue.poll(5, TimeUnit.SECONDS), "/four");
+    }
+
+    private void createNoStatVerifyResult(String newName)
+            throws KeeperException, InterruptedException {
+        Assert.assertNull("Node existed before created", zk.exists(newName, false));
+        zk.create(newName, newName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);
+        Assert.assertNotNull("Node was not created as expected",
+                zk.exists(newName, false));
+    }
+
+    private Stat createWithStatVerifyResult(String newName)
+            throws KeeperException, InterruptedException {
+        Assert.assertNull("Node existed before created", zk.exists(newName, false));
+        Stat stat = new Stat();
+        zk.create(newName, newName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER, stat);
+        validateCreateStat(stat, newName);
+
+        Stat referenceStat = zk.exists(newName, false);
+        Assert.assertNotNull("Node was not created as expected", referenceStat);
+        Assert.assertEquals(referenceStat, stat);
+
+        return stat;
+    }
+
+    private void validateCreateStat(Stat stat, String name) {
+        Assert.assertEquals(stat.getCzxid(), stat.getMzxid());
+        Assert.assertEquals(stat.getCzxid(), stat.getPzxid());
+        Assert.assertEquals(stat.getCtime(), stat.getMtime());
+        Assert.assertEquals(0, stat.getCversion());
+        Assert.assertEquals(0, stat.getVersion());
+        Assert.assertEquals(0, stat.getAversion());
+        Assert.assertEquals(0, stat.getEphemeralOwner());
+        Assert.assertEquals(name.length(), stat.getDataLength());
+        Assert.assertEquals(0, stat.getNumChildren());
+    }
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CreateModeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CreateModeTest.java?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CreateModeTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CreateModeTest.java Thu Jun 4 17:51:16 2015
@@ -35,21 +35,31 @@ public class CreateModeTest extends ZKTe
         Assert.assertEquals(cm.toFlag(), 0);
         Assert.assertFalse(cm.isEphemeral());
         Assert.assertFalse(cm.isSequential());
-
+        Assert.assertFalse(cm.isContainer());
+
         cm = CreateMode.EPHEMERAL;
         Assert.assertEquals(cm.toFlag(), 1);
         Assert.assertTrue(cm.isEphemeral());
         Assert.assertFalse(cm.isSequential());
-
+        Assert.assertFalse(cm.isContainer());
+
         cm = CreateMode.PERSISTENT_SEQUENTIAL;
         Assert.assertEquals(cm.toFlag(), 2);
         Assert.assertFalse(cm.isEphemeral());
         Assert.assertTrue(cm.isSequential());
-
+        Assert.assertFalse(cm.isContainer());
+
         cm = CreateMode.EPHEMERAL_SEQUENTIAL;
         Assert.assertEquals(cm.toFlag(), 3);
         Assert.assertTrue(cm.isEphemeral());
         Assert.assertTrue(cm.isSequential());
+        Assert.assertFalse(cm.isContainer());
+
+        cm = CreateMode.CONTAINER;
+        Assert.assertEquals(cm.toFlag(), 4);
+        Assert.assertFalse(cm.isEphemeral());
+        Assert.assertFalse(cm.isSequential());
+        Assert.assertTrue(cm.isContainer());
     }

     @Test

Modified: zookeeper/trunk/src/zookeeper.jute
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/zookeeper.jute?rev=1683605&r1=1683604&r2=1683605&view=diff
==============================================================================
--- zookeeper/trunk/src/zookeeper.jute (original)
+++ zookeeper/trunk/src/zookeeper.jute Thu Jun 4 17:51:16 2015
@@ -260,6 +260,12 @@ module org.apache.zookeeper.txn {
         boolean ephemeral;
         int parentCVersion;
     }
+    class CreateContainerTxn {
+        ustring path;
+        buffer data;
+        vector acl;
+        int parentCVersion;
+    }
     class DeleteTxn {
         ustring path;
     }
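
Note on the owner-field convention used in the DataTree and PrepRequestProcessor hunks: the ephemeral-owner field is reused to mark containers, so a node is indexed as a container when its owner equals a reserved sentinel, as an ephemeral when the owner is any other non-zero value, and as a plain persistent node otherwise. The following is a minimal, self-contained sketch of that classification and is not code from this commit. The class OwnerIndexSketch and its methods are hypothetical, and the sentinel value Long.MIN_VALUE is an assumption standing in for DataTree.CONTAINER_EPHEMERAL_OWNER, whose actual value is not shown in this part of the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; names do not come from the commit.
class OwnerIndexSketch {
    // Assumption: a reserved owner value that no real session id would use.
    static final long CONTAINER_EPHEMERAL_OWNER = Long.MIN_VALUE;

    final Set<String> containers = new HashSet<>();
    final Map<Long, Set<String>> ephemerals = new HashMap<>();

    // Mirrors the branch order in the DataTree hunk: check the container
    // sentinel first, then treat any other non-zero owner as a session id.
    void index(String path, long eowner) {
        if (eowner == CONTAINER_EPHEMERAL_OWNER) {
            containers.add(path);
        } else if (eowner != 0) {
            ephemerals.computeIfAbsent(eowner, k -> new HashSet<>()).add(path);
        }
        // eowner == 0: ordinary persistent node, tracked in neither index.
    }

    // Mirrors the ephemeralParent check in the PrepRequestProcessor hunk:
    // only a true ephemeral parent rejects children; a container does not.
    static boolean parentRejectsChildren(long parentOwner) {
        return parentOwner != 0 && parentOwner != CONTAINER_EPHEMERAL_OWNER;
    }

    public static void main(String[] args) {
        OwnerIndexSketch sketch = new OwnerIndexSketch();
        sketch.index("/foo", CONTAINER_EPHEMERAL_OWNER); // container
        sketch.index("/lock", 0x1234L);                  // ephemeral of session 0x1234
        sketch.index("/config", 0L);                     // persistent
        System.out.println("containers = " + sketch.containers);
        System.out.println("ephemeral sessions = " + sketch.ephemerals.keySet());
    }
}
```

The order of the two checks matters: because the sentinel is itself non-zero, testing eowner != 0 first would file containers under the ephemerals map.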