Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6FBE4106B9 for ; Fri, 1 Nov 2013 00:55:52 +0000 (UTC) Received: (qmail 67910 invoked by uid 500); 1 Nov 2013 00:55:47 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 67575 invoked by uid 500); 1 Nov 2013 00:55:44 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 66555 invoked by uid 99); 1 Nov 2013 00:55:42 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Nov 2013 00:55:42 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 10F946F20; Fri, 1 Nov 2013 00:55:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Fri, 01 Nov 2013 00:56:13 -0000 Message-Id: <02441bb3264e4c93aa85d007bf4673fc@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [34/54] [partial] ACCUMULO-658, ACCUMULO-656 Split server into separate modules http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java new file mode 100644 index 0000000..0ad2196 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.accumulo.server.master.state.MergeInfo.Operation; +import org.apache.hadoop.io.Text; + +/** + * Merge makes things hard. + * + * Typically, a client will read the list of tablets, and begin an operation on that tablet at the location listed in the metadata table. When a tablet splits, + * the information read from the metadata table doesn't match reality, so the operation fails, and must be retried. But the operation will take place either on + * the parent, or at a later time on the children. It won't take place on just half of the tablet. + * + * However, when a merge occurs, the operation may have succeeded on one section of the merged area, and not on the others, when the merge occurs. There is no + * way to retry the request at a later time on an unmodified tablet. + * + * The code below uses read-write lock to prevent some operations while a merge is taking place. Normal operations, like bulk imports, will grab the read lock + * and prevent merges (writes) while they run. Merge operations will lock out some operations while they run. + */ +class TableRangeOpWait extends MasterRepo { + + private static final long serialVersionUID = 1L; + private String tableId; + + public TableRangeOpWait(String tableId) { + this.tableId = tableId; + } + + @Override + public long isReady(long tid, Master env) throws Exception { + Text tableIdText = new Text(tableId); + if (!env.getMergeInfo(tableIdText).getState().equals(MergeState.NONE)) { + return 50; + } + return 0; + } + + @Override + public Repo call(long tid, Master master) throws Exception { + Text tableIdText = new Text(tableId); + MergeInfo mergeInfo = master.getMergeInfo(tableIdText); + log.info("removing merge information " + mergeInfo); + master.clearMergeState(tableIdText); + Utils.unreserveTable(tableId, tid, true); + return null; + } + +} + +public class TableRangeOp extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private String tableId; + private byte[] startRow; + private byte[] endRow; + private Operation op; + + @Override + public long isReady(long tid, Master environment) throws Exception { + return Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE); + } + + public TableRangeOp(MergeInfo.Operation op, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException { + + this.tableId = tableId; + this.startRow = TextUtil.getBytes(startRow); + this.endRow = TextUtil.getBytes(endRow); + this.op = op; + } + + @Override + public Repo call(long tid, Master env) throws Exception { + + if (RootTable.ID.equals(tableId) && TableOperation.MERGE.equals(op)) { + log.warn("Attempt to merge tablets for " + RootTable.NAME + " does nothing. It is not splittable."); + } + + Text start = startRow.length == 0 ? null : new Text(startRow); + Text end = endRow.length == 0 ? null : new Text(endRow); + Text tableIdText = new Text(tableId); + + if (start != null && end != null) + if (start.compareTo(end) >= 0) + throw new ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.BAD_RANGE, + "start row must be less than end row"); + + env.mustBeOnline(tableId); + + MergeInfo info = env.getMergeInfo(tableIdText); + + if (info.getState() == MergeState.NONE) { + KeyExtent range = new KeyExtent(tableIdText, end, start); + env.setMergeState(new MergeInfo(range, op), MergeState.STARTED); + } + + return new TableRangeOpWait(tableId); + } + + @Override + public void undo(long tid, Master env) throws Exception { + // Not sure this is a good thing to do. The Master state engine should be the one to remove it. + Text tableIdText = new Text(tableId); + MergeInfo mergeInfo = env.getMergeInfo(tableIdText); + if (mergeInfo.getState() != MergeState.NONE) + log.info("removing merge information " + mergeInfo); + env.clearMergeState(tableIdText); + Utils.unreserveTable(tableId, tid, true); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java new file mode 100644 index 0000000..dd691b4 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TraceRepo.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.trace.instrument.Span; +import org.apache.accumulo.trace.instrument.Trace; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.accumulo.trace.thrift.TInfo; + +/** + * + */ +public class TraceRepo implements Repo { + + private static final long serialVersionUID = 1L; + + TInfo tinfo; + Repo repo; + + public TraceRepo(Repo repo) { + this.repo = repo; + tinfo = Tracer.traceInfo(); + } + + @Override + public long isReady(long tid, T environment) throws Exception { + Span span = Trace.trace(tinfo, repo.getDescription()); + try { + return repo.isReady(tid, environment); + } finally { + span.stop(); + } + } + + @Override + public Repo call(long tid, T environment) throws Exception { + Span span = Trace.trace(tinfo, repo.getDescription()); + try { + Repo result = repo.call(tid, environment); + if (result == null) + return result; + return new TraceRepo(result); + } finally { + span.stop(); + } + } + + @Override + public void undo(long tid, T environment) throws Exception { + Span span = Trace.trace(tinfo, repo.getDescription()); + try { + repo.undo(tid, environment); + } finally { + span.stop(); + } + } + + @Override + public String getDescription() { + return repo.getDescription(); + } + + @Override + public String getReturn() { + return repo.getReturn(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java new file mode 100644 index 0000000..fa14f43 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import java.math.BigInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator; +import org.apache.accumulo.fate.zookeeper.ZooReservation; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooQueueLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.commons.codec.binary.Base64; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; + +public class Utils { + + static void checkTableDoesNotExist(Instance instance, String tableName, String tableId, TableOperation operation) throws ThriftTableOperationException { + + String id = Tables.getNameToIdMap(instance).get(tableName); + + if (id != null && !id.equals(tableId)) + throw new ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.EXISTS, null); + } + + static String getNextTableId(String tableName, Instance instance) throws ThriftTableOperationException { + + String tableId = null; + try { + IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance(); + final String ntp = ZooUtil.getRoot(instance) + Constants.ZTABLES; + byte[] nid = zoo.mutate(ntp, "0".getBytes(), ZooUtil.PUBLIC, new Mutator() { + @Override + public byte[] mutate(byte[] currentValue) throws Exception { + BigInteger nextId = new BigInteger(new String(currentValue), Character.MAX_RADIX); + nextId = nextId.add(BigInteger.ONE); + return nextId.toString(Character.MAX_RADIX).getBytes(); + } + }); + return new String(nid); + } catch (Exception e1) { + Logger.getLogger(CreateTable.class).error("Failed to assign tableId to " + tableName, e1); + throw new ThriftTableOperationException(tableId, tableName, TableOperation.CREATE, TableOperationExceptionType.OTHER, e1.getMessage()); + } + } + + static final Lock tableNameLock = new ReentrantLock(); + static final Lock idLock = new ReentrantLock(); + private static final Logger log = Logger.getLogger(Utils.class); + + public static long reserveTable(String tableId, long tid, boolean writeLock, boolean tableMustExist, TableOperation op) throws Exception { + if (getLock(tableId, tid, writeLock).tryLock()) { + if (tableMustExist) { + Instance instance = HdfsZooInstance.getInstance(); + IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance(); + if (!zk.exists(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId)) + throw new ThriftTableOperationException(tableId, "", op, TableOperationExceptionType.NOTFOUND, "Table does not exists"); + } + log.info("table " + tableId + " (" + Long.toHexString(tid) + ") locked for " + (writeLock ? "write" : "read") + " operation: " + op); + return 0; + } else + return 100; + } + + public static void unreserveTable(String tableId, long tid, boolean writeLock) throws Exception { + getLock(tableId, tid, writeLock).unlock(); + log.info("table " + tableId + " (" + Long.toHexString(tid) + ") unlocked for " + (writeLock ? "write" : "read")); + } + + public static long reserveHdfsDirectory(String directory, long tid) throws KeeperException, InterruptedException { + Instance instance = HdfsZooInstance.getInstance(); + + String resvPath = ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS + "/" + new String(Base64.encodeBase64(directory.getBytes())); + + IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance(); + + if (ZooReservation.attempt(zk, resvPath, String.format("%016x", tid), "")) { + return 0; + } else + return 50; + } + + public static void unreserveHdfsDirectory(String directory, long tid) throws KeeperException, InterruptedException { + Instance instance = HdfsZooInstance.getInstance(); + String resvPath = ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS + "/" + new String(Base64.encodeBase64(directory.getBytes())); + ZooReservation.release(ZooReaderWriter.getRetryingInstance(), resvPath, String.format("%016x", tid)); + } + + private static Lock getLock(String tableId, long tid, boolean writeLock) throws Exception { + byte[] lockData = String.format("%016x", tid).getBytes(); + ZooQueueLock qlock = new ZooQueueLock(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZTABLE_LOCKS + "/" + tableId, false); + Lock lock = DistributedReadWriteLock.recoverLock(qlock, lockData); + if (lock == null) { + DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, lockData); + if (writeLock) + lock = locker.writeLock(); + else + lock = locker.readLock(); + } + return lock; + } + + public static Lock getReadLock(String tableId, long tid) throws Exception { + return Utils.getLock(tableId, tid, false); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java new file mode 100644 index 0000000..7189637 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tserverOps; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.master.EventCoordinator.Listener; +import org.apache.accumulo.master.tableOps.MasterRepo; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.zookeeper.ZooLock; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.log4j.Logger; +import org.apache.thrift.transport.TTransportException; + +public class ShutdownTServer extends MasterRepo { + + private static final long serialVersionUID = 1L; + private static final Logger log = Logger.getLogger(ShutdownTServer.class); + private TServerInstance server; + private boolean force; + + public ShutdownTServer(TServerInstance server, boolean force) { + this.server = server; + this.force = force; + } + + @Override + public long isReady(long tid, Master environment) throws Exception { + return 0; + } + + @Override + public Repo call(long tid, Master master) throws Exception { + // suppress assignment of tablets to the server + if (force) { + String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZTSERVERS + "/" + server.getLocation(); + ZooLock.deleteLock(path); + path = ZooUtil.getRoot(master.getInstance()) + Constants.ZDEADTSERVERS + "/" + server.getLocation(); + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + zoo.putPersistentData(path, "forced down".getBytes(), NodeExistsPolicy.OVERWRITE); + return null; + } + + // TODO move this to isReady() and drop while loop? - ACCUMULO-1259 + Listener listener = master.getEventCoordinator().getListener(); + master.shutdownTServer(server); + while (master.onlineTabletServers().contains(server)) { + TServerConnection connection = master.getConnection(server); + if (connection != null) { + try { + TabletServerStatus status = connection.getTableMap(false); + if (status.tableMap != null && status.tableMap.isEmpty()) { + log.info("tablet server hosts no tablets " + server); + connection.halt(master.getMasterLock()); + log.info("tablet server asked to halt " + server); + break; + } + } catch (TTransportException ex) { + // expected + } catch (Exception ex) { + log.error("Error talking to tablet server " + server + ": " + ex); + } + } + listener.waitForEvents(1000); + } + + return null; + } + + @Override + public void undo(long tid, Master m) throws Exception {} +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java b/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java new file mode 100644 index 0000000..f794112 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/util/FateAdmin.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.util; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.Help; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.AdminUtil; +import org.apache.accumulo.fate.ZooStore; +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; + +/** + * A utility to administer FATE operations + */ +public class FateAdmin { + + static class TxOpts { + @Parameter(description = "", required = true) + List args = new ArrayList(); + } + + @Parameters(commandDescription = "Stop an existing FATE by transaction id") + static class FailOpts extends TxOpts {} + + @Parameters(commandDescription = "Delete an existing FATE by transaction id") + static class DeleteOpts extends TxOpts {} + + @Parameters(commandDescription = "List the existing FATE transactions") + static class PrintOpts {} + + public static void main(String[] args) throws Exception { + Help opts = new Help(); + JCommander jc = new JCommander(opts); + jc.setProgramName(FateAdmin.class.getName()); + jc.addCommand("fail", new FailOpts()); + jc.addCommand("delete", new DeleteOpts()); + jc.addCommand("print", new PrintOpts()); + jc.parse(args); + if (opts.help || jc.getParsedCommand() == null) { + jc.usage(); + System.exit(1); + } + + System.err.printf("This tool has been deprecated%nFATE administration now available within 'accumulo shell'%n$ fate fail ... | delete ... | print [...]%n%n"); + + AdminUtil admin = new AdminUtil(); + + Instance instance = HdfsZooInstance.getInstance(); + String path = ZooUtil.getRoot(instance) + Constants.ZFATE; + String masterPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK; + IZooReaderWriter zk = ZooReaderWriter.getRetryingInstance(); + ZooStore zs = new ZooStore(path, zk); + + if (jc.getParsedCommand().equals("fail")) { + admin.prepFail(zs, zk, masterPath, args[1]); + } else if (jc.getParsedCommand().equals("delete")) { + admin.prepDelete(zs, zk, masterPath, args[1]); + admin.deleteLocks(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, args[1]); + } else if (jc.getParsedCommand().equals("print")) { + admin.print(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java b/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java new file mode 100644 index 0000000..3389aa3 --- /dev/null +++ b/server/master/src/test/java/org/apache/accumulo/master/DefaultMapTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import org.apache.accumulo.server.util.DefaultMap; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class DefaultMapTest { + + @Test + public void testDefaultMap() { + DefaultMap map = new DefaultMap(""); + map.put("key", "value"); + String empty = map.get("otherKey"); + assertEquals(map.get("key"), "value"); + assertEquals(empty, ""); + assertTrue(empty == map.get("otherKey")); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java new file mode 100644 index 0000000..f435062 --- /dev/null +++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.Credentials; +import org.apache.accumulo.master.state.MergeStats; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.CurrentState; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletState; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.net.HostAndPort; + +/** + * + */ +public class TestMergeState { + + class MockCurrentState implements CurrentState { + + TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456); + MergeInfo mergeInfo; + + MockCurrentState(MergeInfo info) { + this.mergeInfo = info; + } + + @Override + public Set onlineTables() { + return Collections.singleton("t"); + } + + @Override + public Set onlineTabletServers() { + return Collections.singleton(someTServer); + } + + @Override + public Collection merges() { + return Collections.singleton(mergeInfo); + } + } + + private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException { + BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + bw.addMutation(m); + bw.close(); + } + + @Test + public void test() throws Exception { + Instance instance = new MockInstance(); + Connector connector = instance.getConnector("root", new PasswordToken("")); + BatchWriter bw = connector.createBatchWriter("!METADATA", new BatchWriterConfig()); + + // Create a fake METADATA table with these splits + String splits[] = {"a", "e", "j", "o", "t", "z"}; + // create metadata for a table "t" with the splits above + Text tableId = new Text("t"); + Text pr = null; + for (String s : splits) { + Text split = new Text(s); + Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr)); + prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes())); + ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes())); + bw.addMutation(prevRow); + pr = split; + } + // Add the default tablet + Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr)); + defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes())); + bw.addMutation(defaultTablet); + bw.close(); + + // Read out the TabletLocationStates + MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE)); + Credentials credentials = new Credentials("root", new PasswordToken(new byte[0])); + + // Verify the tablet state: hosted, and count + MetaDataStateStore metaDataStateStore = new MetaDataStateStore(instance, credentials, state); + int count = 0; + for (TabletLocationState tss : metaDataStateStore) { + Assert.assertEquals(TabletState.HOSTED, tss.getState(state.onlineTabletServers())); + count++; + } + Assert.assertEquals(splits.length + 1, count); + + // Create the hole + // Split the tablet at one end of the range + Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation(); + TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes())); + TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o"))); + update(connector, m); + + // do the state check + MergeStats stats = scan(state, metaDataStateStore); + MergeState newState = stats.nextMergeState(connector, state); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState); + + // unassign the tablets + BatchDeleter deleter = connector.createBatchDeleter("!METADATA", Authorizations.EMPTY, 1000, new BatchWriterConfig()); + deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME); + deleter.setRanges(Collections.singletonList(new Range())); + deleter.delete(); + + // now we should be ready to merge but, we have an inconsistent !METADATA table + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state)); + + // finish the split + KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o")); + m = tablet.getPrevRowUpdateMutation(); + TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes())); + update(connector, m); + metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer))); + + // onos... there's a new tablet online + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state)); + + // chop it + m = tablet.getPrevRowUpdateMutation(); + ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes())); + update(connector, m); + + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state)); + + // take it offline + m = tablet.getPrevRowUpdateMutation(); + Collection> walogs = Collections.emptyList(); + metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false))); + + // now we can split + stats = scan(state, metaDataStateStore); + Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state)); + + } + + /** + * @param state + * @param metaDataStateStore + * @param locations + * @return + */ + private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) { + MergeStats stats = new MergeStats(state.mergeInfo); + stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE); + for (TabletLocationState tss : metaDataStateStore) { + stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false); + } + return stats; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java new file mode 100644 index 0000000..d7fc619 --- /dev/null +++ b/server/master/src/test/java/org/apache/accumulo/master/state/MergeInfoTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.state; + +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.server.master.state.MergeInfo; +import org.apache.accumulo.server.master.state.MergeState; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +public class MergeInfoTest { + + MergeInfo readWrite(MergeInfo info) throws Exception { + DataOutputBuffer buffer = new DataOutputBuffer(); + info.write(buffer); + DataInputBuffer in = new DataInputBuffer(); + in.reset(buffer.getData(), 0, buffer.getLength()); + MergeInfo info2 = new MergeInfo(); + info2.readFields(in); + Assert.assertEquals(info.getExtent(), info2.getExtent()); + Assert.assertEquals(info.getState(), info2.getState()); + Assert.assertEquals(info.getOperation(), info2.getOperation()); + return info2; + } + + KeyExtent ke(String tableId, String endRow, String prevEndRow) { + return new KeyExtent(new Text(tableId), endRow == null ? null : new Text(endRow), prevEndRow == null ? null : new Text(prevEndRow)); + } + + @Test + public void testWritable() throws Exception { + MergeInfo info; + info = readWrite(new MergeInfo(ke("a", null, "b"), MergeInfo.Operation.MERGE)); + info = readWrite(new MergeInfo(ke("a", "b", null), MergeInfo.Operation.MERGE)); + info = readWrite(new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.MERGE)); + info = readWrite(new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.DELETE)); + Assert.assertTrue(info.isDelete()); + info.setState(MergeState.COMPLETE); + } + + @Test + public void testNeedsToBeChopped() throws Exception { + MergeInfo info = new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.DELETE); + Assert.assertTrue(info.needsToBeChopped(ke("x", "c", "b"))); + Assert.assertTrue(info.overlaps(ke("x", "c", "b"))); + Assert.assertFalse(info.needsToBeChopped(ke("y", "c", "b"))); + Assert.assertFalse(info.needsToBeChopped(ke("x", "c", "bb"))); + Assert.assertFalse(info.needsToBeChopped(ke("x", "b", "a"))); + info = new MergeInfo(ke("x", "b", "a"), MergeInfo.Operation.MERGE); + Assert.assertTrue(info.needsToBeChopped(ke("x", "c", "a"))); + Assert.assertTrue(info.needsToBeChopped(ke("x", "aa", "a"))); + Assert.assertTrue(info.needsToBeChopped(ke("x", null, null))); + Assert.assertFalse(info.needsToBeChopped(ke("x", "c", "b"))); + Assert.assertFalse(info.needsToBeChopped(ke("y", "c", "b"))); + Assert.assertFalse(info.needsToBeChopped(ke("x", "c", "bb"))); + Assert.assertTrue(info.needsToBeChopped(ke("x", "b", "a"))); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java new file mode 100644 index 0000000..3479d35 --- /dev/null +++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.state; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.server.master.state.Assignment; +import org.apache.accumulo.server.master.state.DistributedStore; +import org.apache.accumulo.server.master.state.DistributedStoreException; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.net.HostAndPort; + +public class RootTabletStateStoreTest { + + static class Node { + Node(String name) { + this.name = name; + } + + List children = new ArrayList(); + String name; + byte[] value = new byte[] {}; + + Node find(String name) { + for (Node node : children) + if (node.name.equals(name)) + return node; + return null; + } + }; + + static class FakeZooStore implements DistributedStore { + + Node root = new Node("/"); + + private Node recurse(Node root, String[] path, int depth) { + if (depth == path.length) + return root; + Node child = root.find(path[depth]); + if (child == null) + return null; + return recurse(child, path, depth + 1); + } + + private Node navigate(String path) { + path = path.replaceAll("/$", ""); + return recurse(root, path.split("/"), 1); + } + + @Override + public List getChildren(String path) throws DistributedStoreException { + Node node = navigate(path); + if (node == null) + return Collections.emptyList(); + List children = new ArrayList(node.children.size()); + for (Node child : node.children) + children.add(child.name); + return children; + } + + @Override + public void put(String path, byte[] bs) throws DistributedStoreException { + create(path).value = bs; + } + + private Node create(String path) { + String[] parts = path.split("/"); + return recurseCreate(root, parts, 1); + } + + private Node recurseCreate(Node root, String[] path, int index) { + if (path.length == index) + return root; + Node node = root.find(path[index]); + if (node == null) { + node = new Node(path[index]); + root.children.add(node); + } + return recurseCreate(node, path, index + 1); + } + + @Override + public void remove(String path) throws DistributedStoreException { + String[] parts = path.split("/"); + String[] parentPath = Arrays.copyOf(parts, parts.length - 1); + Node parent = recurse(root, parentPath, 1); + if (parent == null) + return; + Node child = parent.find(parts[parts.length - 1]); + if (child != null) + parent.children.remove(child); + } + + @Override + public byte[] get(String path) throws DistributedStoreException { + Node node = navigate(path); + if (node != null) + return node.value; + return null; + } + } + + @Test + public void testFakeZoo() throws DistributedStoreException { + DistributedStore store = new FakeZooStore(); + store.put("/a/b/c", "abc".getBytes()); + byte[] abc = store.get("/a/b/c"); + assertArrayEquals(abc, "abc".getBytes()); + byte[] empty = store.get("/a/b"); + assertArrayEquals(empty, "".getBytes()); + store.put("/a/b", "ab".getBytes()); + assertArrayEquals(store.get("/a/b"), "ab".getBytes()); + store.put("/a/b/b", "abb".getBytes()); + List children = store.getChildren("/a/b"); + assertEquals(new HashSet(children), new HashSet(Arrays.asList("b", "c"))); + store.remove("/a/b/c"); + children = store.getChildren("/a/b"); + assertEquals(new HashSet(children), new HashSet(Arrays.asList("b"))); + } + + @Test + public void testRootTabletStateStore() throws DistributedStoreException { + ZooTabletStateStore tstore = new ZooTabletStateStore(new FakeZooStore()); + KeyExtent root = RootTable.EXTENT; + String sessionId = "this is my unique session data"; + TServerInstance server = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 10000), sessionId); + List assignments = Collections.singletonList(new Assignment(root, server)); + tstore.setFutureLocations(assignments); + int count = 0; + for (TabletLocationState location : tstore) { + assertEquals(location.extent, root); + assertEquals(location.future, server); + assertNull(location.current); + count++; + } + assertEquals(count, 1); + tstore.setLocations(assignments); + count = 0; + for (TabletLocationState location : tstore) { + assertEquals(location.extent, root); + assertNull(location.future); + assertEquals(location.current, server); + count++; + } + assertEquals(count, 1); + TabletLocationState assigned = null; + try { + assigned = new TabletLocationState(root, server, null, null, null, false); + } catch (BadLocationStateException e) { + fail("Unexpected error " + e); + } + tstore.unassign(Collections.singletonList(assigned)); + count = 0; + for (TabletLocationState location : tstore) { + assertEquals(location.extent, root); + assertNull(location.future); + assertNull(location.current); + count++; + } + assertEquals(count, 1); + + KeyExtent notRoot = new KeyExtent(new Text("0"), null, null); + try { + tstore.setLocations(Collections.singletonList(new Assignment(notRoot, server))); + Assert.fail("should not get here"); + } catch (IllegalArgumentException ex) {} + + try { + tstore.setFutureLocations(Collections.singletonList(new Assignment(notRoot, server))); + Assert.fail("should not get here"); + } catch (IllegalArgumentException ex) {} + + TabletLocationState broken = null; + try { + broken = new TabletLocationState(notRoot, server, null, null, null, false); + } catch (BadLocationStateException e) { + fail("Unexpected error " + e); + } + try { + tstore.unassign(Collections.singletonList(broken)); + Assert.fail("should not get here"); + } catch (IllegalArgumentException ex) {} + } + + // @Test + // public void testMetaDataStore() { } // see functional test +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/pom.xml ---------------------------------------------------------------------- diff --git a/server/monitor/pom.xml b/server/monitor/pom.xml new file mode 100644 index 0000000..6f6a147 --- /dev/null +++ b/server/monitor/pom.xml @@ -0,0 +1,146 @@ + + + + 4.0.0 + + org.apache.accumulo + accumulo-project + 1.6.0-SNAPSHOT + ../../pom.xml + + accumulo-monitor + Monitor Server + + + com.beust + jcommander + + + com.google.code.gson + gson + + + jline + jline + + + org.apache.accumulo + accumulo-core + + + org.apache.accumulo + accumulo-fate + + + org.apache.accumulo + accumulo-server-base + + + org.apache.accumulo + accumulo-start + + + org.apache.accumulo + accumulo-trace + + + org.apache.thrift + libthrift + + + commons-codec + commons-codec + provided + + + commons-collections + commons-collections + provided + + + commons-configuration + commons-configuration + provided + + + commons-io + commons-io + provided + + + commons-lang + commons-lang + provided + + + javax.servlet + servlet-api + provided + + + log4j + log4j + provided + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.zookeeper + zookeeper + provided + + + org.mortbay.jetty + jetty + provided + + + junit + junit + test + + + org.slf4j + slf4j-api + test + + + org.slf4j + slf4j-log4j12 + test + + + + + + + org.apache.rat + apache-rat-plugin + + + src/main/resources/web/flot/**/*.js + + + + + + + http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java new file mode 100644 index 0000000..e16b598 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor; + +import javax.servlet.http.HttpServlet; + +import org.apache.accumulo.core.conf.Property; +import org.mortbay.jetty.Server; +import org.mortbay.jetty.bio.SocketConnector; +import org.mortbay.jetty.handler.ContextHandlerCollection; +import org.mortbay.jetty.security.SslSocketConnector; +import org.mortbay.jetty.servlet.Context; +import org.mortbay.jetty.servlet.SessionHandler; + +public class EmbeddedWebServer { + + Server server = null; + SocketConnector sock; + ContextHandlerCollection handler; + Context root; + boolean usingSsl; + + public EmbeddedWebServer() { + this("0.0.0.0", 0); + } + + public EmbeddedWebServer(String host, int port) { + server = new Server(); + handler = new ContextHandlerCollection(); + root = new Context(handler, "/", new SessionHandler(), null, null, null); + + if (Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTORE) == "" + || Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTOREPASS) == "" + || Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTORE) == "" + || Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTOREPASS) == "") { + sock = new SocketConnector(); + usingSsl = false; + } else { + sock = new SslSocketConnector(); + ((SslSocketConnector) sock).setKeystore(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTORE)); + ((SslSocketConnector) sock).setKeyPassword(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_KEYSTOREPASS)); + ((SslSocketConnector) sock).setTruststore(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTORE)); + ((SslSocketConnector) sock).setTrustPassword(Monitor.getSystemConfiguration().get(Property.MONITOR_SSL_TRUSTSTOREPASS)); + usingSsl = true; + } + sock.setHost(host); + sock.setPort(port); + } + + public void addServlet(Class klass, String where) { + root.addServlet(klass, where); + } + + public int getPort() { + return sock.getLocalPort(); + } + + public void start() { + try { + server.addConnector(sock); + server.setHandler(handler); + server.start(); + } catch (Exception e) { + stop(); + throw new RuntimeException(e); + } + } + + public void stop() { + try { + server.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public boolean isUsingSsl() { + return usingSsl; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java new file mode 100644 index 0000000..aaec7e4 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -0,0 +1,625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.gc.thrift.GCMonitorService; +import org.apache.accumulo.core.gc.thrift.GCStatus; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; +import org.apache.accumulo.core.master.thrift.TableInfo; +import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.security.SecurityUtil; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.ThriftUtil; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.monitor.servlets.DefaultServlet; +import org.apache.accumulo.monitor.servlets.GcStatusServlet; +import org.apache.accumulo.monitor.servlets.JSONServlet; +import org.apache.accumulo.monitor.servlets.LogServlet; +import org.apache.accumulo.monitor.servlets.MasterServlet; +import org.apache.accumulo.monitor.servlets.OperationServlet; +import org.apache.accumulo.monitor.servlets.ProblemServlet; +import org.apache.accumulo.monitor.servlets.ShellServlet; +import org.apache.accumulo.monitor.servlets.TServersServlet; +import org.apache.accumulo.monitor.servlets.TablesServlet; +import org.apache.accumulo.monitor.servlets.VisServlet; +import org.apache.accumulo.monitor.servlets.XMLServlet; +import org.apache.accumulo.monitor.servlets.trace.ListType; +import org.apache.accumulo.monitor.servlets.trace.ShowTrace; +import org.apache.accumulo.monitor.servlets.trace.Summary; +import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.monitor.LogService; +import org.apache.accumulo.server.problems.ProblemReports; +import org.apache.accumulo.server.problems.ProblemType; +import org.apache.accumulo.server.security.SystemCredentials; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.util.TableInfoUtil; +import org.apache.accumulo.trace.instrument.Tracer; +import org.apache.log4j.Logger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +import com.google.common.net.HostAndPort; + +/** + * Serve master statistics with an embedded web server. + */ +public class Monitor { + private static final Logger log = Logger.getLogger(Monitor.class); + + private static final int REFRESH_TIME = 5; + private static long lastRecalc = 0L; + private static double totalIngestRate = 0.0; + private static double totalIngestByteRate = 0.0; + private static double totalQueryRate = 0.0; + private static double totalScanRate = 0.0; + private static double totalQueryByteRate = 0.0; + private static long totalEntries = 0L; + private static int totalTabletCount = 0; + private static int onlineTabletCount = 0; + private static long totalHoldTime = 0; + private static long totalLookups = 0; + private static int totalTables = 0; + + private static class MaxList extends LinkedList> { + private static final long serialVersionUID = 1L; + + private long maxDelta; + + public MaxList(long maxDelta) { + this.maxDelta = maxDelta; + } + + @Override + public boolean add(Pair obj) { + boolean result = super.add(obj); + + if (obj.getFirst() - get(0).getFirst() > maxDelta) + remove(0); + + return result; + } + + } + + private static final int MAX_TIME_PERIOD = 60 * 60 * 1000; + private static final List> loadOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> ingestRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> ingestByteRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> recoveriesOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> minorCompactionsOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> majorCompactionsOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> lookupsOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> queryRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> scanRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> queryByteRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> indexCacheHitRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static final List> dataCacheHitRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD)); + private static EventCounter lookupRateTracker = new EventCounter(); + private static EventCounter indexCacheHitTracker = new EventCounter(); + private static EventCounter indexCacheRequestTracker = new EventCounter(); + private static EventCounter dataCacheHitTracker = new EventCounter(); + private static EventCounter dataCacheRequestTracker = new EventCounter(); + + private static volatile boolean fetching = false; + private static MasterMonitorInfo mmi; + private static Map> problemSummary = Collections.emptyMap(); + private static Exception problemException; + private static GCStatus gcStatus; + + private static Instance instance; + + private static ServerConfiguration config; + + private static EmbeddedWebServer server; + + private static class EventCounter { + + Map> prevSamples = new HashMap>(); + Map> samples = new HashMap>(); + Set serversUpdated = new HashSet(); + + void startingUpdates() { + serversUpdated.clear(); + } + + void updateTabletServer(String name, long sampleTime, long numEvents) { + Pair newSample = new Pair(sampleTime, numEvents); + Pair lastSample = samples.get(name); + + if (lastSample == null || !lastSample.equals(newSample)) { + samples.put(name, newSample); + if (lastSample != null) { + prevSamples.put(name, lastSample); + } + } + serversUpdated.add(name); + } + + void finishedUpdating() { + // remove any tablet servers not updated + samples.keySet().retainAll(serversUpdated); + prevSamples.keySet().retainAll(serversUpdated); + } + + double calculateRate() { + double totalRate = 0; + + for (Entry> entry : prevSamples.entrySet()) { + Pair prevSample = entry.getValue(); + Pair sample = samples.get(entry.getKey()); + + totalRate += (sample.getSecond() - prevSample.getSecond()) / ((sample.getFirst() - prevSample.getFirst()) / (double) 1000); + } + + return totalRate; + } + + long calculateCount() { + long count = 0; + + for (Entry> entry : prevSamples.entrySet()) { + Pair prevSample = entry.getValue(); + Pair sample = samples.get(entry.getKey()); + + count += sample.getSecond() - prevSample.getSecond(); + } + + return count; + } + } + + public static void fetchData() { + double totalIngestRate = 0.; + double totalIngestByteRate = 0.; + double totalQueryRate = 0.; + double totalQueryByteRate = 0.; + double totalScanRate = 0.; + long totalEntries = 0; + int totalTabletCount = 0; + int onlineTabletCount = 0; + long totalHoldTime = 0; + long totalLookups = 0; + boolean retry = true; + + // only recalc every so often + long currentTime = System.currentTimeMillis(); + if (currentTime - lastRecalc < REFRESH_TIME * 1000) + return; + + synchronized (Monitor.class) { + if (fetching) + return; + fetching = true; + } + + try { + while (retry) { + MasterClientService.Iface client = null; + try { + client = MasterClient.getConnection(HdfsZooInstance.getInstance()); + if (client != null) { + mmi = client.getMasterStats(Tracer.traceInfo(), SystemCredentials.get().toThrift(HdfsZooInstance.getInstance())); + retry = false; + } else { + mmi = null; + } + Monitor.gcStatus = fetchGcStatus(); + } catch (Exception e) { + mmi = null; + log.info("Error fetching stats: " + e); + } finally { + if (client != null) { + MasterClient.close(client); + } + } + if (mmi == null) + UtilWaitThread.sleep(1000); + } + if (mmi != null) { + int majorCompactions = 0; + int minorCompactions = 0; + + lookupRateTracker.startingUpdates(); + indexCacheHitTracker.startingUpdates(); + indexCacheRequestTracker.startingUpdates(); + dataCacheHitTracker.startingUpdates(); + dataCacheRequestTracker.startingUpdates(); + + for (TabletServerStatus server : mmi.tServerInfo) { + TableInfo summary = TableInfoUtil.summarizeTableStats(server); + totalIngestRate += summary.ingestRate; + totalIngestByteRate += summary.ingestByteRate; + totalQueryRate += summary.queryRate; + totalScanRate += summary.scanRate; + totalQueryByteRate += summary.queryByteRate; + totalEntries += summary.recs; + totalHoldTime += server.holdTime; + totalLookups += server.lookups; + majorCompactions += summary.majors.running; + minorCompactions += summary.minors.running; + lookupRateTracker.updateTabletServer(server.name, server.lastContact, server.lookups); + indexCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheHits); + indexCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.indexCacheRequest); + dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits); + dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest); + } + + lookupRateTracker.finishedUpdating(); + indexCacheHitTracker.finishedUpdating(); + indexCacheRequestTracker.finishedUpdating(); + dataCacheHitTracker.finishedUpdating(); + dataCacheRequestTracker.finishedUpdating(); + + int totalTables = 0; + for (TableInfo tInfo : mmi.tableMap.values()) { + totalTabletCount += tInfo.tablets; + onlineTabletCount += tInfo.onlineTablets; + totalTables++; + } + Monitor.totalIngestRate = totalIngestRate; + Monitor.totalTables = totalTables; + totalIngestByteRate = totalIngestByteRate / 1000000.0; + Monitor.totalIngestByteRate = totalIngestByteRate; + Monitor.totalQueryRate = totalQueryRate; + Monitor.totalScanRate = totalScanRate; + totalQueryByteRate = totalQueryByteRate / 1000000.0; + Monitor.totalQueryByteRate = totalQueryByteRate; + Monitor.totalEntries = totalEntries; + Monitor.totalTabletCount = totalTabletCount; + Monitor.onlineTabletCount = onlineTabletCount; + Monitor.totalHoldTime = totalHoldTime; + Monitor.totalLookups = totalLookups; + + ingestRateOverTime.add(new Pair(currentTime, totalIngestRate)); + ingestByteRateOverTime.add(new Pair(currentTime, totalIngestByteRate)); + + double totalLoad = 0.; + for (TabletServerStatus status : mmi.tServerInfo) { + if (status != null) + totalLoad += status.osLoad; + } + loadOverTime.add(new Pair(currentTime, totalLoad)); + + minorCompactionsOverTime.add(new Pair(currentTime, minorCompactions)); + majorCompactionsOverTime.add(new Pair(currentTime, majorCompactions)); + + lookupsOverTime.add(new Pair(currentTime, lookupRateTracker.calculateRate())); + + queryRateOverTime.add(new Pair(currentTime, (int) totalQueryRate)); + queryByteRateOverTime.add(new Pair(currentTime, totalQueryByteRate)); + + scanRateOverTime.add(new Pair(currentTime, (int) totalScanRate)); + + calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker); + calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker); + } + try { + Monitor.problemSummary = ProblemReports.getInstance().summarize(); + Monitor.problemException = null; + } catch (Exception e) { + log.info("Failed to obtain problem reports ", e); + Monitor.problemSummary = Collections.emptyMap(); + Monitor.problemException = e; + } + + } finally { + synchronized (Monitor.class) { + fetching = false; + lastRecalc = currentTime; + } + } + } + + private static void calcCacheHitRate(List> hitRate, long currentTime, EventCounter cacheHits, EventCounter cacheReq) { + long req = cacheReq.calculateCount(); + if (req > 0) + hitRate.add(new Pair(currentTime, cacheHits.calculateCount() / (double) cacheReq.calculateCount())); + else + hitRate.add(new Pair(currentTime, null)); + } + + private static GCStatus fetchGcStatus() { + GCStatus result = null; + HostAndPort address = null; + try { + // Read the gc location from its lock + Instance instance = HdfsZooInstance.getInstance(); + String zooKeepers = instance.getZooKeepers(); + log.debug("connecting to zookeepers " + zooKeepers); + ZooKeeper zk = new ZooKeeper(zooKeepers, (int) config.getConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), new Watcher() { + @Override + public void process(WatchedEvent event) {} + }); + try { + String path = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZGC_LOCK; + List locks = zk.getChildren(path, null); + if (locks != null && locks.size() > 0) { + Collections.sort(locks); + address = new ServerServices(new String(zk.getData(path + "/" + locks.get(0), null, null))).getAddress(Service.GC_CLIENT); + GCMonitorService.Client client = ThriftUtil.getClient(new GCMonitorService.Client.Factory(), address, config.getConfiguration()); + try { + result = client.getStatus(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance)); + } finally { + ThriftUtil.returnClient(client); + } + } + } finally { + zk.close(); + } + } catch (Exception ex) { + log.warn("Unable to contact the garbage collector at " + address, ex); + } + return result; + } + + public static void main(String[] args) throws Exception { + SecurityUtil.serverLogin(); + + VolumeManager fs = VolumeManagerImpl.get(); + ServerOpts opts = new ServerOpts(); + opts.parseArgs("monitor", args); + String hostname = opts.getAddress(); + instance = HdfsZooInstance.getInstance(); + config = new ServerConfiguration(instance); + Accumulo.init(fs, config, "monitor"); + Monitor monitor = new Monitor(); + Accumulo.enableTracing(hostname, "monitor"); + monitor.run(hostname); + } + + private static long START_TIME; + + public void run(String hostname) { + Monitor.START_TIME = System.currentTimeMillis(); + int port = config.getConfiguration().getPort(Property.MONITOR_PORT); + try { + log.debug("Creating monitor on port " + port); + server = new EmbeddedWebServer(hostname, port); + } catch (Throwable ex) { + log.error("Unable to start embedded web server", ex); + throw new RuntimeException(ex); + } + + server.addServlet(DefaultServlet.class, "/"); + server.addServlet(OperationServlet.class, "/op"); + server.addServlet(MasterServlet.class, "/master"); + server.addServlet(TablesServlet.class, "/tables"); + server.addServlet(TServersServlet.class, "/tservers"); + server.addServlet(ProblemServlet.class, "/problems"); + server.addServlet(GcStatusServlet.class, "/gc"); + server.addServlet(LogServlet.class, "/log"); + server.addServlet(XMLServlet.class, "/xml"); + server.addServlet(JSONServlet.class, "/json"); + server.addServlet(VisServlet.class, "/vis"); + server.addServlet(Summary.class, "/trace/summary"); + server.addServlet(ListType.class, "/trace/listType"); + server.addServlet(ShowTrace.class, "/trace/show"); + if (server.isUsingSsl()) + server.addServlet(ShellServlet.class, "/shell"); + server.start(); + + try { + String monitorAddress = HostAndPort.fromParts(hostname, server.getPort()).toString(); + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, monitorAddress.getBytes(), + NodeExistsPolicy.OVERWRITE); + log.info("Set monitor address in zookeeper to " + monitorAddress); + } catch (Exception ex) { + log.error("Unable to set monitor address in zookeeper"); + } + LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID()); + + new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start(); + + // need to regularly fetch data so plot data is updated + new Daemon(new LoggingRunnable(log, new Runnable() { + + @Override + public void run() { + while (true) { + try { + Monitor.fetchData(); + } catch (Exception e) { + log.warn(e.getMessage(), e); + } + + UtilWaitThread.sleep(333); + } + + } + }), "Data fetcher").start(); + } + + public static MasterMonitorInfo getMmi() { + return mmi; + } + + public static int getTotalTables() { + return totalTables; + } + + public static int getTotalTabletCount() { + return totalTabletCount; + } + + public static int getOnlineTabletCount() { + return onlineTabletCount; + } + + public static long getTotalEntries() { + return totalEntries; + } + + public static double getTotalIngestRate() { + return totalIngestRate; + } + + public static double getTotalIngestByteRate() { + return totalIngestByteRate; + } + + public static double getTotalQueryRate() { + return totalQueryRate; + } + + public static double getTotalScanRate() { + return totalScanRate; + } + + public static double getTotalQueryByteRate() { + return totalQueryByteRate; + } + + public static long getTotalHoldTime() { + return totalHoldTime; + } + + public static Exception getProblemException() { + return problemException; + } + + public static Map> getProblemSummary() { + return problemSummary; + } + + public static GCStatus getGcStatus() { + return gcStatus; + } + + public static long getTotalLookups() { + return totalLookups; + } + + public static long getStartTime() { + return START_TIME; + } + + public static List> getLoadOverTime() { + synchronized (loadOverTime) { + return new ArrayList>(loadOverTime); + } + } + + public static List> getIngestRateOverTime() { + synchronized (ingestRateOverTime) { + return new ArrayList>(ingestRateOverTime); + } + } + + public static List> getIngestByteRateOverTime() { + synchronized (ingestByteRateOverTime) { + return new ArrayList>(ingestByteRateOverTime); + } + } + + public static List> getRecoveriesOverTime() { + synchronized (recoveriesOverTime) { + return new ArrayList>(recoveriesOverTime); + } + } + + public static List> getMinorCompactionsOverTime() { + synchronized (minorCompactionsOverTime) { + return new ArrayList>(minorCompactionsOverTime); + } + } + + public static List> getMajorCompactionsOverTime() { + synchronized (majorCompactionsOverTime) { + return new ArrayList>(majorCompactionsOverTime); + } + } + + public static List> getLookupsOverTime() { + synchronized (lookupsOverTime) { + return new ArrayList>(lookupsOverTime); + } + } + + public static double getLookupRate() { + return lookupRateTracker.calculateRate(); + } + + public static List> getQueryRateOverTime() { + synchronized (queryRateOverTime) { + return new ArrayList>(queryRateOverTime); + } + } + + public static List> getScanRateOverTime() { + synchronized (scanRateOverTime) { + return new ArrayList>(scanRateOverTime); + } + } + + public static List> getQueryByteRateOverTime() { + synchronized (queryByteRateOverTime) { + return new ArrayList>(queryByteRateOverTime); + } + } + + public static List> getIndexCacheHitRateOverTime() { + synchronized (indexCacheHitRateOverTime) { + return new ArrayList>(indexCacheHitRateOverTime); + } + } + + public static List> getDataCacheHitRateOverTime() { + synchronized (dataCacheHitRateOverTime) { + return new ArrayList>(dataCacheHitRateOverTime); + } + } + + public static AccumuloConfiguration getSystemConfiguration() { + return config.getConfiguration(); + } + + public static Instance getInstance() { + return instance; + } + + public static boolean isUsingSsl() { + return server.isUsingSsl(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/598821cd/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java new file mode 100644 index 0000000..a4a8911 --- /dev/null +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/ZooKeeperStatus.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor; + +import java.util.Collection; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.TTimeoutTransport; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.log4j.Logger; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import com.google.common.net.HostAndPort; + +public class ZooKeeperStatus implements Runnable { + + private static final Logger log = Logger.getLogger(ZooKeeperStatus.class); + + private volatile boolean stop = false; + + public static class ZooKeeperState implements Comparable { + public final String keeper; + public final String mode; + public final int clients; + + public ZooKeeperState(String keeper, String mode, int clients) { + this.keeper = keeper; + this.mode = mode; + this.clients = clients; + } + + @Override + public int compareTo(ZooKeeperState other) { + if (this == other) { + return 0; + } else if (other == null) { + return 1; + } else { + if (this.keeper == other.keeper) { + return 0; + } else if (null == this.keeper) { + return -1; + } else if (null == other.keeper) { + return 1; + } else { + return this.keeper.compareTo(other.keeper); + } + } + } + } + + private static SortedSet status = new TreeSet(); + + public static Collection getZooKeeperStatus() { + return status; + } + + public void stop() { + this.stop = true; + } + + @Override + public void run() { + + while (!stop) { + + TreeSet update = new TreeSet(); + + String zookeepers[] = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST).split(","); + for (String keeper : zookeepers) { + int clients = 0; + String mode = "unknown"; + + String[] parts = keeper.split(":"); + TTransport transport = null; + try { + HostAndPort addr; + if (parts.length > 1) + addr = HostAndPort.fromParts(parts[0], Integer.parseInt(parts[1])); + else + addr = HostAndPort.fromParts(parts[0], 2181); + + transport = TTimeoutTransport.create(addr, 10 * 1000l); + transport.write("stat\n".getBytes(), 0, 5); + StringBuilder response = new StringBuilder(); + try { + transport.flush(); + byte[] buffer = new byte[1024 * 100]; + int n = 0; + while ((n = transport.read(buffer, 0, buffer.length)) > 0) { + response.append(new String(buffer, 0, n)); + } + } catch (TTransportException ex) { + // happens at EOF + } + for (String line : response.toString().split("\n")) { + if (line.startsWith(" ")) + clients++; + if (line.startsWith("Mode")) + mode = line.split(":")[1]; + } + update.add(new ZooKeeperState(keeper, mode, clients)); + } catch (Exception ex) { + log.info("Exception talking to zookeeper " + keeper, ex); + update.add(new ZooKeeperState(keeper, "Down", -1)); + } finally { + if (transport != null) { + try { + transport.close(); + } catch (Exception ex) { + log.error(ex, ex); + } + } + } + } + status = update; + UtilWaitThread.sleep(1000); + } + } + +}