Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9761A10585 for ; Tue, 4 Feb 2014 17:55:36 +0000 (UTC) Received: (qmail 26998 invoked by uid 500); 4 Feb 2014 17:55:09 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 26394 invoked by uid 500); 4 Feb 2014 17:54:52 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 25431 invoked by uid 99); 4 Feb 2014 17:54:33 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Feb 2014 17:54:33 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 660E482416B; Tue, 4 Feb 2014 17:54:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Tue, 04 Feb 2014 17:55:12 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [43/48] Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java index 0f73ae4,0000000..7a61eb6 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java @@@ -1,317 -1,0 +1,318 @@@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tables; + +import java.security.SecurityPermission; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.util.TablePropUtil; +import org.apache.accumulo.server.zookeeper.ZooCache; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import 
org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; + +public class TableManager { + private static SecurityPermission TABLE_MANAGER_PERMISSION = new SecurityPermission("tableManagerPermission"); + + private static final Logger log = Logger.getLogger(TableManager.class); + private static final Set observers = Collections.synchronizedSet(new HashSet()); + private static final Map tableStateCache = Collections.synchronizedMap(new HashMap()); - ++ private static final byte[] ZERO_BYTE = new byte[] {'0'}; ++ + private static TableManager tableManager = null; + + private final Instance instance; + private ZooCache zooStateCache; + + public static void prepareNewNamespaceState(String instanceId, String namespaceId, String namespace, NodeExistsPolicy existsPolicy) throws KeeperException, + InterruptedException { + log.debug("Creating ZooKeeper entries for new namespace " + namespace + " (ID: " + namespaceId + ")"); + String zPath = Constants.ZROOT + "/" + instanceId + Constants.ZNAMESPACES + "/" + namespaceId; + + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); + zoo.putPersistentData(zPath, new byte[0], existsPolicy); + zoo.putPersistentData(zPath + Constants.ZNAMESPACE_NAME, namespace.getBytes(Constants.UTF8), existsPolicy); + zoo.putPersistentData(zPath + Constants.ZNAMESPACE_CONF, new byte[0], existsPolicy); + } + + public static void prepareNewTableState(String instanceId, String tableId, String namespaceId, String tableName, TableState state, + NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException { + // state gets created last + log.debug("Creating ZooKeeper entries for new table " + tableName + " (ID: " + tableId + ") in namespace (ID: " + namespaceId + ")"); + Pair qualifiedTableName = Tables.qualify(tableName); + tableName = qualifiedTableName.getSecond(); + String zTablePath = Constants.ZROOT + "/" + instanceId + Constants.ZTABLES + "/" + tableId; + 
IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); + zoo.putPersistentData(zTablePath, new byte[0], existsPolicy); + zoo.putPersistentData(zTablePath + Constants.ZTABLE_CONF, new byte[0], existsPolicy); + zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAMESPACE, namespaceId.getBytes(Constants.UTF8), existsPolicy); + zoo.putPersistentData(zTablePath + Constants.ZTABLE_NAME, tableName.getBytes(Constants.UTF8), existsPolicy); - zoo.putPersistentData(zTablePath + Constants.ZTABLE_FLUSH_ID, "0".getBytes(Constants.UTF8), existsPolicy); - zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_ID, "0".getBytes(Constants.UTF8), existsPolicy); - zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_CANCEL_ID, "0".getBytes(Constants.UTF8), existsPolicy); ++ zoo.putPersistentData(zTablePath + Constants.ZTABLE_FLUSH_ID, ZERO_BYTE, existsPolicy); ++ zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_ID, ZERO_BYTE, existsPolicy); ++ zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_CANCEL_ID, ZERO_BYTE, existsPolicy); + zoo.putPersistentData(zTablePath + Constants.ZTABLE_STATE, state.name().getBytes(Constants.UTF8), existsPolicy); + } + + public synchronized static TableManager getInstance() { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(TABLE_MANAGER_PERMISSION); + } + if (tableManager == null) + tableManager = new TableManager(); + return tableManager; + } + + private TableManager() { + instance = HdfsZooInstance.getInstance(); + zooStateCache = new ZooCache(new TableStateWatcher()); + updateTableStateCache(); + } + + public TableState getTableState(String tableId) { + return tableStateCache.get(tableId); + } + + public static class IllegalTableTransitionException extends Exception { + private static final long serialVersionUID = 1L; + + final TableState oldState; + final TableState newState; + + public IllegalTableTransitionException(TableState oldState, TableState newState) { + 
this.oldState = oldState; + this.newState = newState; + } + + public TableState getOldState() { + return oldState; + } + + public TableState getNewState() { + return newState; + } + + } + + public synchronized void transitionTableState(final String tableId, final TableState newState) { + String statePath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE; + + try { - ZooReaderWriter.getRetryingInstance().mutate(statePath, newState.name().getBytes(), ZooUtil.PUBLIC, new Mutator() { ++ ZooReaderWriter.getRetryingInstance().mutate(statePath, newState.name().getBytes(Constants.UTF8), ZooUtil.PUBLIC, new Mutator() { + @Override + public byte[] mutate(byte[] oldData) throws Exception { + TableState oldState = TableState.UNKNOWN; + if (oldData != null) - oldState = TableState.valueOf(new String(oldData)); ++ oldState = TableState.valueOf(new String(oldData, Constants.UTF8)); + boolean transition = true; + // +--------+ + // v | + // NEW -> (ONLINE|OFFLINE)+--- DELETING + switch (oldState) { + case NEW: + transition = (newState == TableState.OFFLINE || newState == TableState.ONLINE); + break; + case ONLINE: // fall-through intended + case UNKNOWN:// fall through intended + case OFFLINE: + transition = (newState != TableState.NEW); + break; + case DELETING: + // Can't transition to any state from DELETING + transition = false; + break; + } + if (!transition) + throw new IllegalTableTransitionException(oldState, newState); + log.debug("Transitioning state for table " + tableId + " from " + oldState + " to " + newState); - return newState.name().getBytes(); ++ return newState.name().getBytes(Constants.UTF8); + } + }); + } catch (Exception e) { + log.fatal("Failed to transition table to state " + newState); + throw new RuntimeException(e); + } + } + + private void updateTableStateCache() { + synchronized (tableStateCache) { + for (String tableId : zooStateCache.getChildren(ZooUtil.getRoot(instance) + 
Constants.ZTABLES)) + if (zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE) != null) + updateTableStateCache(tableId); + } + } + + public TableState updateTableStateCache(String tableId) { + synchronized (tableStateCache) { + TableState tState = TableState.UNKNOWN; + byte[] data = zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE); + if (data != null) { - String sState = new String(data); ++ String sState = new String(data, Constants.UTF8); + try { + tState = TableState.valueOf(sState); + } catch (IllegalArgumentException e) { + log.error("Unrecognized state for table with tableId=" + tableId + ": " + sState); + } + tableStateCache.put(tableId, tState); + } + return tState; + } + } + + public void addTable(String tableId, String namespaceId, String tableName, NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException, + NamespaceNotFoundException { + prepareNewTableState(instance.getInstanceID(), tableId, namespaceId, tableName, TableState.NEW, existsPolicy); + updateTableStateCache(tableId); + } + + public void cloneTable(String srcTable, String tableId, String tableName, String namespaceId, Map propertiesToSet, + Set propertiesToExclude, NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException { + prepareNewTableState(instance.getInstanceID(), tableId, namespaceId, tableName, TableState.NEW, existsPolicy); + + String srcTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + srcTable + Constants.ZTABLE_CONF; + String newTablePath = Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF; + ZooReaderWriter.getRetryingInstance().recursiveCopyPersistent(srcTablePath, newTablePath, NodeExistsPolicy.OVERWRITE); + + for (Entry entry : propertiesToSet.entrySet()) + TablePropUtil.setTableProperty(tableId, entry.getKey(), entry.getValue()); + 
+ for (String prop : propertiesToExclude) + ZooReaderWriter.getRetryingInstance().recursiveDelete( + Constants.ZROOT + "/" + instance.getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF + "/" + prop, NodeMissingPolicy.SKIP); + + updateTableStateCache(tableId); + } + + public void removeTable(String tableId) throws KeeperException, InterruptedException { + synchronized (tableStateCache) { + tableStateCache.remove(tableId); + ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE, + NodeMissingPolicy.SKIP); + ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP); + } + } + + public boolean addObserver(TableObserver to) { + synchronized (observers) { + synchronized (tableStateCache) { + to.initialize(Collections.unmodifiableMap(tableStateCache)); + return observers.add(to); + } + } + } + + public boolean removeObserver(TableObserver to) { + return observers.remove(to); + } + + private class TableStateWatcher implements Watcher { + @Override + public void process(WatchedEvent event) { + if (log.isTraceEnabled()) + log.trace(event); + + final String zPath = event.getPath(); + final EventType zType = event.getType(); + + String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES; + String tableId = null; + + if (zPath != null && zPath.startsWith(tablesPrefix + "/")) { + String suffix = zPath.substring(tablesPrefix.length() + 1); + if (suffix.contains("/")) { + String[] sa = suffix.split("/", 2); + if (Constants.ZTABLE_STATE.equals("/" + sa[1])) + tableId = sa[0]; + } + if (tableId == null) { + log.warn("Unknown path in " + event); + return; + } + } + + switch (zType) { + case NodeChildrenChanged: + if (zPath != null && zPath.equals(tablesPrefix)) { + updateTableStateCache(); + } else { + log.warn("Unexpected path " + zPath); + } + break; + case NodeCreated: + case 
NodeDataChanged: + // state transition + TableState tState = updateTableStateCache(tableId); + log.debug("State transition to " + tState + " @ " + event); + synchronized (observers) { + for (TableObserver to : observers) + to.stateChanged(tableId, tState); + } + break; + case NodeDeleted: + if (zPath != null + && tableId != null + && (zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_STATE) || zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_CONF) || zPath + .equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_NAME))) + tableStateCache.remove(tableId); + break; + case None: + switch (event.getState()) { + case Expired: + if (log.isTraceEnabled()) + log.trace("Session expired " + event); + synchronized (observers) { + for (TableObserver to : observers) + to.sessionExpired(); + } + break; + case SyncConnected: + default: + if (log.isTraceEnabled()) + log.trace("Ignored " + event); + } + break; + default: + log.warn("Unandled " + event); + } + } + } + + public void removeNamespace(String namespaceId) throws KeeperException, InterruptedException { + ZooReaderWriter.getRetryingInstance().recursiveDelete(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES + "/" + namespaceId, NodeMissingPolicy.SKIP); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java index 57b9b32,0000000..4ae8335 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/UniqueNameAllocator.java @@@ -1,79 -1,0 +1,79 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.tablets; + +import java.util.Random; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.util.FastFormat; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; + +/** + * Allocates unique names for an accumulo instance. The names are unique for the lifetime of the instance. + * + * This is useful for filenames because it makes caching easy. 
+ * + */ + +public class UniqueNameAllocator { + private long next = 0; + private long maxAllocated = 0; + private String nextNamePath; + private Random rand; + + private UniqueNameAllocator() { + nextNamePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZNEXT_FILE; + rand = new Random(); + } + + public synchronized String getNextName() { + + while (next >= maxAllocated) { + final int allocate = 100 + rand.nextInt(100); + + try { + byte[] max = ZooReaderWriter.getRetryingInstance().mutate(nextNamePath, null, ZooUtil.PRIVATE, new ZooReaderWriter.Mutator() { + public byte[] mutate(byte[] currentValue) throws Exception { - long l = Long.parseLong(new String(currentValue), Character.MAX_RADIX); ++ long l = Long.parseLong(new String(currentValue, Constants.UTF8), Character.MAX_RADIX); + l += allocate; - return Long.toString(l, Character.MAX_RADIX).getBytes(); ++ return Long.toString(l, Character.MAX_RADIX).getBytes(Constants.UTF8); + } + }); + - maxAllocated = Long.parseLong(new String(max), Character.MAX_RADIX); ++ maxAllocated = Long.parseLong(new String(max, Constants.UTF8), Character.MAX_RADIX); + next = maxAllocated - allocate; + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + - return new String(FastFormat.toZeroPaddedString(next++, 7, Character.MAX_RADIX, new byte[0])); ++ return new String(FastFormat.toZeroPaddedString(next++, 7, Character.MAX_RADIX, new byte[0]), Constants.UTF8); + } + + private static UniqueNameAllocator instance = null; + + public static synchronized UniqueNameAllocator getInstance() { + if (instance == null) + instance = new UniqueNameAllocator(); + + return instance; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java ---------------------------------------------------------------------- diff --cc 
server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java index 813d54c,0000000..cdff21d mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java @@@ -1,135 -1,0 +1,136 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util; + +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.cli.ClientOpts; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import com.beust.jcommander.Parameter; + +public class AddFilesWithMissingEntries { + + static final Logger log = Logger.getLogger(AddFilesWithMissingEntries.class); + + public static class Opts extends ClientOpts { + @Parameter(names = "-update", description = "Make changes to the " + MetadataTable.NAME + " table to include missing files") + boolean update = false; + } + + /** + * A utility to add files to the {@value MetadataTable#NAME} table that are not listed in the root tablet. This is a recovery tool for someone who knows what + * they are doing. It might be better to save off files, and recover your instance by re-initializing and importing the existing files. 
+ */ + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs(AddFilesWithMissingEntries.class.getName(), args, bwOpts); + + final Scanner scanner = opts.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); + final Configuration conf = new Configuration(); + final FileSystem fs = FileSystem.get(conf); + + KeyExtent last = new KeyExtent(); + String directory = null; + Set knownFiles = new HashSet(); + + int count = 0; + final MultiTableBatchWriter writer = opts.getConnector().createMultiTableBatchWriter(bwOpts.getBatchWriterConfig()); + + // collect the list of known files and the directory for each extent + for (Entry entry : scanner) { + Key key = entry.getKey(); + KeyExtent ke = new KeyExtent(key.getRow(), (Text) null); + // when the key extent changes + if (!ke.equals(last)) { + if (directory != null) { + // add any files in the directory unknown to the key extent + count += addUnknownFiles(fs, directory, knownFiles, last, writer, opts.update); + } + directory = null; + knownFiles.clear(); + last = ke; + } + if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { + directory = entry.getValue().toString(); + log.debug("Found directory " + directory + " for row " + key.getRow().toString()); + } else if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) { + String filename = key.getColumnQualifier().toString(); + knownFiles.add(filename); + log.debug("METADATA file found: " + filename); + } + } + if (directory != null) { + // catch the last key extent + count += addUnknownFiles(fs, directory, knownFiles, last, writer, opts.update); + } + log.info("There were " + count + " files that are unknown to the metadata table"); + writer.close(); + } + + private static int addUnknownFiles(FileSystem fs, String directory, Set knownFiles, KeyExtent ke, MultiTableBatchWriter 
writer, boolean update) + throws Exception { + int count = 0; + final String tableId = ke.getTableId().toString(); + final Text row = ke.getMetadataEntry(); + log.info(row.toString()); + for (String dir : ServerConstants.getTablesDirs()) { + final Path path = new Path(dir + "/" + tableId + directory); + for (FileStatus file : fs.listStatus(path)) { + if (file.getPath().getName().endsWith("_tmp") || file.getPath().getName().endsWith("_tmp.rf")) + continue; + final String filename = directory + "/" + file.getPath().getName(); + if (!knownFiles.contains(filename)) { + count++; + final Mutation m = new Mutation(row); + String size = Long.toString(file.getLen()); + String entries = "1"; // lie + String value = size + "," + entries; - m.put(DataFileColumnFamily.NAME, new Text(filename), new Value(value.getBytes())); ++ m.put(DataFileColumnFamily.NAME, new Text(filename), new Value(value.getBytes(Constants.UTF8))); + if (update) { + writer.getBatchWriter(MetadataTable.NAME).addMutation(m); + } + } + } + } + return count; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java index 0fd1c78,0000000..ac13034 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ChangeSecret.java @@@ -1,154 -1,0 +1,155 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.cli.ClientOpts; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import com.beust.jcommander.Parameter; + +public class ChangeSecret { + + static class Opts extends ClientOpts { + @Parameter(names="--old", description="old zookeeper password", password=true, hidden=true) + String oldPass; + @Parameter(names="--new", description="new zookeeper password", password=true, hidden=true) + String newPass; + } + + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + List argsList = new ArrayList(args.length + 2); + argsList.add("--old"); + argsList.add("--new"); + argsList.addAll(Arrays.asList(args)); + 
opts.parseArgs(ChangeSecret.class.getName(), argsList.toArray(new String[0])); + FileSystem fs = FileSystem.get(CachedConfiguration.getInstance()); + Instance inst = opts.getInstance(); + if (!verifyAccumuloIsDown(inst, opts.oldPass)) + System.exit(-1); + String instanceId = rewriteZooKeeperInstance(inst, opts.oldPass, opts.newPass); + updateHdfs(fs, inst, instanceId); + if (opts.oldPass != null) { + deleteInstance(inst, opts.oldPass); + } + System.out.println("New instance id is " + instanceId); + System.out.println("Be sure to put your new secret in accumulo-site.xml"); + } + + interface Visitor { + void visit(ZooReader zoo, String path) throws Exception; + } + + private static void recurse(ZooReader zoo, String root, Visitor v) { + try { + v.visit(zoo, root); + for (String child : zoo.getChildren(root)) { + recurse(zoo, root + "/" + child, v); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private static boolean verifyAccumuloIsDown(Instance inst, String oldPassword) { + ZooReader zooReader = new ZooReaderWriter(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), oldPassword); + String root = ZooUtil.getRoot(inst); + final List ephemerals = new ArrayList(); + recurse(zooReader, root, new Visitor() { + public void visit(ZooReader zoo, String path) throws Exception { + Stat stat = zoo.getStatus(path); + if (stat.getEphemeralOwner() != 0) + ephemerals.add(path); + } + }); + if (ephemerals.size() == 0) { + return true; + } + + System.err.println("The following ephemeral nodes exist, something is still running:"); + for (String path : ephemerals) { + System.err.println(path); + } + return false; + } + + private static String rewriteZooKeeperInstance(final Instance inst, String oldPass, String newPass) throws Exception { + final ZooReaderWriter orig = new ZooReaderWriter(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), oldPass); + final IZooReaderWriter new_ = new ZooReaderWriter(inst.getZooKeepers(), 
inst.getZooKeepersSessionTimeOut(), newPass); + final String newInstanceId = UUID.randomUUID().toString(); + String root = ZooUtil.getRoot(inst); + recurse(orig, root, new Visitor() { + public void visit(ZooReader zoo, String path) throws Exception { + String newPath = path.replace(inst.getInstanceID(), newInstanceId); + byte[] data = zoo.getData(path, null); + List acls = orig.getZooKeeper().getACL(path, new Stat()); + if (acls.containsAll(Ids.READ_ACL_UNSAFE)) { + new_.putPersistentData(newPath, data, NodeExistsPolicy.FAIL); + } else { + // upgrade + if (acls.containsAll(Ids.OPEN_ACL_UNSAFE)) { + // make user nodes private, they contain the user's password + String parts[] = path.split("/"); + if (parts[parts.length - 2].equals("users")) { + new_.putPrivatePersistentData(newPath, data, NodeExistsPolicy.FAIL); + } else { + // everything else can have the readable acl + new_.putPersistentData(newPath, data, NodeExistsPolicy.FAIL); + } + } else { + new_.putPrivatePersistentData(newPath, data, NodeExistsPolicy.FAIL); + } + } + } + }); + String path = "/accumulo/instances/" + inst.getInstanceName(); + orig.recursiveDelete(path, NodeMissingPolicy.SKIP); - new_.putPersistentData(path, newInstanceId.getBytes(), NodeExistsPolicy.OVERWRITE); ++ new_.putPersistentData(path, newInstanceId.getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE); + return newInstanceId; + } + + private static void updateHdfs(FileSystem fs, Instance inst, String newInstanceId) throws IOException { + fs.delete(ServerConstants.getInstanceIdLocation(), true); + fs.mkdirs(ServerConstants.getInstanceIdLocation()); + fs.create(new Path(ServerConstants.getInstanceIdLocation(), newInstanceId)).close(); + } + + private static void deleteInstance(Instance origInstance, String oldPass) throws Exception { + IZooReaderWriter orig = new ZooReaderWriter(origInstance.getZooKeepers(), origInstance.getZooKeepersSessionTimeOut(), oldPass); + orig.recursiveDelete("/accumulo/" + origInstance.getInstanceID(), 
NodeMissingPolicy.SKIP); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java index b7a90d3,0000000..f2074a1 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/CleanZookeeper.java @@@ -1,87 -1,0 +1,87 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util; + +import java.io.IOException; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; + +import com.beust.jcommander.Parameter; + +public class CleanZookeeper { + + private static final Logger log = Logger.getLogger(CleanZookeeper.class); + + static class Opts extends Help { + @Parameter(names={"-z", "--keepers"}, description="comma separated list of zookeeper hosts") + String keepers = "localhost:2181"; + @Parameter(names={"--password"}, description="the system secret", password=true) + String auth; + } + + /** + * @param args + * must contain one element: the address of a zookeeper node a second parameter provides an additional authentication value + * @throws IOException + * error connecting to accumulo or zookeeper + */ + public static void main(String[] args) throws IOException { + Opts opts = new Opts(); + opts.parseArgs(CleanZookeeper.class.getName(), args); + + String root = Constants.ZROOT; + IZooReaderWriter zk = ZooReaderWriter.getInstance(); + if (opts.auth != null) { - zk.getZooKeeper().addAuthInfo("digest", ("accumulo:"+opts.auth).getBytes()); ++ zk.getZooKeeper().addAuthInfo("digest", ("accumulo:"+opts.auth).getBytes(Constants.UTF8)); + } + + try { + for (String child : zk.getChildren(root)) { + if (Constants.ZINSTANCES.equals("/" + child)) { + for (String instanceName : zk.getChildren(root + Constants.ZINSTANCES)) { + String instanceNamePath = root + Constants.ZINSTANCES + "/" + instanceName; + byte[] id = zk.getData(instanceNamePath, null); - if (id != null && !new String(id).equals(HdfsZooInstance.getInstance().getInstanceID())) { ++ if (id != 
null && !new String(id, Constants.UTF8).equals(HdfsZooInstance.getInstance().getInstanceID())) { + try { + zk.recursiveDelete(instanceNamePath, NodeMissingPolicy.SKIP); + } catch (KeeperException.NoAuthException ex) { + log.warn("Unable to delete " + instanceNamePath); + } + } + } + } else if (!child.equals(HdfsZooInstance.getInstance().getInstanceID())) { + String path = root + "/" + child; + try { + zk.recursiveDelete(path, NodeMissingPolicy.SKIP); + } catch (KeeperException.NoAuthException ex) { + log.warn("Unable to delete " + path); + } + } + } + } catch (Exception ex) { + System.out.println("Error Occurred: " + ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java index a74f2b5,0000000..448da86 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/DeleteZooInstance.java @@@ -1,81 -1,0 +1,81 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.zookeeper.KeeperException; + +import com.beust.jcommander.Parameter; + +public class DeleteZooInstance { + + static class Opts extends Help { + @Parameter(names={"-i", "--instance"}, description="the instance name or id to delete") + String instance; + } + + static void deleteRetry(IZooReaderWriter zk, String path) throws Exception { + for (int i = 0; i < 10; i++){ + try { + zk.recursiveDelete(path, NodeMissingPolicy.SKIP); + return; + } catch (KeeperException.NotEmptyException ex) { + // ignored + } catch (Exception ex) { + throw ex; + } + } + } + + /** + * @param args + * : the name or UUID of the instance to be deleted + */ + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(DeleteZooInstance.class.getName(), args); + + IZooReaderWriter zk = ZooReaderWriter.getInstance(); + // try instance name: + Set instances = new HashSet(zk.getChildren(Constants.ZROOT + Constants.ZINSTANCES)); + Set uuids = new HashSet(zk.getChildren(Constants.ZROOT)); + uuids.remove("instances"); + if (instances.contains(opts.instance)) { + String path = Constants.ZROOT + Constants.ZINSTANCES + "/" + opts.instance; + byte[] data = zk.getData(path, null); + deleteRetry(zk, path); - deleteRetry(zk, Constants.ZROOT + "/" + new String(data)); ++ deleteRetry(zk, Constants.ZROOT + "/" + new String(data, Constants.UTF8)); + } else if (uuids.contains(opts.instance)) { + // look for the real instance name + for (String instance : instances) { + String 
path = Constants.ZROOT + Constants.ZINSTANCES + "/" + instance; + byte[] data = zk.getData(path, null); - if (opts.instance.equals(new String(data))) ++ if (opts.instance.equals(new String(data, Constants.UTF8))) + deleteRetry(zk, path); + } + deleteRetry(zk, Constants.ZROOT + "/" + opts.instance); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java index b982829,0000000..f4655dc mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@@ -1,225 -1,0 +1,225 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util; + +import java.util.Formattable; +import java.util.Formatter; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.log4j.Logger; + +import com.beust.jcommander.Parameter; + +public class ListInstances { + + private static final Logger log = Logger.getLogger(ListInstances.class); + + private static final int NAME_WIDTH = 20; + private static final int UUID_WIDTH = 37; + private static final int MASTER_WIDTH = 30; + + private static final int ZOOKEEPER_TIMER_MILLIS = 30 * 1000; + + static class Opts extends Help { + @Parameter(names="--print-errors", description="display errors while listing instances") + boolean printErrors = false; + @Parameter(names="--print-all", description="print information for all instances, not just those with names") + boolean printAll = false; + @Parameter(names={"-z", "--zookeepers"}, description="the zookeepers to contact") + String keepers = null; + } + static Opts opts = new Opts(); + static int errors = 0; + + public static void main(String[] args) { + opts.parseArgs(ListInstances.class.getName(), args); + + if (opts.keepers == null) { + opts.keepers = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST); + } + + String keepers = opts.keepers; + boolean printAll = opts.printAll; + boolean printErrors = opts.printErrors; + + listInstances(keepers, printAll, printErrors); + + } + + static synchronized void listInstances(String keepers, boolean printAll, boolean printErrors) { + errors = 0; + + System.out.println("INFO : 
Using ZooKeepers " + keepers); + ZooReader rdr = new ZooReader(keepers, ZOOKEEPER_TIMER_MILLIS); + ZooCache cache = new ZooCache(keepers, ZOOKEEPER_TIMER_MILLIS); + + TreeMap instanceNames = getInstanceNames(rdr, printErrors); + + System.out.println(); + printHeader(); + + for (Entry entry : instanceNames.entrySet()) { + printInstanceInfo(cache, entry.getKey(), entry.getValue(), printErrors); + } + + TreeSet instancedIds = getInstanceIDs(rdr, printErrors); + instancedIds.removeAll(instanceNames.values()); + + if (printAll) { + for (UUID uuid : instancedIds) { + printInstanceInfo(cache, null, uuid, printErrors); + } + } else if (instancedIds.size() > 0) { + System.out.println(); + System.out.println("INFO : " + instancedIds.size() + " unamed instances were not printed, run with --print-all to see all instances"); + } else { + System.out.println(); + } + + if (!printErrors && errors > 0) { + System.err.println("WARN : There were " + errors + " errors, run with --print-errors to see more info"); + } + } + + private static class CharFiller implements Formattable { + + char c; + + CharFiller(char c) { + this.c = c; + } + + @Override + public void formatTo(Formatter formatter, int flags, int width, int precision) { + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < width; i++) + sb.append(c); + formatter.format(sb.toString()); + } + + } + + private static void printHeader() { + System.out.printf(" %-" + NAME_WIDTH + "s| %-" + UUID_WIDTH + "s| %-" + MASTER_WIDTH + "s%n", "Instance Name", "Instance ID", "Master"); + System.out.printf("%" + (NAME_WIDTH + 1) + "s+%" + (UUID_WIDTH + 1) + "s+%" + (MASTER_WIDTH + 1) + "s%n", new CharFiller('-'), new CharFiller('-'), + new CharFiller('-')); + + } + + private static void printInstanceInfo(ZooCache cache, String instanceName, UUID iid, boolean printErrors) { + String master = getMaster(cache, iid, printErrors); + if (instanceName == null) { + instanceName = ""; + } + + if (master == null) { + master = ""; + } + + 
System.out.printf("%" + NAME_WIDTH + "s |%" + UUID_WIDTH + "s |%" + MASTER_WIDTH + "s%n", "\"" + instanceName + "\"", iid, master); + } + + private static String getMaster(ZooCache cache, UUID iid, boolean printErrors) { + + if (iid == null) { + return null; + } + + try { + String masterLocPath = Constants.ZROOT + "/" + iid + Constants.ZMASTER_LOCK; + byte[] master = ZooLock.getLockData(cache, masterLocPath, null); + if (master == null) { + return null; + } - return new String(master); ++ return new String(master, Constants.UTF8); + } catch (Exception e) { + handleException(e, printErrors); + return null; + } + } + + private static TreeMap getInstanceNames(ZooReader zk, boolean printErrors) { + + String instancesPath = Constants.ZROOT + Constants.ZINSTANCES; + + TreeMap tm = new TreeMap(); + + List names; + + try { + names = zk.getChildren(instancesPath); + } catch (Exception e) { + handleException(e, printErrors); + return tm; + } + + for (String name : names) { + String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name; + try { - UUID iid = UUID.fromString(new String(zk.getData(instanceNamePath, null))); ++ UUID iid = UUID.fromString(new String(zk.getData(instanceNamePath, null), Constants.UTF8)); + tm.put(name, iid); + } catch (Exception e) { + handleException(e, printErrors); + tm.put(name, null); + } + } + + return tm; + } + + private static TreeSet getInstanceIDs(ZooReader zk, boolean printErrors) { + TreeSet ts = new TreeSet(); + + try { + List children = zk.getChildren(Constants.ZROOT); + + for (String iid : children) { + if (iid.equals("instances")) + continue; + try { + ts.add(UUID.fromString(iid)); + } catch (Exception e) { + log.error("Exception: " + e); + } + } + } catch (Exception e) { + handleException(e, printErrors); + } + + return ts; + } + + private static void handleException(Exception e, boolean printErrors) { + if (printErrors) { + e.printStackTrace(); + } + + errors++; + } +} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java index 112a619,0000000..a96e791 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java @@@ -1,111 -1,0 +1,113 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.cli.ClientOpts; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.net.HostAndPort; + +public class LocalityCheck { + + public int run(String[] args) throws Exception { + ClientOpts opts = new ClientOpts(); + opts.parseArgs(LocalityCheck.class.getName(), args); + + VolumeManager fs = VolumeManagerImpl.get(); + Connector connector = opts.getConnector(); + Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); + + Map totalBlocks = new HashMap(); + Map localBlocks = new HashMap(); + ArrayList files = new ArrayList(); + + for (Entry entry : scanner) { + Key key = entry.getKey(); + if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) { + String location = entry.getValue().toString(); + String[] parts = location.split(":"); + String host = 
parts[0]; + addBlocks(fs, host, files, totalBlocks, localBlocks); + files.clear(); + } else if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) { + + files.add(fs.getFullPath(key).toString()); + } + } + System.out.println(" Server %local total blocks"); - for (String host : totalBlocks.keySet()) { - System.out.println(String.format("%15s %5.1f %8d", host, (localBlocks.get(host) * 100.) / totalBlocks.get(host), totalBlocks.get(host))); ++ for (Entry entry : totalBlocks.entrySet()) { ++ final String host = entry.getKey(); ++ final Long blocksForHost = entry.getValue(); ++ System.out.println(String.format("%15s %5.1f %8d", host, (localBlocks.get(host) * 100.) / blocksForHost, blocksForHost)); + } + return 0; + } + + private void addBlocks(VolumeManager fs, String host, ArrayList files, Map totalBlocks, Map localBlocks) throws Exception { + long allBlocks = 0; + long matchingBlocks = 0; + if (!totalBlocks.containsKey(host)) { + totalBlocks.put(host, 0L); + localBlocks.put(host, 0L); + } + for (String file : files) { + Path filePath = new Path(file); + FileSystem ns = fs.getFileSystemByPath(filePath); + FileStatus fileStatus = ns.getFileStatus(filePath); + BlockLocation[] fileBlockLocations = ns.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + for (BlockLocation blockLocation : fileBlockLocations) { + allBlocks++; + for (String location : blockLocation.getHosts()) { + HostAndPort hap = HostAndPort.fromParts(location, 0); + if (hap.getHostText().equals(host)) { + matchingBlocks++; + break; + } + } + } + } + totalBlocks.put(host, allBlocks + totalBlocks.get(host)); + localBlocks.put(host, matchingBlocks + localBlocks.get(host)); + } + + public static void main(String[] args) throws Exception { + LocalityCheck check = new LocalityCheck(); + System.exit(check.run(args)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java 
---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java index d80adc9,0000000..1f61b43 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/SendLogToChainsaw.java @@@ -1,276 -1,0 +1,278 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util; + +import java.io.BufferedReader; +import java.io.File; ++import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FilenameFilter; +import java.io.IOException; ++import java.io.InputStreamReader; +import java.io.UnsupportedEncodingException; +import java.net.Socket; +import java.net.URLEncoder; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.net.SocketFactory; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.commons.lang.math.LongRange; +import org.apache.log4j.Category; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.Filter; +import org.apache.log4j.spi.LocationInfo; +import org.apache.log4j.spi.LoggingEvent; +import org.apache.log4j.spi.ThrowableInformation; +import org.apache.log4j.varia.LevelRangeFilter; +import org.apache.log4j.xml.XMLLayout; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.Parameter; + +public class SendLogToChainsaw extends XMLLayout { + + private static Pattern logPattern = Pattern.compile( + "^(\\d\\d)\\s(\\d\\d):(\\d\\d):(\\d\\d),(\\d\\d\\d)\\s\\[(.*)\\]\\s(TRACE|DEBUG|INFO|WARN|FATAL|ERROR)\\s*?:(.*)$", Pattern.UNIX_LINES); + + private File[] logFiles = null; + + private SocketFactory factory = SocketFactory.getDefault(); + + private WildcardFileFilter fileFilter = null; + + private Socket socket = null; + + private Pattern lineFilter = null; + + private LongRange dateFilter = null; + + private LevelRangeFilter levelFilter = null; + + public SendLogToChainsaw(String directory, String fileNameFilter, String host, int port, Date start, Date end, String regex, String 
level) throws Exception { + + // Set up the file name filter + if (null != fileNameFilter) { + fileFilter = new WildcardFileFilter(fileNameFilter); + } else { + fileFilter = new WildcardFileFilter("*"); + } + + // Get the list of files that match + File dir = new File(directory); + if (dir.isDirectory()) { + logFiles = dir.listFiles((FilenameFilter) fileFilter); + } else { + throw new IllegalArgumentException(directory + " is not a directory or is not readable."); + } + + if (logFiles.length == 0) { + throw new IllegalArgumentException("No files match the supplied filter."); + } + + socket = factory.createSocket(host, port); + + lineFilter = Pattern.compile(regex); + + // Create Date Filter + if (null != start) { + if (end == null) + end = new Date(System.currentTimeMillis()); + dateFilter = new LongRange(start.getTime(), end.getTime()); + } + + if (null != level) { + Level base = Level.toLevel(level.toUpperCase()); + levelFilter = new LevelRangeFilter(); + levelFilter.setAcceptOnMatch(true); + levelFilter.setLevelMin(base); + levelFilter.setLevelMax(Level.FATAL); + } + } + + public void processLogFiles() throws IOException { + String line = null; + String out = null; - FileReader fReader = null; ++ InputStreamReader isReader = null; + BufferedReader reader = null; + try { + for (File log : logFiles) { + // Parse the server type and name from the log file name + String threadName = log.getName().substring(0, log.getName().indexOf(".")); + try { - fReader = new FileReader(log); ++ isReader = new InputStreamReader(new FileInputStream(log), Constants.UTF8); + } catch (FileNotFoundException e) { + System.out.println("Unable to find file: " + log.getAbsolutePath()); + throw e; + } - reader = new BufferedReader(fReader); ++ reader = new BufferedReader(isReader); + + try { + line = reader.readLine(); + while (null != line) { + out = convertLine(line, threadName); + if (null != out) { + if (socket != null && socket.isConnected()) - 
socket.getOutputStream().write(out.getBytes()); ++ socket.getOutputStream().write(out.getBytes(Constants.UTF8)); + else + System.err.println("Unable to send data to transport"); + } + line = reader.readLine(); + } + } catch (IOException e) { + System.out.println("Error processing line: " + line + ". Output was " + out); + throw e; + } finally { + if (reader != null) { + reader.close(); + } - if (fReader != null) { - fReader.close(); ++ if (isReader != null) { ++ isReader.close(); + } + } + } + } finally { + if (socket != null && socket.isConnected()) { + socket.close(); + } + } + } + + private String convertLine(String line, String threadName) throws UnsupportedEncodingException { + String result = null; + Matcher m = logPattern.matcher(line); + if (m.matches()) { + + Calendar cal = Calendar.getInstance(); + cal.setTime(new Date(System.currentTimeMillis())); + Integer date = Integer.parseInt(m.group(1)); + Integer hour = Integer.parseInt(m.group(2)); + Integer min = Integer.parseInt(m.group(3)); + Integer sec = Integer.parseInt(m.group(4)); + Integer ms = Integer.parseInt(m.group(5)); + String clazz = m.group(6); + String level = m.group(7); + String message = m.group(8); + // Apply the regex filter if supplied + if (null != lineFilter) { + Matcher match = lineFilter.matcher(message); + if (!match.matches()) + return null; + } + // URL encode the message + message = URLEncoder.encode(message, Constants.UTF8.name()); + // Assume that we are processing logs from today. + // If the date in the line is greater than today, then it must be + // from the previous month. + cal.set(Calendar.DATE, date); + cal.set(Calendar.HOUR_OF_DAY, hour); + cal.set(Calendar.MINUTE, min); + cal.set(Calendar.SECOND, sec); + cal.set(Calendar.MILLISECOND, ms); + if (date > cal.get(Calendar.DAY_OF_MONTH)) { + cal.add(Calendar.MONTH, -1); + } + long ts = cal.getTimeInMillis(); + // If this event is not between the start and end dates, then skip it. 
+ if (null != dateFilter && !dateFilter.containsLong(ts)) + return null; + Category c = Logger.getLogger(clazz); + Level l = Level.toLevel(level); + LoggingEvent event = new LoggingEvent(clazz, c, ts, l, message, threadName, (ThrowableInformation) null, (String) null, (LocationInfo) null, + (Map) null); + // Check the log level filter + if (null != levelFilter && (levelFilter.decide(event) == Filter.DENY)) { + return null; + } + result = format(event); + } + return result; + } + + private static class DateConverter implements IStringConverter { + @Override + public Date convert(String value) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); + try { + return formatter.parse(value); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + + } + + private static class Opts extends Help { + + @Parameter(names = {"-d", "--logDirectory"}, description = "ACCUMULO log directory path", required = true) + String dir; + + @Parameter(names = {"-f", "--fileFilter"}, description = "filter to apply to names of logs") + String filter; + + @Parameter(names = {"-h", "--host"}, description = "host where chainsaw is running", required = true) + String hostname; + + @Parameter(names = {"-p", "--port"}, description = "port where XMLSocketReceiver is listening", required = true) + int portnum; + + @Parameter(names = {"-s", "--start"}, description = "start date filter (yyyyMMddHHmmss)", required = true, converter = DateConverter.class) + Date startDate; + + @Parameter(names = {"-e", "--end"}, description = "end date filter (yyyyMMddHHmmss)", required = true, converter = DateConverter.class) + Date endDate; + + @Parameter(names = {"-l", "--level"}, description = "filter log level") + String level; + + @Parameter(names = {"-m", "--messageFilter"}, description = "regex filter for log messages") + String regex; + } + + /** + * + * @param args + *
<ol>
+ * <li>path to log directory</li>
+ * <li>filter to apply for logs to include (uses wildcards (i.e. logger* and IS case sensitive)</li>
+ * <li>chainsaw host</li>
+ * <li>chainsaw port</li>
+ * <li>start date filter</li>
+ * <li>end date filter</li>
+ * <li>optional regex filter to match on each log4j message</li>
+ * <li>optional level filter</li>
+ * </ol>
+ */ + public static void main(String[] args) throws Exception { + Opts opts = new Opts(); + opts.parseArgs(SendLogToChainsaw.class.getName(), args); + + SendLogToChainsaw c = new SendLogToChainsaw(opts.dir, opts.filter, opts.hostname, opts.portnum, opts.startDate, opts.endDate, opts.regex, opts.level); + c.processLogFiles(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java index affe12f,0000000..b6ca527 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/SystemPropUtil.java @@@ -1,45 -1,0 +1,45 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.zookeeper.KeeperException; + +public class SystemPropUtil { + public static boolean setSystemProperty(String property, String value) throws KeeperException, InterruptedException { + Property p = Property.getPropertyByKey(property); + if ((p != null && !p.getType().isValidFormat(value)) || !Property.isValidZooPropertyKey(property)) + return false; + + // create the zk node for this property and set it's data to the specified value + String zPath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZCONFIG + "/" + property; - ZooReaderWriter.getInstance().putPersistentData(zPath, value.getBytes(), NodeExistsPolicy.OVERWRITE); ++ ZooReaderWriter.getInstance().putPersistentData(zPath, value.getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE); + + return true; + } + + public static void removeSystemProperty(String property) throws InterruptedException, KeeperException { + String zPath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZCONFIG + "/" + property; + ZooReaderWriter.getInstance().recursiveDelete(zPath, NodeMissingPolicy.FAIL); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java index dfb05d0,0000000..2fc0bd3 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java 
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java @@@ -1,75 -1,0 +1,75 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.List; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; + +import com.beust.jcommander.Parameter; + +public class TabletServerLocks { + + static class Opts extends Help { + @Parameter(names="-list") + boolean list = false; + @Parameter(names="-delete") + String delete = null; + } + /** + * @param args + */ + public static void main(String[] args) throws Exception { + + Instance instance = HdfsZooInstance.getInstance(); + String tserverPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS; + Opts opts = new Opts(); + opts.parseArgs(TabletServerLocks.class.getName(), args); + + 
ZooCache cache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + + if (opts.list) { + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + + List tabletServers = zoo.getChildren(tserverPath); + + for (String tabletServer : tabletServers) { + byte[] lockData = ZooLock.getLockData(cache, tserverPath + "/" + tabletServer, null); + String holder = null; + if (lockData != null) { - holder = new String(lockData); ++ holder = new String(lockData, Constants.UTF8); + } + + System.out.printf("%32s %16s%n", tabletServer, holder); + } + } else if (opts.delete != null) { + ZooLock.deleteLock(tserverPath + "/" + args[1]); + } else { + System.out.println("Usage : " + TabletServerLocks.class.getName() + " -list|-delete "); + } + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java index 66e68c3,0000000..b842e09 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java +++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java @@@ -1,253 -1,0 +1,253 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.zookeeper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +/** + * Provides a way to push work out to tablet servers via zookeeper and wait for that work to be done. Any tablet server can pick up a work item and process it. + * + * Worker processes watch a zookeeper node for tasks to be performed. After getting an exclusive lock on the node, the worker will perform the task. 
+ */ +public class DistributedWorkQueue { + + private static final String LOCKS_NODE = "locks"; + + private static final Logger log = Logger.getLogger(DistributedWorkQueue.class); + + private ThreadPoolExecutor threadPool; + private ZooReaderWriter zoo = ZooReaderWriter.getInstance(); + private String path; + + private AtomicInteger numTask = new AtomicInteger(0); + + private void lookForWork(final Processor processor, List children) { + if (children.size() == 0) + return; + + if (numTask.get() >= threadPool.getCorePoolSize()) + return; + + Random random = new Random(); + Collections.shuffle(children, random); + try { + for (final String child : children) { + + if (child.equals(LOCKS_NODE)) + continue; + + final String lockPath = path + "/locks/" + child; + + try { + // no need to use zoolock, because a queue (ephemeral sequential) is not needed + // if can not get the lock right now then do not want to wait + zoo.putEphemeralData(lockPath, new byte[0]); + } catch (NodeExistsException nee) { + // someone else has reserved it + continue; + } + + final String childPath = path + "/" + child; + + // check to see if another node processed it already + if (!zoo.exists(childPath)) { + zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP); + continue; + } + + // Great... we got the lock, but maybe we're too busy + if (numTask.get() >= threadPool.getCorePoolSize()) { + zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP); + break; + } + + log.debug("got lock for " + child); + + Runnable task = new Runnable() { + + @Override + public void run() { + try { + try { + processor.newProcessor().process(child, zoo.getData(childPath, null)); + + // if the task fails, then its entry in the Q is not deleted... 
so it will be retried + try { + zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP); + } catch (Exception e) { + log.error("Error received when trying to delete entry in zookeeper " + childPath, e); + } + + } catch (Exception e) { + log.warn("Failed to process work " + child, e); + } + + try { + zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP); + } catch (Exception e) { + log.error("Error received when trying to delete entry in zookeeper " + childPath, e); + } + + } finally { + numTask.decrementAndGet(); + } + + try { + // its important that this is called after numTask is decremented + lookForWork(processor, zoo.getChildren(path)); + } catch (KeeperException e) { + log.error("Failed to look for work", e); + } catch (InterruptedException e) { + log.info("Interrupted looking for work", e); + } + } + }; + + numTask.incrementAndGet(); + threadPool.execute(task); + + } + } catch (Throwable t) { + log.error("Unexpected error", t); + } + } + + public interface Processor { + Processor newProcessor(); + + void process(String workID, byte[] data); + } + + public DistributedWorkQueue(String path) { + this.path = path; + } + + public void startProcessing(final Processor processor, ThreadPoolExecutor executorService) throws KeeperException, InterruptedException { + + threadPool = (ThreadPoolExecutor) executorService; + + zoo.mkdirs(path); + zoo.mkdirs(path + "/" + LOCKS_NODE); + + List children = zoo.getChildren(path, new Watcher() { + @Override + public void process(WatchedEvent event) { + switch (event.getType()) { + case NodeChildrenChanged: + if (event.getPath().equals(path)) + try { + lookForWork(processor, zoo.getChildren(path, this)); + } catch (KeeperException e) { + log.error("Failed to look for work", e); + } catch (InterruptedException e) { + log.info("Interrupted looking for work", e); + } + else + log.info("Unexpected path for NodeChildrenChanged event " + event.getPath()); + break; + case NodeCreated: + case NodeDataChanged: + case NodeDeleted: + case 
None: + log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path); + break; + + } + } + }); + + lookForWork(processor, children); + + Random r = new Random(); + // Add a little jitter to avoid all the tservers slamming zookeeper at once + SimpleTimer.getInstance().schedule(new Runnable() { + @Override + public void run() { + try { + lookForWork(processor, zoo.getChildren(path)); + } catch (KeeperException e) { + log.error("Failed to look for work", e); + } catch (InterruptedException e) { + log.info("Interrupted looking for work", e); + } + } + }, r.nextInt(60 * 1000), 60 * 1000); + } + + public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException { + if (workId.equalsIgnoreCase(LOCKS_NODE)) + throw new IllegalArgumentException("locks is reserved work id"); + + zoo.mkdirs(path); + zoo.putPersistentData(path + "/" + workId, data, NodeExistsPolicy.SKIP); + } + + public List getWorkQueued() throws KeeperException, InterruptedException { + ArrayList children = new ArrayList(zoo.getChildren(path)); + children.remove(LOCKS_NODE); + return children; + } + + public void waitUntilDone(Set workIDs) throws KeeperException, InterruptedException { + - final String condVar = new String("cond"); ++ final String condVar = "cond"; + + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + switch (event.getType()) { + case NodeChildrenChanged: + synchronized (condVar) { + condVar.notify(); + } + break; + case NodeCreated: + case NodeDataChanged: + case NodeDeleted: + case None: + log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path); + break; + + } + } + }; + + List children = zoo.getChildren(path, watcher); + + while (!Collections.disjoint(children, workIDs)) { + synchronized (condVar) { + condVar.wait(10000); + } + children = zoo.getChildren(path, watcher); + } + } +} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/7688eaf0/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java ---------------------------------------------------------------------- diff --cc server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java index 93a0460,0000000..f7c1e68 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java +++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/ZooQueueLock.java @@@ -1,53 -1,0 +1,54 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.zookeeper; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + ++import org.apache.accumulo.core.Constants; +import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock; +import org.apache.zookeeper.KeeperException; + +public class ZooQueueLock extends org.apache.accumulo.fate.zookeeper.ZooQueueLock { + + public ZooQueueLock(String path, boolean ephemeral) throws KeeperException, InterruptedException { + super(ZooReaderWriter.getRetryingInstance(), path, ephemeral); + } + + public static void main(String args[]) throws InterruptedException, KeeperException { + ZooQueueLock lock = new ZooQueueLock("/lock", true); - DistributedReadWriteLock rlocker = new DistributedReadWriteLock(lock, "reader".getBytes()); - DistributedReadWriteLock wlocker = new DistributedReadWriteLock(lock, "wlocker".getBytes()); ++ DistributedReadWriteLock rlocker = new DistributedReadWriteLock(lock, "reader".getBytes(Constants.UTF8)); ++ DistributedReadWriteLock wlocker = new DistributedReadWriteLock(lock, "wlocker".getBytes(Constants.UTF8)); + final Lock readLock = rlocker.readLock(); + readLock.lock(); + final Lock readLock2 = rlocker.readLock(); + readLock2.lock(); + final Lock writeLock = wlocker.writeLock(); + if (writeLock.tryLock(100, TimeUnit.MILLISECONDS)) + throw new RuntimeException("Write lock achieved during read lock!"); + readLock.unlock(); + readLock2.unlock(); + writeLock.lock(); + if (readLock.tryLock(100, TimeUnit.MILLISECONDS)) + throw new RuntimeException("Read lock achieved during write lock!"); - final Lock writeLock2 = DistributedReadWriteLock.recoverLock(lock, "wlocker".getBytes()); ++ final Lock writeLock2 = DistributedReadWriteLock.recoverLock(lock, "wlocker".getBytes(Constants.UTF8)); + writeLock2.unlock(); + readLock.lock(); + System.out.println("success"); + } + +}