Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BD43C10177 for ; Thu, 20 Mar 2014 22:44:33 +0000 (UTC) Received: (qmail 35350 invoked by uid 500); 20 Mar 2014 22:44:29 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 35317 invoked by uid 500); 20 Mar 2014 22:44:28 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 35182 invoked by uid 99); 20 Mar 2014 22:44:27 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Mar 2014 22:44:27 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 148F698706D; Thu, 20 Mar 2014 22:44:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Thu, 20 Mar 2014 22:44:29 -0000 Message-Id: <582fc73ded21429685457761e00c7f4c@git.apache.org> In-Reply-To: <3d4de4c748a34de4a072422bc238af68@git.apache.org> References: <3d4de4c748a34de4a072422bc238af68@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/6] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT Conflicts: server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef5dc4a1 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef5dc4a1 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef5dc4a1 Branch: refs/heads/master Commit: ef5dc4a1f6d67f643f80dd821280140fd20ee947 Parents: 44b13c1 1d608a8 Author: Josh Elser Authored: Thu Mar 20 18:43:55 2014 -0400 Committer: Josh Elser Committed: Thu Mar 20 18:43:55 2014 -0400 ---------------------------------------------------------------------- .../server/conf/NamespaceConfiguration.java | 10 ++ .../server/conf/TableConfiguration.java | 39 +++-- .../test/TableConfigurationUpdateIT.java | 154 +++++++++++++++++++ 3 files changed, 193 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java index c0ac0b8,0000000..d08d45f mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java @@@ -1,172 -1,0 +1,182 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.conf; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationObserver; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.log4j.Logger; + +public class NamespaceConfiguration extends AccumuloConfiguration { + private static final Logger log = Logger.getLogger(NamespaceConfiguration.class); + + private final AccumuloConfiguration parent; + private static ZooCache propCache = null; + protected String namespaceId = null; + protected Instance inst = null; + private Set observers; + + public NamespaceConfiguration(String namespaceId, AccumuloConfiguration parent) { + inst = HdfsZooInstance.getInstance(); + this.parent = parent; + this.namespaceId = namespaceId; + this.observers = Collections.synchronizedSet(new HashSet()); + } + + @Override + public String get(Property property) { + String key = property.getKey(); + String value = get(getPropCache(), key); + + if (value == null || !property.getType().isValidFormat(value)) { + if (value != null) + log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value); + if (!(namespaceId.equals(Namespaces.ACCUMULO_NAMESPACE_ID) && isIteratorOrConstraint(property.getKey()))) { + // ignore iterators from parent if system namespace + value = parent.get(property); + } + } + return value; + } + + private String get(ZooCache zc, String key) { + String zPath = ZooUtil.getRoot(inst.getInstanceID()) + Constants.ZNAMESPACES + "/" + getNamespaceId() + Constants.ZNAMESPACE_CONF + "/" + key; + byte[] v = zc.get(zPath); + String value = null; + if (v != null) + value = new String(v, Constants.UTF8); + return value; + } + + private synchronized static ZooCache getPropCache() { + Instance inst = HdfsZooInstance.getInstance(); + if (propCache == null) + propCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new NamespaceConfWatcher(inst)); + return propCache; + } + + private class SystemNamespaceFilter implements PropertyFilter { + + private PropertyFilter userFilter; + + SystemNamespaceFilter(PropertyFilter userFilter) { + this.userFilter = userFilter; + } + + @Override + public boolean accept(String key) { + if (isIteratorOrConstraint(key)) + return false; + return userFilter.accept(key); + } + + } + + @Override + public void getProperties(Map props, PropertyFilter filter) { + + PropertyFilter parentFilter = filter; + + // exclude system iterators/constraints from the system namespace + // so they don't affect the metadata or root tables. + if (getNamespaceId().equals(Namespaces.ACCUMULO_NAMESPACE_ID)) + parentFilter = new SystemNamespaceFilter(filter); + + parent.getProperties(props, parentFilter); + + ZooCache zc = getPropCache(); + + List children = zc.getChildren(ZooUtil.getRoot(inst.getInstanceID()) + Constants.ZNAMESPACES + "/" + getNamespaceId() + Constants.ZNAMESPACE_CONF); + if (children != null) { + for (String child : children) { + if (child != null && filter.accept(child)) { + String value = get(zc, child); + if (value != null) + props.put(child, value); + } + } + } + } + + protected String getNamespaceId() { + return namespaceId; + } + + public void addObserver(ConfigurationObserver co) { + if (namespaceId == null) { + String err = "Attempt to add observer for non-namespace configuration"; + log.error(err); + throw new RuntimeException(err); + } + iterator(); + observers.add(co); + } + + public void removeObserver(ConfigurationObserver configObserver) { + if (namespaceId == null) { + String err = "Attempt to remove observer for non-namespace configuration"; + log.error(err); + throw new RuntimeException(err); + } + observers.remove(configObserver); + } + + public void expireAllObservers() { + Collection copy = Collections.unmodifiableCollection(observers); + for (ConfigurationObserver co : copy) + co.sessionExpired(); + } + + public void propertyChanged(String key) { + Collection copy = Collections.unmodifiableCollection(observers); + for (ConfigurationObserver co : copy) + co.propertyChanged(key); + } + + public void propertiesChanged(String key) { + Collection copy = Collections.unmodifiableCollection(observers); + for (ConfigurationObserver co : copy) + co.propertiesChanged(); + } + + protected boolean isIteratorOrConstraint(String key) { + return key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey()); + } ++ ++ @Override ++ public void invalidateCache() { ++ if (null != propCache) { ++ propCache.clear(); ++ } ++ // Else, if the cache is null, we could lock and double-check ++ // to see if it happened to be created so we could invalidate it ++ // but I don't see much benefit coming from that extra check. ++ } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java index fae4167,0000000..c134e31 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java @@@ -1,168 -1,0 +1,187 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.conf; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationObserver; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.log4j.Logger; + +public class TableConfiguration extends AccumuloConfiguration { + private static final Logger log = Logger.getLogger(TableConfiguration.class); - ++ + // Need volatile keyword to ensure double-checked locking works as intended + private static volatile ZooCache tablePropCache = null; ++ private static final Object initLock = new Object(); + + private final String instanceId; ++ private final Instance instance; + private final NamespaceConfiguration parent; + + private String table = null; + private Set observers; + + public TableConfiguration(String instanceId, String table, NamespaceConfiguration parent) { ++ this(instanceId, HdfsZooInstance.getInstance(), table, parent); ++ } ++ ++ public TableConfiguration(String instanceId, Instance instance, String table, NamespaceConfiguration parent) { + this.instanceId = instanceId; ++ this.instance = instance; + this.table = table; + this.parent = parent; + + this.observers = Collections.synchronizedSet(new HashSet()); + } + - private synchronized static ZooCache getTablePropCache() { - Instance inst = HdfsZooInstance.getInstance(); - if (tablePropCache == null) - tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst)); ++ private void initializeZooCache() { ++ synchronized (initLock) { ++ if (null == tablePropCache) { ++ tablePropCache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), new TableConfWatcher(instance)); ++ } ++ } ++ } ++ ++ private ZooCache getTablePropCache() { ++ if (null == tablePropCache) { ++ initializeZooCache(); ++ } + return tablePropCache; + } + + public void addObserver(ConfigurationObserver co) { + if (table == null) { + String err = "Attempt to add observer for non-table configuration"; + log.error(err); + throw new RuntimeException(err); + } + iterator(); + observers.add(co); + } + + public void removeObserver(ConfigurationObserver configObserver) { + if (table == null) { + String err = "Attempt to remove observer for non-table configuration"; + log.error(err); + throw new RuntimeException(err); + } + observers.remove(configObserver); + } + + public void expireAllObservers() { + Collection copy = Collections.unmodifiableCollection(observers); + for (ConfigurationObserver co : copy) + co.sessionExpired(); + } + + public void propertyChanged(String key) { + Collection copy = Collections.unmodifiableCollection(observers); + for (ConfigurationObserver co : copy) + co.propertyChanged(key); + } + + public void propertiesChanged(String key) { + Collection copy = Collections.unmodifiableCollection(observers); + for (ConfigurationObserver co : copy) + co.propertiesChanged(); + } + + @Override + public String get(Property property) { + String key = property.getKey(); + String value = get(getTablePropCache(), key); + + if (value == null || !property.getType().isValidFormat(value)) { + if (value != null) + log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value); + value = parent.get(property); + } + return value; + } + + private String get(ZooCache zc, String key) { + String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key; + byte[] v = zc.get(zPath); + String value = null; + if (v != null) + value = new String(v, Constants.UTF8); + return value; + } + + @Override + public void getProperties(Map props, PropertyFilter filter) { + parent.getProperties(props, filter); + + ZooCache zc = getTablePropCache(); + + List children = zc.getChildren(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF); + if (children != null) { + for (String child : children) { + if (child != null && filter.accept(child)) { + String value = get(zc, child); + if (value != null) + props.put(child, value); + } + } + } + } + + public String getTableId() { + return table; + } + + /** + * returns the actual NamespaceConfiguration that corresponds to the current parent namespace. + */ + public NamespaceConfiguration getNamespaceConfiguration() { + return ServerConfiguration.getNamespaceConfiguration(parent.inst, parent.namespaceId); + } + + /** + * returns the parent, which is actually a TableParentConfiguration that can change which namespace it references + */ + public NamespaceConfiguration getParentConfiguration() { + return parent; + } + + @Override + public void invalidateCache() { + if (null != tablePropCache) { - synchronized (TableConfiguration.class) { - if (null != tablePropCache) { - tablePropCache = null; - } - } ++ tablePropCache.clear(); + } ++ // Else, if the cache is null, we could lock and double-check ++ // to see if it happened to be created so we could invalidate it ++ // but I don't see much benefit coming from that extra check. ++ } ++ ++ @Override ++ public String toString() { ++ return this.getClass().getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java index 0000000,0000000..c3e3342 new file mode 100644 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java @@@ -1,0 -1,0 +1,154 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one or more ++ * contributor license agreements. See the NOTICE file distributed with ++ * this work for additional information regarding copyright ownership. ++ * The ASF licenses this file to You under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.accumulo.test; ++ ++import java.util.ArrayList; ++import java.util.Random; ++import java.util.concurrent.Callable; ++import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.Future; ++import java.util.concurrent.TimeUnit; ++ ++import org.apache.accumulo.core.client.Connector; ++import org.apache.accumulo.core.client.Instance; ++import org.apache.accumulo.core.client.ZooKeeperInstance; ++import org.apache.accumulo.core.client.security.tokens.PasswordToken; ++import org.apache.accumulo.core.conf.AccumuloConfiguration; ++import org.apache.accumulo.core.conf.Property; ++import org.apache.accumulo.minicluster.MiniAccumuloCluster; ++import org.apache.accumulo.server.conf.NamespaceConfiguration; ++import org.apache.accumulo.server.conf.TableConfiguration; ++import org.apache.accumulo.server.conf.TableParentConfiguration; ++import org.apache.log4j.Logger; ++import org.junit.After; ++import org.junit.Assert; ++import org.junit.Before; ++import org.junit.Test; ++import org.junit.rules.TemporaryFolder; ++ ++public class TableConfigurationUpdateIT { ++ private static final Logger log = Logger.getLogger(TableConfigurationUpdateIT.class); ++ ++ public static TemporaryFolder folder = new TemporaryFolder(); ++ private MiniAccumuloCluster accumulo; ++ private String secret = "secret"; ++ ++ @Before ++ public void setUp() throws Exception { ++ folder.create(); ++ accumulo = new MiniAccumuloCluster(folder.getRoot(), secret); ++ accumulo.start(); ++ } ++ ++ @After ++ public void tearDown() throws Exception { ++ accumulo.stop(); ++ folder.delete(); ++ } ++ ++ @Test ++ public void test() throws Exception { ++ Instance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); ++ Connector conn = inst.getConnector("root", new PasswordToken(secret)); ++ ++ String table = "foo"; ++ conn.tableOperations().create(table); ++ ++ final NamespaceConfiguration defaultConf = new TableParentConfiguration(conn.tableOperations().tableIdMap().get(table), AccumuloConfiguration.getDefaultConfiguration()); ++ ++ // Cache invalidates 25% of the time ++ int randomMax = 4; ++ // Number of threads ++ int numThreads = 2; ++ // Number of iterations per thread ++ int iterations = 100000; ++ AccumuloConfiguration tableConf = new TableConfiguration(inst.getInstanceID(), inst, table, defaultConf); ++ ++ long start = System.currentTimeMillis(); ++ ExecutorService svc = Executors.newFixedThreadPool(numThreads); ++ CountDownLatch countDown = new CountDownLatch(numThreads); ++ ArrayList> futures = new ArrayList>(numThreads); ++ ++ for (int i = 0; i < numThreads; i++) { ++ futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown))); ++ } ++ ++ svc.shutdown(); ++ Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES)); ++ ++ for (Future fut : futures) { ++ Exception e = fut.get(); ++ if (null != e) { ++ Assert.fail("Thread failed with exception " + e); ++ } ++ } ++ ++ long end = System.currentTimeMillis(); ++ log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates " ++ + ((1. / randomMax) * 100.) + "% took " + (end - start) / 1000 + " second(s)"); ++ } ++ ++ public static class TableConfRunner implements Callable { ++ private static final Property prop = Property.TABLE_SPLIT_THRESHOLD; ++ private AccumuloConfiguration tableConf; ++ private CountDownLatch countDown; ++ private int iterations, randMax; ++ ++ public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) { ++ this.randMax = randMax; ++ this.iterations = iterations; ++ this.tableConf = tableConf; ++ this.countDown = countDown; ++ } ++ ++ @Override ++ public Exception call() { ++ Random r = new Random(); ++ countDown.countDown(); ++ try { ++ countDown.await(); ++ } catch (InterruptedException e) { ++ Thread.currentThread().interrupt(); ++ return e; ++ } ++ ++ String t = Thread.currentThread().getName() + " "; ++ try { ++ for (int i = 0; i < iterations; i++) { ++ // if (i % 10000 == 0) { ++ // log.info(t + " " + i); ++ // } ++ int choice = r.nextInt(randMax); ++ if (choice < 1) { ++ tableConf.invalidateCache(); ++ } else { ++ tableConf.get(prop); ++ } ++ } ++ } catch (Exception e) { ++ log.error(t, e); ++ return e; ++ } ++ ++ return null; ++ } ++ ++ } ++ ++}