Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 81BB8106C3 for ; Fri, 1 Nov 2013 00:55:52 +0000 (UTC) Received: (qmail 68021 invoked by uid 500); 1 Nov 2013 00:55:48 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 67632 invoked by uid 500); 1 Nov 2013 00:55:45 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 66785 invoked by uid 99); 1 Nov 2013 00:55:42 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Nov 2013 00:55:42 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 51CE06F32; Fri, 1 Nov 2013 00:55:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 01 Nov 2013 00:56:22 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [43/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java new file mode 100644 index 0000000..665bf25 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class FileSystemMonitor { + private static final String PROC_MOUNTS = "/proc/mounts"; + private static final Logger log = Logger.getLogger(FileSystemMonitor.class); + + private static class Mount { + String mountPoint; + Set options; + + Mount(String line) { + String tokens[] = line.split("\\s+"); + + mountPoint = tokens[1]; + + options = new HashSet(Arrays.asList(tokens[3].split(","))); + } + } + + static List parse(String procFile) throws IOException { + + List mounts = new ArrayList(); + + FileReader fr = new FileReader(procFile); + BufferedReader br = new BufferedReader(fr); + + String line; + try { + while ((line = br.readLine()) != null) + mounts.add(new Mount(line)); + } finally { + br.close(); + } + + return mounts; + } + + private Map readWriteFilesystems = new HashMap(); + + public FileSystemMonitor(final String procFile, long period) throws IOException { + List mounts = parse(procFile); + + for (Mount mount : mounts) { + if (mount.options.contains("rw")) + readWriteFilesystems.put(mount.mountPoint, true); + else if (mount.options.contains("ro")) + readWriteFilesystems.put(mount.mountPoint, false); + else + throw new IOException("Filesystem " + mount + " does not have ro or rw option"); + } + + TimerTask tt = new TimerTask() { + @Override + public void run() { + try { + checkMounts(procFile); + } catch (final Exception e) { + Halt.halt(-42, new Runnable() { + public void run() { + log.fatal("Exception while checking mount points, halting process", e); + } + }); + } + } + }; + + // use a new Timer object instead of a shared one. + // trying to avoid the case where one the timers other + // task gets stuck because a FS went read only, and this task + // does not execute + Timer timer = new Timer("filesystem monitor timer", true); + timer.schedule(tt, period, period); + + } + + protected void logAsync(final Level level, final String msg, final Exception e) { + Runnable r = new Runnable() { + @Override + public void run() { + log.log(level, msg, e); + } + }; + + new Thread(r).start(); + } + + protected void checkMounts(String procFile) throws Exception { + List mounts = parse(procFile); + + for (Mount mount : mounts) { + if (!readWriteFilesystems.containsKey(mount.mountPoint)) + if (mount.options.contains("rw")) + readWriteFilesystems.put(mount.mountPoint, true); + else if (mount.options.contains("ro")) + readWriteFilesystems.put(mount.mountPoint, false); + else + throw new Exception("Filesystem " + mount + " does not have ro or rw option"); + else if (mount.options.contains("ro") && readWriteFilesystems.get(mount.mountPoint)) + throw new Exception("Filesystem " + mount.mountPoint + " switched to read only"); + } + } + + public static void start(AccumuloConfiguration conf, Property prop) { + if (conf.getBoolean(prop)) { + if (new File(PROC_MOUNTS).exists()) { + try { + new FileSystemMonitor(PROC_MOUNTS, 60000); + log.info("Filesystem monitor started"); + } catch (IOException e) { + log.error("Failed to initialize file system monitor", e); + } + } else { + log.info("Not monitoring filesystems, " + PROC_MOUNTS + " does not exists"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java new file mode 100644 index 0000000..fa13f1b --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.file.rfile.RFileOperations; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +public class FileUtil { + + public static class FileInfo { + Key firstKey = new Key(); + Key lastKey = new Key(); + + public FileInfo(Key firstKey, Key lastKey) { + this.firstKey = firstKey; + this.lastKey = lastKey; + } + + public Text getFirstRow() { + return firstKey.getRow(); + } + + public Text getLastRow() { + return lastKey.getRow(); + } + } + + private static final Logger log = Logger.getLogger(FileUtil.class); + + private static String createTmpDir(AccumuloConfiguration acuConf, VolumeManager fs) throws IOException { + String accumuloDir = acuConf.get(Property.INSTANCE_DFS_DIR); + + String tmpDir = null; + while (tmpDir == null) { + tmpDir = accumuloDir + "/tmp/idxReduce_" + String.format("%09d", (int) (Math.random() * Integer.MAX_VALUE)); + + try { + fs.getFileStatus(new Path(tmpDir)); + tmpDir = null; + continue; + } catch (FileNotFoundException fne) { + // found an unused temp directory + } + + fs.mkdirs(new Path(tmpDir)); + + // try to reserve the tmp dir + if (!fs.createNewFile(new Path(tmpDir + "/__reserve"))) + tmpDir = null; + } + + return tmpDir; + } + + public static Collection reduceFiles(AccumuloConfiguration acuConf, Configuration conf, VolumeManager fs, Text prevEndRow, Text endRow, + Collection mapFiles, int maxFiles, String tmpDir, int pass) throws IOException { + ArrayList paths = new ArrayList(mapFiles); + + if (paths.size() <= maxFiles) + return paths; + + String newDir = String.format("%s/pass_%04d", tmpDir, pass); + + int start = 0; + + ArrayList outFiles = new ArrayList(); + + int count = 0; + + while (start < paths.size()) { + int end = Math.min(maxFiles + start, paths.size()); + List inFiles = paths.subList(start, end); + + start = end; + + FileRef newMapFile = new FileRef(String.format("%s/%04d." + RFile.EXTENSION, newDir, count++)); + + outFiles.add(newMapFile); + FileSystem ns = fs.getFileSystemByPath(newMapFile.path()); + FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), acuConf); + writer.startDefaultLocalityGroup(); + List> iters = new ArrayList>(inFiles.size()); + + FileSKVIterator reader = null; + try { + for (FileRef s : inFiles) { + reader = FileOperations.getInstance().openIndex(s.path().toString(), ns, ns.getConf(), acuConf); + iters.add(reader); + } + + MultiIterator mmfi = new MultiIterator(iters, true); + + while (mmfi.hasTop()) { + Key key = mmfi.getTopKey(); + + boolean gtPrevEndRow = prevEndRow == null || key.compareRow(prevEndRow) > 0; + boolean lteEndRow = endRow == null || key.compareRow(endRow) <= 0; + + if (gtPrevEndRow && lteEndRow) + writer.append(key, new Value(new byte[0])); + + if (!lteEndRow) + break; + + mmfi.next(); + } + } finally { + try { + if (reader != null) + reader.close(); + } catch (IOException e) { + log.error(e, e); + } + + for (SortedKeyValueIterator r : iters) + try { + if (r != null) + ((FileSKVIterator) r).close(); + } catch (IOException e) { + // continue closing + log.error(e, e); + } + + try { + if (writer != null) + writer.close(); + } catch (IOException e) { + log.error(e, e); + throw e; + } + } + } + + return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir, pass + 1); + } + + public static SortedMap findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection mapFiles, + double minSplit) throws IOException { + return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, true); + } + + public static double estimatePercentageLTE(VolumeManager fs, AccumuloConfiguration acuconf, Text prevEndRow, Text endRow, Collection mapFiles, + Text splitRow) throws IOException { + + Configuration conf = CachedConfiguration.getInstance(); + + String tmpDir = null; + + int maxToOpen = acuconf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN); + ArrayList readers = new ArrayList(mapFiles.size()); + + try { + if (mapFiles.size() > maxToOpen) { + tmpDir = createTmpDir(acuconf, fs); + + log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir); + + long t1 = System.currentTimeMillis(); + mapFiles = reduceFiles(acuconf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0); + long t2 = System.currentTimeMillis(); + + log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0)); + } + + if (prevEndRow == null) + prevEndRow = new Text(); + + long numKeys = 0; + + numKeys = countIndexEntries(acuconf, prevEndRow, endRow, mapFiles, true, conf, fs, readers); + + if (numKeys == 0) { + // not enough info in the index to answer the question, so instead of going to + // the data just punt and return .5 + return .5; + } + + List> iters = new ArrayList>(readers); + MultiIterator mmfi = new MultiIterator(iters, true); + + // skip the prevendrow + while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0) { + mmfi.next(); + } + + int numLte = 0; + + while (mmfi.hasTop() && mmfi.getTopKey().compareRow(splitRow) <= 0) { + numLte++; + mmfi.next(); + } + + if (numLte > numKeys) { + // something went wrong + throw new RuntimeException("numLte > numKeys " + numLte + " " + numKeys + " " + prevEndRow + " " + endRow + " " + splitRow + " " + mapFiles); + } + + // do not want to return 0% or 100%, so add 1 and 2 below + return (numLte + 1) / (double) (numKeys + 2); + + } finally { + cleanupIndexOp(acuconf, tmpDir, fs, readers); + } + } + + /** + * + * @param mapFiles + * - list MapFiles to find the mid point key + * + * ISSUES : This method used the index files to find the mid point. If the map files have different index intervals this method will not return an + * accurate mid point. Also, it would be tricky to use this method in conjunction with an in memory map because the indexing interval is unknown. + */ + public static SortedMap findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection mapFiles, + double minSplit, boolean useIndex) throws IOException { + Configuration conf = CachedConfiguration.getInstance(); + + Collection origMapFiles = mapFiles; + + String tmpDir = null; + + int maxToOpen = acuConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN); + ArrayList readers = new ArrayList(mapFiles.size()); + + try { + if (mapFiles.size() > maxToOpen) { + if (!useIndex) + throw new IOException("Cannot find mid point using data files, too many " + mapFiles.size()); + tmpDir = createTmpDir(acuConf, fs); + + log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir); + + long t1 = System.currentTimeMillis(); + mapFiles = reduceFiles(acuConf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0); + long t2 = System.currentTimeMillis(); + + log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0)); + } + + if (prevEndRow == null) + prevEndRow = new Text(); + + long t1 = System.currentTimeMillis(); + + long numKeys = 0; + + numKeys = countIndexEntries(acuConf, prevEndRow, endRow, mapFiles, tmpDir == null ? useIndex : false, conf, fs, readers); + + if (numKeys == 0) { + if (useIndex) { + log.warn("Failed to find mid point using indexes, falling back to data files which is slower. No entries between " + prevEndRow + " and " + endRow + + " for " + mapFiles); + // need to pass original map files, not possibly reduced indexes + return findMidPoint(fs, acuConf, prevEndRow, endRow, origMapFiles, minSplit, false); + } + throw new IOException("Failed to find mid point, no entries between " + prevEndRow + " and " + endRow + " for " + mapFiles); + } + + List> iters = new ArrayList>(readers); + MultiIterator mmfi = new MultiIterator(iters, true); + + // skip the prevendrow + while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0) + mmfi.next(); + + // read half of the keys in the index + TreeMap ret = new TreeMap(); + Key lastKey = null; + long keysRead = 0; + + Key keyBeforeMidPoint = null; + long keyBeforeMidPointPosition = 0; + + while (keysRead < numKeys / 2) { + if (lastKey != null && !lastKey.equals(mmfi.getTopKey(), PartialKey.ROW) && (keysRead - 1) / (double) numKeys >= minSplit) { + keyBeforeMidPoint = new Key(lastKey); + keyBeforeMidPointPosition = keysRead - 1; + } + + if (lastKey == null) + lastKey = new Key(); + + lastKey.set(mmfi.getTopKey()); + + keysRead++; + + // consume minimum + mmfi.next(); + } + + if (keyBeforeMidPoint != null) + ret.put(keyBeforeMidPointPosition / (double) numKeys, keyBeforeMidPoint); + + long t2 = System.currentTimeMillis(); + + log.debug(String.format("Found midPoint from indexes in %6.2f secs.%n", ((t2 - t1) / 1000.0))); + + ret.put(.5, mmfi.getTopKey()); + + // sanity check + for (Key key : ret.values()) { + boolean inRange = (key.compareRow(prevEndRow) > 0 && (endRow == null || key.compareRow(endRow) < 1)); + if (!inRange) { + throw new IOException("Found mid point is not in range " + key + " " + prevEndRow + " " + endRow + " " + mapFiles); + } + } + + return ret; + } finally { + cleanupIndexOp(acuConf, tmpDir, fs, readers); + } + } + + private static void cleanupIndexOp(AccumuloConfiguration acuConf, String tmpDir, VolumeManager fs, ArrayList readers) throws IOException { + // close all of the index sequence files + for (FileSKVIterator r : readers) { + try { + if (r != null) + r.close(); + } catch (IOException e) { + // okay, try to close the rest anyway + log.error(e, e); + } + } + + if (tmpDir != null) { + String tmpPrefix = acuConf.get(Property.INSTANCE_DFS_DIR) + "/tmp"; + if (tmpDir.startsWith(tmpPrefix)) + fs.deleteRecursively(new Path(tmpDir)); + else + log.error("Did not delete tmp dir because it wasn't a tmp dir " + tmpDir); + } + } + + private static long countIndexEntries(AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection mapFiles, boolean useIndex, + Configuration conf, VolumeManager fs, ArrayList readers) throws IOException { + + long numKeys = 0; + + // count the total number of index entries + for (FileRef ref : mapFiles) { + FileSKVIterator reader = null; + Path path = ref.path(); + FileSystem ns = fs.getFileSystemByPath(path); + try { + if (useIndex) + reader = FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf); + else + reader = FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false, ns, ns.getConf(), + acuConf); + + while (reader.hasTop()) { + Key key = reader.getTopKey(); + if (endRow != null && key.compareRow(endRow) > 0) + break; + else if (prevEndRow == null || key.compareRow(prevEndRow) > 0) + numKeys++; + + reader.next(); + } + } finally { + try { + if (reader != null) + reader.close(); + } catch (IOException e) { + log.error(e, e); + } + } + + if (useIndex) + readers.add(FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf)); + else + readers.add(FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false, ns, ns.getConf(), + acuConf)); + + } + return numKeys; + } + + public static Map tryToGetFirstAndLastRows(VolumeManager fs, AccumuloConfiguration acuConf, Set mapfiles) { + + HashMap mapFilesInfo = new HashMap(); + + long t1 = System.currentTimeMillis(); + + for (FileRef mapfile : mapfiles) { + + FileSKVIterator reader = null; + FileSystem ns = fs.getFileSystemByPath(mapfile.path()); + try { + reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), acuConf); + + Key firstKey = reader.getFirstKey(); + if (firstKey != null) { + mapFilesInfo.put(mapfile, new FileInfo(firstKey, reader.getLastKey())); + } + + } catch (IOException ioe) { + log.warn("Failed to read map file to determine first and last key : " + mapfile, ioe); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException ioe) { + log.warn("failed to close " + mapfile, ioe); + } + } + } + + } + + long t2 = System.currentTimeMillis(); + + log.debug(String.format("Found first and last keys for %d map files in %6.2f secs", mapfiles.size(), (t2 - t1) / 1000.0)); + + return mapFilesInfo; + } + + public static WritableComparable findLastKey(VolumeManager fs, AccumuloConfiguration acuConf, Collection mapFiles) throws IOException { + Key lastKey = null; + + for (FileRef ref : mapFiles) { + Path path = ref.path(); + FileSystem ns = fs.getFileSystemByPath(path); + FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), acuConf); + + try { + if (!reader.hasTop()) + // file is empty, so there is no last key + continue; + + Key key = reader.getLastKey(); + + if (lastKey == null || key.compareTo(lastKey) > 0) + lastKey = key; + } finally { + try { + if (reader != null) + reader.close(); + } catch (IOException e) { + log.error(e, e); + } + } + } + + return lastKey; + + } + + private static class MLong { + public MLong(long i) { + l = i; + } + + long l; + } + + public static Map estimateSizes(AccumuloConfiguration acuConf, Path mapFile, long fileSize, List extents, Configuration conf, + VolumeManager fs) throws IOException { + + long totalIndexEntries = 0; + Map counts = new TreeMap(); + for (KeyExtent keyExtent : extents) + counts.put(keyExtent, new MLong(0)); + + Text row = new Text(); + FileSystem ns = fs.getFileSystemByPath(mapFile); + FileSKVIterator index = FileOperations.getInstance().openIndex(mapFile.toString(), ns, ns.getConf(), acuConf); + + try { + while (index.hasTop()) { + Key key = index.getTopKey(); + totalIndexEntries++; + key.getRow(row); + + for (Entry entry : counts.entrySet()) + if (entry.getKey().contains(row)) + entry.getValue().l++; + + index.next(); + } + } finally { + try { + if (index != null) + index.close(); + } catch (IOException e) { + // continue with next file + log.error(e, e); + } + } + + Map results = new TreeMap(); + for (KeyExtent keyExtent : extents) { + double numEntries = counts.get(keyExtent).l; + if (numEntries == 0) + numEntries = 1; + long estSize = (long) ((numEntries / totalIndexEntries) * fileSize); + results.put(keyExtent, estSize); + } + return results; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java new file mode 100644 index 0000000..64c23d0 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.server.cli.ClientOpts; +import org.apache.accumulo.server.master.LiveTServerSet; +import org.apache.accumulo.server.master.LiveTServerSet.Listener; +import org.apache.accumulo.server.master.state.DistributedStoreException; +import org.apache.accumulo.server.master.state.MetaDataTableScanner; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletState; +import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.tables.TableManager; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +public class FindOfflineTablets { + private static final Logger log = Logger.getLogger(FindOfflineTablets.class); + + public static void main(String[] args) throws Exception { + ClientOpts opts = new ClientOpts(); + opts.parseArgs(FindOfflineTablets.class.getName(), args); + Instance instance = opts.getInstance(); + SystemCredentials creds = SystemCredentials.get(); + + findOffline(instance, creds, null); + } + + static int findOffline(Instance instance, Credentials creds, String tableName) throws AccumuloException, TableNotFoundException { + + final AtomicBoolean scanning = new AtomicBoolean(false); + + LiveTServerSet tservers = new LiveTServerSet(instance, DefaultConfiguration.getDefaultConfiguration(), new Listener() { + @Override + public void update(LiveTServerSet current, Set deleted, Set added) { + if (!deleted.isEmpty() && scanning.get()) + log.warn("Tablet servers deleted while scanning: " + deleted); + if (!added.isEmpty() && scanning.get()) + log.warn("Tablet servers added while scanning: " + added); + } + }); + tservers.startListeningForTabletServerChanges(); + scanning.set(true); + + Iterator zooScanner; + try { + zooScanner = new ZooTabletStateStore().iterator(); + } catch (DistributedStoreException e) { + throw new AccumuloException(e); + } + + int offline = 0; + + System.out.println("Scanning zookeeper"); + if ((offline = checkTablets(zooScanner, tservers)) > 0) + return offline; + + if (RootTable.NAME.equals(tableName)) + return 0; + + System.out.println("Scanning " + RootTable.NAME); + Iterator rootScanner = new MetaDataTableScanner(instance, creds, MetadataSchema.TabletsSection.getRange(), RootTable.NAME); + if ((offline = checkTablets(rootScanner, tservers)) > 0) + return offline; + + if (MetadataTable.NAME.equals(tableName)) + return 0; + + System.out.println("Scanning " + MetadataTable.NAME); + + Range range = MetadataSchema.TabletsSection.getRange(); + if (tableName != null) { + String tableId = Tables.getTableId(instance, tableName); + range = new KeyExtent(new Text(tableId), null, null).toMetadataRange(); + } + + Iterator metaScanner = new MetaDataTableScanner(instance, creds, range, MetadataTable.NAME); + return checkTablets(metaScanner, tservers); + } + + private static int checkTablets(Iterator scanner, LiveTServerSet tservers) { + int offline = 0; + + while (scanner.hasNext() && !System.out.checkError()) { + TabletLocationState locationState = scanner.next(); + TabletState state = locationState.getState(tservers.getCurrentServers()); + if (state != null && state != TabletState.HOSTED + && TableManager.getInstance().getTableState(locationState.extent.getTableId().toString()) != TableState.OFFLINE) { + System.out.println(locationState + " is " + state + " #walogs:" + locationState.walogs.size()); + offline++; + } + } + + return offline; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/Halt.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Halt.java b/server/base/src/main/java/org/apache/accumulo/server/util/Halt.java new file mode 100644 index 0000000..6e3027f --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Halt.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.log4j.Logger; + +public class Halt { + static private Logger log = Logger.getLogger(Halt.class); + + public static void halt(final String msg) { + halt(0, new Runnable() { + public void run() { + log.fatal(msg); + } + }); + } + + public static void halt(final String msg, int status) { + halt(status, new Runnable() { + public void run() { + log.fatal(msg); + } + }); + } + + public static void halt(final int status, Runnable runnable) { + try { + // give ourselves a little time to try and do something + new Daemon() { + public void run() { + UtilWaitThread.sleep(100); + Runtime.getRuntime().halt(status); + } + }.start(); + + if (runnable != null) + runnable.run(); + Runtime.getRuntime().halt(status); + } finally { + // In case something else decides to throw a Runtime exception + Runtime.getRuntime().halt(-1); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/Info.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Info.java b/server/base/src/main/java/org/apache/accumulo/server/util/Info.java new file mode 100644 index 0000000..29fa135 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Info.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.util.MonitorUtil; +import org.apache.accumulo.server.client.HdfsZooInstance; + +public class Info { + public static void main(String[] args) throws Exception { + Instance instance = HdfsZooInstance.getInstance(); + System.out.println("monitor: " + MonitorUtil.getLocation(instance)); + System.out.println("masters: " + instance.getMasterLocations()); + System.out.println("zookeepers: " + instance.getZooKeepers()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java new file mode 100644 index 0000000..b982829 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.Formattable; +import java.util.Formatter; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooReader; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.log4j.Logger; + +import com.beust.jcommander.Parameter; + +public class ListInstances { + + private static final Logger log = Logger.getLogger(ListInstances.class); + + private static final int NAME_WIDTH = 20; + private static final int UUID_WIDTH = 37; + private static final int MASTER_WIDTH = 30; + + private static final int ZOOKEEPER_TIMER_MILLIS = 30 * 1000; + + static class Opts extends Help { + @Parameter(names="--print-errors", description="display errors while listing instances") + boolean printErrors = false; + @Parameter(names="--print-all", description="print information for all instances, not just those with names") + boolean printAll = false; + @Parameter(names={"-z", "--zookeepers"}, description="the zookeepers to contact") + String keepers = null; + } + static Opts opts = new Opts(); + static int errors = 0; + + public static void main(String[] args) { + opts.parseArgs(ListInstances.class.getName(), args); + + if (opts.keepers == null) { + opts.keepers = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST); + } + + String keepers = opts.keepers; + boolean printAll = opts.printAll; + boolean printErrors = opts.printErrors; + + listInstances(keepers, printAll, printErrors); + + } + + static synchronized void listInstances(String keepers, boolean printAll, boolean printErrors) { + errors = 0; + + System.out.println("INFO : Using ZooKeepers " + keepers); + ZooReader rdr = new ZooReader(keepers, ZOOKEEPER_TIMER_MILLIS); + ZooCache cache = new ZooCache(keepers, ZOOKEEPER_TIMER_MILLIS); + + TreeMap instanceNames = getInstanceNames(rdr, printErrors); + + System.out.println(); + printHeader(); + + for (Entry entry : instanceNames.entrySet()) { + printInstanceInfo(cache, entry.getKey(), entry.getValue(), printErrors); + } + + TreeSet instancedIds = getInstanceIDs(rdr, printErrors); + instancedIds.removeAll(instanceNames.values()); + + if (printAll) { + for (UUID uuid : instancedIds) { + printInstanceInfo(cache, null, uuid, printErrors); + } + } else if (instancedIds.size() > 0) { + System.out.println(); + System.out.println("INFO : " + instancedIds.size() + " unamed instances were not printed, run with --print-all to see all instances"); + } else { + System.out.println(); + } + + if (!printErrors && errors > 0) { + System.err.println("WARN : There were " + errors + " errors, run with --print-errors to see more info"); + } + } + + private static class CharFiller implements Formattable { + + char c; + + CharFiller(char c) { + this.c = c; + } + + @Override + public void formatTo(Formatter formatter, int flags, int width, int precision) { + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < width; i++) + sb.append(c); + formatter.format(sb.toString()); + } + + } + + private static void printHeader() { + System.out.printf(" %-" + NAME_WIDTH + "s| %-" + UUID_WIDTH + "s| %-" + MASTER_WIDTH + "s%n", "Instance Name", "Instance ID", "Master"); + System.out.printf("%" + (NAME_WIDTH + 1) + "s+%" + (UUID_WIDTH + 1) + "s+%" + (MASTER_WIDTH + 1) + "s%n", new CharFiller('-'), new CharFiller('-'), + new CharFiller('-')); + + } + + private static void printInstanceInfo(ZooCache cache, String instanceName, UUID iid, boolean printErrors) { + String master = getMaster(cache, iid, printErrors); + if (instanceName == null) { + instanceName = ""; + } + + if (master == null) { + master = ""; + } + + System.out.printf("%" + NAME_WIDTH + "s |%" + UUID_WIDTH + "s |%" + MASTER_WIDTH + "s%n", "\"" + instanceName + "\"", iid, master); + } + + private static String getMaster(ZooCache cache, UUID iid, boolean printErrors) { + + if (iid == null) { + return null; + } + + try { + String masterLocPath = Constants.ZROOT + "/" + iid + Constants.ZMASTER_LOCK; + byte[] master = ZooLock.getLockData(cache, masterLocPath, null); + if (master == null) { + return null; + } + return new String(master); + } catch (Exception e) { + handleException(e, printErrors); + return null; + } + } + + private static TreeMap getInstanceNames(ZooReader zk, boolean printErrors) { + + String instancesPath = Constants.ZROOT + Constants.ZINSTANCES; + + TreeMap tm = new TreeMap(); + + List names; + + try { + names = zk.getChildren(instancesPath); + } catch (Exception e) { + handleException(e, printErrors); + return tm; + } + + for (String name : names) { + String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + name; + try { + UUID iid = UUID.fromString(new String(zk.getData(instanceNamePath, null))); + tm.put(name, iid); + } catch (Exception e) { + handleException(e, printErrors); + tm.put(name, null); + } + } + + return tm; + } + + private static TreeSet getInstanceIDs(ZooReader zk, boolean printErrors) { + TreeSet ts = new TreeSet(); + + try { + List children = zk.getChildren(Constants.ZROOT); + + for (String iid : children) { + if (iid.equals("instances")) + continue; + try { + ts.add(UUID.fromString(iid)); + } catch (Exception e) { + log.error("Exception: " + e); + } + } + } catch (Exception e) { + handleException(e, printErrors); + } + + return ts; + } + + private static void handleException(Exception e, boolean printErrors) { + if (printErrors) { + e.printStackTrace(); + } + + errors++; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java b/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java new file mode 100644 index 0000000..112a619 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.server.cli.ClientOpts; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.net.HostAndPort; + +public class LocalityCheck { + + public int run(String[] args) throws Exception { + ClientOpts opts = new ClientOpts(); + opts.parseArgs(LocalityCheck.class.getName(), args); + + VolumeManager fs = VolumeManagerImpl.get(); + Connector connector = opts.getConnector(); + Scanner scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); + + Map totalBlocks = new HashMap(); + Map localBlocks = new HashMap(); + ArrayList files = new ArrayList(); + + for (Entry entry : scanner) { + Key key = entry.getKey(); + if (key.compareColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) { + String location = entry.getValue().toString(); + String[] parts = location.split(":"); + String host = parts[0]; + addBlocks(fs, host, files, totalBlocks, localBlocks); + files.clear(); + } else if (key.compareColumnFamily(DataFileColumnFamily.NAME) == 0) { + + files.add(fs.getFullPath(key).toString()); + } + } + System.out.println(" Server %local total blocks"); + for (String host : totalBlocks.keySet()) { + System.out.println(String.format("%15s %5.1f %8d", host, (localBlocks.get(host) * 100.) / totalBlocks.get(host), totalBlocks.get(host))); + } + return 0; + } + + private void addBlocks(VolumeManager fs, String host, ArrayList files, Map totalBlocks, Map localBlocks) throws Exception { + long allBlocks = 0; + long matchingBlocks = 0; + if (!totalBlocks.containsKey(host)) { + totalBlocks.put(host, 0L); + localBlocks.put(host, 0L); + } + for (String file : files) { + Path filePath = new Path(file); + FileSystem ns = fs.getFileSystemByPath(filePath); + FileStatus fileStatus = ns.getFileStatus(filePath); + BlockLocation[] fileBlockLocations = ns.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + for (BlockLocation blockLocation : fileBlockLocations) { + allBlocks++; + for (String location : blockLocation.getHosts()) { + HostAndPort hap = HostAndPort.fromParts(location, 0); + if (hap.getHostText().equals(host)) { + matchingBlocks++; + break; + } + } + } + } + totalBlocks.put(host, allBlocks + totalBlocks.get(host)); + localBlocks.put(host, matchingBlocks + localBlocks.get(host)); + } + + public static void main(String[] args) throws Exception { + LocalityCheck check = new LocalityCheck(); + System.exit(check.run(args)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/LoginProperties.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/LoginProperties.java b/server/base/src/main/java/org/apache/accumulo/server/util/LoginProperties.java new file mode 100644 index 0000000..e16bd06 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/LoginProperties.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.TokenProperty; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.security.handler.Authenticator; +import org.apache.accumulo.start.classloader.AccumuloClassLoader; + +/** + * + */ +public class LoginProperties { + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + AccumuloConfiguration config = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance()); + Authenticator authenticator = AccumuloClassLoader.getClassLoader().loadClass(config.get(Property.INSTANCE_SECURITY_AUTHENTICATOR)) + .asSubclass(Authenticator.class).newInstance(); + + List> tokenProps = new ArrayList>(); + + for (Class tokenType : authenticator.getSupportedTokenTypes()) { + tokenProps.add(tokenType.newInstance().getProperties()); + } + + System.out.println("Supported token types for " + authenticator.getClass().getName() + " are : "); + for (Class tokenType : authenticator.getSupportedTokenTypes()) { + System.out.println("\t" + tokenType.getName() + ", which accepts the following properties : "); + + for (TokenProperty tokenProperty : tokenType.newInstance().getProperties()) { + System.out.println("\t\t" + tokenProperty); + } + + System.out.println(); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java new file mode 100644 index 0000000..987eba9 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.impl.ScannerImpl; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; + +/** + * + */ +public class MasterMetadataUtil { + + private static final Logger log = Logger.getLogger(MasterMetadataUtil.class); + + public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map datafileSizes, + Map bulkLoadedFiles, Credentials credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) { + Mutation m = extent.getPrevRowUpdateMutation(); + + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(path.getBytes())); + TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(time.getBytes())); + if (lastFlushID > 0) + TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value(("" + lastFlushID).getBytes())); + if (lastCompactID > 0) + TabletsSection.ServerColumnFamily.COMPACT_COLUMN.put(m, new Value(("" + lastCompactID).getBytes())); + + if (location != null) { + m.put(TabletsSection.CurrentLocationColumnFamily.NAME, location.asColumnQualifier(), location.asMutationValue()); + m.putDelete(TabletsSection.FutureLocationColumnFamily.NAME, location.asColumnQualifier()); + } + + for (Entry entry : datafileSizes.entrySet()) { + m.put(DataFileColumnFamily.NAME, entry.getKey().meta(), new Value(entry.getValue().encode())); + } + + for (Entry entry : bulkLoadedFiles.entrySet()) { + byte[] tidBytes = Long.toString(entry.getValue()).getBytes(); + m.put(TabletsSection.BulkFileColumnFamily.NAME, entry.getKey().meta(), new Value(tidBytes)); + } + + MetadataTableUtil.update(credentials, zooLock, m, extent); + } + + public static KeyExtent fixSplit(Text metadataEntry, SortedMap columns, TServerInstance tserver, Credentials credentials, ZooLock lock) + throws AccumuloException, IOException { + log.info("Incomplete split " + metadataEntry + " attempting to fix"); + + Value oper = columns.get(TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN); + + if (columns.get(TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN) == null) { + throw new IllegalArgumentException("Metadata entry does not have split ratio (" + metadataEntry + ")"); + } + + double splitRatio = Double.parseDouble(new String(columns.get(TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN).get())); + + Value prevEndRowIBW = columns.get(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN); + + if (prevEndRowIBW == null) { + throw new IllegalArgumentException("Metadata entry does not have prev row (" + metadataEntry + ")"); + } + + Value time = columns.get(TabletsSection.ServerColumnFamily.TIME_COLUMN); + + if (time == null) { + throw new IllegalArgumentException("Metadata entry does not have time (" + metadataEntry + ")"); + } + + Value flushID = columns.get(TabletsSection.ServerColumnFamily.FLUSH_COLUMN); + long initFlushID = -1; + if (flushID != null) + initFlushID = Long.parseLong(flushID.toString()); + + Value compactID = columns.get(TabletsSection.ServerColumnFamily.COMPACT_COLUMN); + long initCompactID = -1; + if (compactID != null) + initCompactID = Long.parseLong(compactID.toString()); + + Text metadataPrevEndRow = KeyExtent.decodePrevEndRow(prevEndRowIBW); + + Text table = (new KeyExtent(metadataEntry, (Text) null)).getTableId(); + + return fixSplit(table, metadataEntry, metadataPrevEndRow, oper, splitRatio, tserver, credentials, time.toString(), initFlushID, initCompactID, lock); + } + + private static KeyExtent fixSplit(Text table, Text metadataEntry, Text metadataPrevEndRow, Value oper, double splitRatio, TServerInstance tserver, + Credentials credentials, String time, long initFlushID, long initCompactID, ZooLock lock) throws AccumuloException, IOException { + if (metadataPrevEndRow == null) + // something is wrong, this should not happen... if a tablet is split, it will always have a + // prev end row.... + throw new AccumuloException("Split tablet does not have prev end row, something is amiss, extent = " + metadataEntry); + + // check to see if prev tablet exist in metadata tablet + Key prevRowKey = new Key(new Text(KeyExtent.getMetadataEntry(table, metadataPrevEndRow))); + + ScannerImpl scanner2 = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, MetadataTable.ID, Authorizations.EMPTY); + scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW))); + + VolumeManager fs = VolumeManagerImpl.get(); + if (!scanner2.iterator().hasNext()) { + log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow); + MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), credentials, lock); + return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper)); + } else { + log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow); + + List highDatafilesToRemove = new ArrayList(); + + Scanner scanner3 = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, MetadataTable.ID, Authorizations.EMPTY); + Key rowKey = new Key(metadataEntry); + + SortedMap origDatafileSizes = new TreeMap(); + SortedMap highDatafileSizes = new TreeMap(); + SortedMap lowDatafileSizes = new TreeMap(); + scanner3.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW))); + + for (Entry entry : scanner3) { + if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) { + origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get())); + } + } + + MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap(), origDatafileSizes, lowDatafileSizes, + highDatafileSizes, highDatafilesToRemove); + + MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, credentials, lock); + + return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow)); + } + + } + + private static TServerInstance getTServerInstance(String address, ZooLock zooLock) { + while (true) { + try { + return new TServerInstance(address, zooLock.getSessionId()); + } catch (KeeperException e) { + log.error(e, e); + } catch (InterruptedException e) { + log.error(e, e); + } + UtilWaitThread.sleep(1000); + } + } + + public static void replaceDatafiles(KeyExtent extent, Set datafilesToDelete, Set scanFiles, FileRef path, Long compactionId, + DataFileValue size, Credentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) throws IOException { + replaceDatafiles(extent, datafilesToDelete, scanFiles, path, compactionId, size, credentials, address, lastLocation, zooLock, true); + } + + public static void replaceDatafiles(KeyExtent extent, Set datafilesToDelete, Set scanFiles, FileRef path, Long compactionId, + DataFileValue size, Credentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) throws IOException { + + if (insertDeleteFlags) { + // add delete flags for those paths before the data file reference is removed + MetadataTableUtil.addDeleteEntries(extent, datafilesToDelete, credentials); + } + + // replace data file references to old mapfiles with the new mapfiles + Mutation m = new Mutation(extent.getMetadataEntry()); + + for (FileRef pathToRemove : datafilesToDelete) + m.putDelete(DataFileColumnFamily.NAME, pathToRemove.meta()); + + for (FileRef scanFile : scanFiles) + m.put(ScanFileColumnFamily.NAME, scanFile.meta(), new Value("".getBytes())); + + if (size.getNumEntries() > 0) + m.put(DataFileColumnFamily.NAME, path.meta(), new Value(size.encode())); + + if (compactionId != null) + TabletsSection.ServerColumnFamily.COMPACT_COLUMN.put(m, new Value(("" + compactionId).getBytes())); + + TServerInstance self = getTServerInstance(address, zooLock); + self.putLastLocation(m); + + // remove the old location + if (lastLocation != null && !lastLocation.equals(self)) + lastLocation.clearLastLocation(m); + + MetadataTableUtil.update(credentials, zooLock, m, extent); + } + + /** + * new data file update function adds one data file to a tablet's list + * + * @param path + * should be relative to the table directory + * + */ + public static void updateTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time, Credentials credentials, + Set filesInUseByScans, String address, ZooLock zooLock, Set unusedWalLogs, TServerInstance lastLocation, long flushId) { + if (extent.equals(RootTable.EXTENT)) { + if (unusedWalLogs != null) { + IZooReaderWriter zk = ZooReaderWriter.getInstance(); + // unusedWalLogs will contain the location/name of each log in a log set + // the log set is stored under one of the log names, but not both + // find the entry under one of the names and delete it. + String root = MetadataTableUtil.getZookeeperLogLocation(); + boolean foundEntry = false; + for (String entry : unusedWalLogs) { + String[] parts = entry.split("/"); + String zpath = root + "/" + parts[parts.length - 1]; + while (true) { + try { + if (zk.exists(zpath)) { + zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP); + foundEntry = true; + } + break; + } catch (KeeperException e) { + log.error(e, e); + } catch (InterruptedException e) { + log.error(e, e); + } + UtilWaitThread.sleep(1000); + } + } + if (unusedWalLogs.size() > 0 && !foundEntry) + log.warn("WALog entry for root tablet did not exist " + unusedWalLogs); + } + return; + } + + Mutation m = new Mutation(extent.getMetadataEntry()); + + if (dfv.getNumEntries() > 0) { + m.put(DataFileColumnFamily.NAME, path.meta(), new Value(dfv.encode())); + TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(time.getBytes())); + // stuff in this location + TServerInstance self = getTServerInstance(address, zooLock); + self.putLastLocation(m); + // erase the old location + if (lastLocation != null && !lastLocation.equals(self)) + lastLocation.clearLastLocation(m); + } + if (unusedWalLogs != null) { + for (String entry : unusedWalLogs) { + m.putDelete(LogColumnFamily.NAME, new Text(entry)); + } + } + + for (FileRef scanFile : filesInUseByScans) + m.put(ScanFileColumnFamily.NAME, scanFile.meta(), new Value("".getBytes())); + + if (mergeFile != null) + m.putDelete(DataFileColumnFamily.NAME, mergeFile.meta()); + + TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value((flushId + "").getBytes())); + + MetadataTableUtil.update(credentials, zooLock, m, extent); + + } + +}