Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A96E710677 for ; Fri, 11 Apr 2014 17:27:41 +0000 (UTC) Received: (qmail 74211 invoked by uid 500); 11 Apr 2014 17:27:39 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 74136 invoked by uid 500); 11 Apr 2014 17:27:38 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 73811 invoked by uid 99); 11 Apr 2014 17:27:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Apr 2014 17:27:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2A39A98AFB4; Fri, 11 Apr 2014 17:27:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bhavanki@apache.org To: commits@accumulo.apache.org Date: Fri, 11 Apr 2014 17:27:35 -0000 Message-Id: <57c046625d40432fa9fb74f5c7788661@git.apache.org> In-Reply-To: <01223fc939144fb5aff8268f7e9619b1@git.apache.org> References: <01223fc939144fb5aff8268f7e9619b1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/10] git commit: Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT Merge branch '1.4.6-SNAPSHOT' into 1.5.2-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7a14fcc9 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7a14fcc9 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7a14fcc9 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 7a14fcc923685f6997430e2b665a20dba94be171 Parents: 0607c8c 7461ed9 Author: Bill Havanki Authored: Fri Apr 11 13:16:12 2014 -0400 Committer: Bill Havanki Committed: Fri Apr 11 13:16:12 2014 -0400 ---------------------------------------------------------------------- .../java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/7a14fcc9/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java ---------------------------------------------------------------------- diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java index 7800ec0,0000000..64cb1e1 mode 100644,000000..100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java @@@ -1,207 -1,0 +1,207 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.fate.zookeeper; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.security.SecurityPermission; +import java.util.List; + +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +public class ZooReaderWriter extends ZooReader implements IZooReaderWriter { + + private static SecurityPermission ZOOWRITER_PERMISSION = new SecurityPermission("zookeeperWriterPermission"); + + private static ZooReaderWriter instance = null; + private static IZooReaderWriter retryingInstance = null; + private final String scheme; + private final byte[] auth; + + @Override + public ZooKeeper getZooKeeper() { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(ZOOWRITER_PERMISSION); + } + return getSession(keepers, timeout, scheme, auth); + } + + public ZooReaderWriter(String string, int timeInMillis, String scheme, byte[] auth) { + super(string, timeInMillis); + this.scheme = scheme; + this.auth = auth; + } + + @Override + public void recursiveDelete(String zPath, NodeMissingPolicy policy) throws KeeperException, InterruptedException { + ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy); + } + + @Override + public void recursiveDelete(String zPath, int version, NodeMissingPolicy policy) throws KeeperException, InterruptedException { + ZooUtil.recursiveDelete(getZooKeeper(), zPath, version, policy); + } + + /** + * Create a persistent node with the default ACL + * + * @return true if the node was created or altered; false if it was skipped + */ + @Override + public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException { + return ZooUtil.putPersistentData(getZooKeeper(), zPath, data, policy); + } + + @Override + public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) throws KeeperException, InterruptedException { + return ZooUtil.putPrivatePersistentData(getZooKeeper(), zPath, data, policy); + } + + @Override + public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) throws KeeperException, InterruptedException { + ZooUtil.putPersistentData(getZooKeeper(), zPath, data, version, policy); + } + + @Override + public String putPersistentSequential(String zPath, byte[] data) throws KeeperException, InterruptedException { + return ZooUtil.putPersistentSequential(getZooKeeper(), zPath, data); + } + + @Override + public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException { + return ZooUtil.putEphemeralData(getZooKeeper(), zPath, data); + } + + @Override + public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException { + return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data); + } + + @Override + public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException { + ZooUtil.recursiveCopyPersistent(getZooKeeper(), source, destination, policy); + } + + @Override + public void delete(String path, int version) throws InterruptedException, KeeperException { + getZooKeeper().delete(path, version); + } + + public interface Mutator { + byte[] mutate(byte[] currentValue) throws Exception; + } + + @Override + public byte[] mutate(String zPath, byte[] createValue, List acl, Mutator mutator) throws Exception { + if (createValue != null) { + try { + getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT); + return createValue; + } catch (NodeExistsException ex) { + // expected + } + } + do { + Stat stat = new Stat(); + byte[] data = getZooKeeper().getData(zPath, false, stat); + data = mutator.mutate(data); + if (data == null) + return data; + try { + getZooKeeper().setData(zPath, data, stat.getVersion()); + return data; + } catch (BadVersionException ex) { + // + } + } while (true); + } + + public static synchronized ZooReaderWriter getInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) { + if (instance == null) + instance = new ZooReaderWriter(zookeepers, timeInMillis, scheme, auth); + return instance; + } + + /** + * get an instance that retries when zookeeper connection errors occur + * + * @return an instance that retries when Zookeeper connection errors occur. + */ + public static synchronized IZooReaderWriter getRetryingInstance(String zookeepers, int timeInMillis, String scheme, byte[] auth) { + + if (retryingInstance == null) { + final IZooReaderWriter inst = getInstance(zookeepers, timeInMillis, scheme, auth); + + InvocationHandler ih = new InvocationHandler() { + @Override + public Object invoke(Object obj, Method method, Object[] args) throws Throwable { + long retryTime = 250; + while (true) { + try { + return method.invoke(inst, args); + } catch (InvocationTargetException e) { + if (e.getCause() instanceof KeeperException.ConnectionLossException) { - Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + retryTime, e.getCause()); ++ Logger.getLogger(ZooReaderWriter.class).warn("Error connecting to zookeeper, will retry in " + String.format("%.2f secs", retryTime / 1000.0), e.getCause()); + UtilWaitThread.sleep(retryTime); + retryTime = Math.min(5000, retryTime + 250); + } else { + throw e.getCause(); + } + } + } + } + }; + + retryingInstance = (IZooReaderWriter) Proxy.newProxyInstance(ZooReaderWriter.class.getClassLoader(), new Class[] {IZooReaderWriter.class}, ih); + } + + return retryingInstance; + } + + @Override + public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException { + return ZooUtil.isLockHeld(getZooKeeper(), lockID); + } + + @Override + public void mkdirs(String path) throws KeeperException, InterruptedException { + if (path.equals("")) + return; + if (!path.startsWith("/")) + throw new IllegalArgumentException(path + "does not start with /"); + if (getZooKeeper().exists(path, false) != null) + return; + String parent = path.substring(0, path.lastIndexOf("/")); + mkdirs(parent); + putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP); + } + + +}