From: elserj@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Tue, 11 Feb 2014 18:41:42 -0000
Message-Id: <144ab4fa07f4422c9c2402604adb6d1a@git.apache.org>
Subject: [01/16] git commit: ACCUMULO-2334 Remove ACCUMULO_LOG_HOST in favor of pull host and port log-forwarding from zookeeper

Updated Branches:
  refs/heads/1.5.1-SNAPSHOT 24c044505 -> fafa42961
  refs/heads/1.6.0-SNAPSHOT 2d42417d7 -> f9a196f10
  refs/heads/master a504b16ad -> 0a0be0f6b


ACCUMULO-2334 Remove ACCUMULO_LOG_HOST in favor of pull host and port log-forwarding from zookeeper

Advertising both the host and port for log4j gives us a couple of benefits.
We can do away with ACCUMULO_LOG_HOST, simplify the code to always do the
same thing (pull from ZooKeeper), and gain robust failover if the monitor is
moved to a different host or is restarted with a random port (no other
service has to be restarted to pick up the change).

The monitor now acquires a ZooLock before starting, which ensures that all
tservers forward their logs to the correct monitor (in case more than one
monitor was started for some reason).

This also creates a better hierarchy in ZooKeeper for all monitor data (the
HTTP and Log4j advertisements).

Update `accumulo init` and `accumulo info`, and ensure that the ZooKeeper
layout can handle an "upgrade" from 1.5.0.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0351d0d4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0351d0d4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0351d0d4

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: 0351d0d416df51f0b10d91acdc074dced36e40d4
Parents: 24c0445
Author: Josh Elser
Authored: Mon Feb 10 23:28:31 2014 -0500
Committer: Josh Elser
Committed: Mon Feb 10 23:43:21 2014 -0500

----------------------------------------------------------------------
 .../1GB/native-standalone/accumulo-env.sh       |   1 -
 conf/examples/1GB/standalone/accumulo-env.sh    |   1 -
 .../2GB/native-standalone/accumulo-env.sh       |   1 -
 conf/examples/2GB/standalone/accumulo-env.sh    |   1 -
 .../3GB/native-standalone/accumulo-env.sh       |   1 -
 conf/examples/3GB/standalone/accumulo-env.sh    |   1 -
 .../512MB/native-standalone/accumulo-env.sh     |   1 -
 conf/examples/512MB/standalone/accumulo-env.sh  |   1 -
 .../org/apache/accumulo/core/Constants.java     |   4 +-
 .../org/apache/accumulo/core/conf/Property.java |   1 +
 .../apache/accumulo/core/util/AddressUtil.java  |   4 +
 .../org/apache/accumulo/server/Accumulo.java    |  76 +----
 .../accumulo/server/monitor/LogService.java     |  81 +++--
 .../apache/accumulo/server/monitor/Monitor.java | 326 +++++++++++++------
 .../org/apache/accumulo/server/util/Info.java   |   2 +-
 .../apache/accumulo/server/util/Initialize.java |   2 +
 .../server/watcher/MonitorLog4jWatcher.java     | 154 +++++++++
 17 files changed, 454 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/1GB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/native-standalone/accumulo-env.sh b/conf/examples/1GB/native-standalone/accumulo-env.sh
index aa4a1d0..868a665 100755
--- a/conf/examples/1GB/native-standalone/accumulo-env.sh
+++ b/conf/examples/1GB/native-standalone/accumulo-env.sh
@@ -52,7 +52,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/1GB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/standalone/accumulo-env.sh b/conf/examples/1GB/standalone/accumulo-env.sh
index 1707f3d..e25b028 100755
--- a/conf/examples/1GB/standalone/accumulo-env.sh
+++ b/conf/examples/1GB/standalone/accumulo-env.sh
@@ -52,7 +52,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/2GB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/native-standalone/accumulo-env.sh b/conf/examples/2GB/native-standalone/accumulo-env.sh
index ef74ca7..848fb91 100755
--- a/conf/examples/2GB/native-standalone/accumulo-env.sh
+++ b/conf/examples/2GB/native-standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx128m -Xms128m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx256m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/2GB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/standalone/accumulo-env.sh b/conf/examples/2GB/standalone/accumulo-env.sh
index 75014c2..74ddee5 100755
--- a/conf/examples/2GB/standalone/accumulo-env.sh
+++ b/conf/examples/2GB/standalone/accumulo-env.sh
@@ -52,7 +52,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx128m -Xms128m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx256m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/3GB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/native-standalone/accumulo-env.sh b/conf/examples/3GB/native-standalone/accumulo-env.sh
index ae0da11..9fe07e7 100755
--- a/conf/examples/3GB/native-standalone/accumulo-env.sh
+++ b/conf/examples/3GB/native-standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx256m -Xms256m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx1g -Xms256m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/3GB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/standalone/accumulo-env.sh b/conf/examples/3GB/standalone/accumulo-env.sh
index 7edd938..7a0992c 100755
--- a/conf/examples/3GB/standalone/accumulo-env.sh
+++ b/conf/examples/3GB/standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx256m -Xms256m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx1g -Xms256m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/512MB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/native-standalone/accumulo-env.sh b/conf/examples/512MB/native-standalone/accumulo-env.sh
index 749a678..2e15473 100755
--- a/conf/examples/512MB/native-standalone/accumulo-env.sh
+++ b/conf/examples/512MB/native-standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/512MB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/standalone/accumulo-env.sh b/conf/examples/512MB/standalone/accumulo-env.sh
index 9beb059..45d64ef 100755
--- a/conf/examples/512MB/standalone/accumulo-env.sh
+++ b/conf/examples/512MB/standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS" && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS" && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 9bb3419..095319e 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -64,7 +64,9 @@ public class Constants {
   public static final String ZGC_LOCK = ZGC + "/lock";
   
   public static final String ZMONITOR = "/monitor";
-  public static final String ZMONITOR_LOG4J_PORT = ZMONITOR + "/log4j_port";
+  public static final String ZMONITOR_LOCK = ZMONITOR + "/lock";
+  public static final String ZMONITOR_HTTP_ADDR = ZMONITOR + "/http_addr";
+  public static final String ZMONITOR_LOG4J_ADDR = ZMONITOR + "/log4j_addr";
   
   public static final String ZCONFIG = "/config";
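
The new constants give the monitor its own subtree under each instance root: `/monitor/lock`, `/monitor/http_addr` and `/monitor/log4j_addr`. The sketch below is not Accumulo code; it only illustrates, under stated assumptions, how a consumer could turn an instance ID into the Log4j advertisement path and split the `host:port` string that `LogService.startLogListener` writes there. It assumes the instance root is `/accumulo/<instanceId>` (what `ZooUtil.getRoot` is believed to return); `MonitorPaths`, `log4jAddrPath` and `parseLog4jAddr` are hypothetical names, and no ZooKeeper client is involved.

```java
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Accumulo: mirrors the ZooKeeper layout introduced by this commit.
class MonitorPaths {
  // Same values as the constants added to org.apache.accumulo.core.Constants
  static final String ZMONITOR = "/monitor";
  static final String ZMONITOR_LOCK = ZMONITOR + "/lock";
  static final String ZMONITOR_HTTP_ADDR = ZMONITOR + "/http_addr";
  static final String ZMONITOR_LOG4J_ADDR = ZMONITOR + "/log4j_addr";

  // Assumption: the per-instance root is "/accumulo/<instanceId>"
  static String root(String instanceId) {
    return "/accumulo/" + instanceId;
  }

  static String log4jAddrPath(String instanceId) {
    return root(instanceId) + ZMONITOR_LOG4J_ADDR;
  }

  // Parses the "host:port" bytes the monitor advertises for log-forwarding.
  // Splits on the last ':' so the port is always the final field.
  static InetSocketAddress parseLog4jAddr(byte[] data) {
    String addr = new String(data, StandardCharsets.UTF_8).trim();
    int idx = addr.lastIndexOf(':');
    if (idx <= 0 || idx == addr.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got '" + addr + "'");
    }
    String host = addr.substring(0, idx);
    // NumberFormatException (a subclass of IllegalArgumentException) signals a bad port
    int port = Integer.parseInt(addr.substring(idx + 1));
    // createUnresolved skips DNS lookup and rejects ports outside 0-65535
    return InetSocketAddress.createUnresolved(host, port);
  }

  public static void main(String[] args) {
    String instanceId = "example-instance-id"; // hypothetical instance ID
    // prints /accumulo/example-instance-id/monitor/log4j_addr
    System.out.println(log4jAddrPath(instanceId));
    InetSocketAddress addr = parseLog4jAddr("10.0.0.5:4560".getBytes(StandardCharsets.UTF_8));
    // prints 10.0.0.5 -> 4560
    System.out.println(addr.getHostString() + " -> " + addr.getPort());
  }
}
```

Splitting on the last colon is a defensive choice: the advertised host comes from `AddressUtil.getHostAddress`, which returns a textual IP address, and an IPv6 literal would itself contain colons.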


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 424b737..1c2dfdb 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -232,6 +232,7 @@ public enum Property {
   MONITOR_SSL_KEYSTOREPASS("monitor.ssl.keyStorePassword", "", PropertyType.STRING, "The keystore password for enabling monitor SSL.", true, false),
   MONITOR_SSL_TRUSTSTORE("monitor.ssl.trustStore", "", PropertyType.PATH, "The truststore for enabling monitor SSL.", true, false),
   MONITOR_SSL_TRUSTSTOREPASS("monitor.ssl.trustStorePassword", "", PropertyType.STRING, "The truststore password for enabling monitor SSL.", true, false),
+  MONITOR_LOCK_CHECK_INTERVAL("monitor.lock.check.interval", "5s", PropertyType.TIMEDURATION, "The amount of time to sleep between checking for the Montior ZooKeeper lock"),
   
   TRACE_PREFIX("trace.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of distributed tracing."),
   TRACE_PORT("trace.port.client", "12234", PropertyType.PORT, "The listening port for the trace server"),


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
index af9a1a6..5fcf618 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
@@ -44,6 +44,10 @@ public class AddressUtil extends org.apache.accumulo.fate.util.AddressUtil {
     return new TSocket(addr.getHostName(), addr.getPort());
   }
   
+  static public String getHostAddress(InetSocketAddress addr) {
+    return addr.getAddress().getHostAddress();
+  }
+  
   static public String toString(InetSocketAddress addr) {
     return addr.getAddress().getHostAddress() + ":" + addr.getPort();
   }


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index f4da33b..99ec7e4 100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -32,23 +32,18 @@ import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.Version;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.watcher.MonitorLog4jWatcher;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.log4j.helpers.FileWatchdog;
 import org.apache.log4j.helpers.LogLog;
-import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 
 public class Accumulo {
 
@@ -87,58 +82,7 @@ public class Accumulo {
       log.error("creating remote sink for trace spans", ex);
     }
   }
-  private static class LogMonitor extends FileWatchdog implements Watcher {
-    String path;
-
-    protected LogMonitor(String instance, String filename, int delay) {
-      super(filename);
-      setDelay(delay);
-      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
-    }
-
-    private void setMonitorPort() {
-      try {
-        String port = new String(ZooReaderWriter.getInstance().getData(path, null), Constants.UTF8);
-        System.setProperty("org.apache.accumulo.core.host.log.port", port);
-        log.info("Changing monitor log4j port to "+port);
-        doOnChange();
-      } catch (Exception e) {
-        log.error("Error reading zookeeper data for monitor log4j port", e);
-      }
-    }
-
-    @Override
-    public void run() {
-      try {
-        if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
-          setMonitorPort();
-        log.info("Set watch for monitor log4j port");
-      } catch (Exception e) {
-        log.error("Unable to set watch for monitor log4j port " + path);
-      }
-      super.run();
-    }
-
-    @Override
-    protected void doOnChange() {
-      LogManager.resetConfiguration();
-      new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      setMonitorPort();
-      if (event.getPath() != null) {
-        try {
-          ZooReaderWriter.getInstance().exists(event.getPath(), this);
-        } catch (Exception ex) {
-          log.error("Unable to reset watch for monitor log4j port", ex);
-        }
-      }
-    }
-  }
-
+
   public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
     System.setProperty("org.apache.accumulo.core.application", application);
@@ -151,11 +95,6 @@ public class Accumulo {
     String localhost = InetAddress.getLocalHost().getHostName();
     System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
-    if (System.getenv("ACCUMULO_LOG_HOST") != null)
-      System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
-    else
-      System.setProperty("org.apache.accumulo.core.host.log", localhost);
     int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
     System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
@@ -167,13 +106,10 @@ public class Accumulo {
     }
     // Turn off messages about not being able to reach the remote logger... we protect against that.
     LogLog.setQuietMode(true);
-
-    // Configure logging
-    if (logPort==0)
-      new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
-    else
-      DOMConfigurator.configureAndWatch(logConfig, 5000);
-
+
+    // Configure logging using information advertised in zookeeper by the monitor
+    new MonitorLog4jWatcher(config.getInstance().getInstanceID(), logConfig, 5000).start();
+
     log.info(application + " starting");
     log.info("Instance " + config.getInstance().getInstanceID());
     int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
index 10ef9e4..34e88b6 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@ -29,28 +29,28 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.net.SocketNode;
 import org.apache.log4j.spi.LoggingEvent;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Hijack log4j and capture log events for display.
  * 
  */
 public class LogService extends org.apache.log4j.AppenderSkeleton {
-
+
   private static final Logger log = Logger.getLogger(LogService.class);
-
+
   /**
    * Read logging events forward to us over the net.
    * 
    */
   static class SocketServer implements Runnable {
     private ServerSocket server = null;
-
+
     public SocketServer(int port) {
       try {
         server = new ServerSocket(port);
@@ -58,11 +58,11 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
         throw new RuntimeException(io);
       }
     }
-
+
     public int getLocalPort() {
       return server.getLocalPort();
     }
-
+
     public void run() {
       try {
         while (true) {
@@ -77,53 +77,82 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
       }
     }
   }
-
-  static void startLogListener(AccumuloConfiguration conf, String instanceId) {
+
+  /**
+   * Place the host:port advertisement for the Monitor's Log4j listener in ZooKeeper
+   * 
+   * @param conf
+   *          configuration for the instance
+   * @param instanceId
+   *          instanceId for the instance
+   * @param hostAddress
+   *          Address that monitor process is bound to
+   */
+  static void startLogListener(AccumuloConfiguration conf, String instanceId, String hostAddress) {
     try {
       SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
-      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT,
-          Integer.toString(server.getLocalPort()).getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE);
+
+      // getLocalPort will return the actual ephemeral port used when '0' was provided.
+      String logForwardingAddr = hostAddress + ":" + server.getLocalPort();
+
+      log.debug("Setting monitor log4j log-forwarding address to: " + logForwardingAddr);
+
+      final String path = ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_ADDR;
+      final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+
+      // Delete before we try to re-create in case the previous session hasn't yet expired
+      try {
+        zoo.delete(path, -1);
+      } catch (KeeperException e) {
+        // We don't care if the node is already gone
+        if (!KeeperException.Code.NONODE.equals(e.code())) {
+          throw e;
+        }
+      }
+
+      zoo.putEphemeralData(path, logForwardingAddr.getBytes(Constants.UTF8));
+
       new Daemon(server).start();
     } catch (Throwable t) {
-      log.info("Unable to listen to cluster-wide ports", t);
+      log.info("Unable to start/advertise Log4j listener for log-forwarding to monitor", t);
     }
   }
-
+
   static private LogService instance = null;
-
+
   synchronized public static LogService getInstance() {
     if (instance == null)
       return new LogService();
     return instance;
   }
-
+
   private static final int MAX_LOGS = 50;
-
+
   private LinkedHashMap events = new LinkedHashMap(MAX_LOGS + 1, (float) .75, true) {
-
+
     private static final long serialVersionUID = 1L;
-
+
     @Override
     @SuppressWarnings("rawtypes")
     protected boolean removeEldestEntry(Map.Entry eldest) {
       return size() > MAX_LOGS;
     }
   };
-
+
   public LogService() {
     synchronized (LogService.class) {
       instance = this;
     }
   }
-
+
   @Override
   synchronized protected void append(LoggingEvent ev) {
     Object application = ev.getMDC("application");
     if (application == null || application.toString().isEmpty())
       return;
-
+
     DedupedLogEvent dev = new DedupedLogEvent(ev);
-
+
     // if event is present, increase the count
     if (events.containsKey(dev.toString())) {
       DedupedLogEvent oldDev = events.remove(dev.toString());
@@ -131,26 +160,26 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
     }
     events.put(dev.toString(), dev);
   }
-
+
   @Override
   public void close() {
     events = null;
   }
-
+
   @Override
   public synchronized void doAppend(LoggingEvent event) {
     super.doAppend(event);
   }
-
+
   @Override
   public boolean requiresLayout() {
     return false;
   }
-
+
   synchronized public List getEvents() {
     return new ArrayList(events.values());
   }
-
+
   synchronized public void clear() {
     events.clear();
   }


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
index dcb80fd..b33d9e3 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
@@ -51,7 +51,9 @@ import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -74,17 +76,20 @@ import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.EmbeddedWebServer;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Serve master statistics with an embedded web server.
  */
 public class Monitor {
   private static final Logger log = Logger.getLogger(Monitor.class);
-
+
   public static final int REFRESH_TIME = 5;
   private static long lastRecalc = 0L;
   private static double totalIngestRate = 0.0;
@@ -98,28 +103,28 @@ public class Monitor {
   private static long totalHoldTime = 0;
   private static long totalLookups = 0;
   private static int totalTables = 0;
-
+
   private static class MaxList extends LinkedList> {
     private static final long serialVersionUID = 1L;
-
+
     private long maxDelta;
-
+
     public MaxList(long maxDelta) {
       this.maxDelta = maxDelta;
     }
-
+
     @Override
     public boolean add(Pair obj) {
       boolean result = super.add(obj);
-
+
       if (obj.getFirst() - get(0).getFirst() > maxDelta)
         remove(0);
-
+
       return result;
     }
-
+
   }
-
+
   private static final int MAX_TIME_PERIOD = 60 * 60 * 1000;
   private static final List> loadOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD));
   private static final List> ingestRateOverTime = Collections.synchronizedList(new MaxList(MAX_TIME_PERIOD));
@@ -138,19 +143,21 @@ public class Monitor {
   private static EventCounter indexCacheRequestTracker = new EventCounter();
   private static EventCounter dataCacheHitTracker = new EventCounter();
   private static EventCounter dataCacheRequestTracker = new EventCounter();
-
+
   private static volatile boolean fetching = false;
   private static MasterMonitorInfo mmi;
   private static Map> problemSummary = Collections.emptyMap();
   private static Exception problemException;
   private static GCStatus gcStatus;
-
+
   private static Instance instance;
-
+
   private static ServerConfiguration config;
-
+
   private static EmbeddedWebServer server;
-
+
+  private ZooLock monitorLock;
+
   public static Map summarizeTableStats(MasterMonitorInfo mmi) {
     Map compactingByTable = new HashMap();
     if (mmi != null && mmi.tServerInfo != null) {
@@ -165,7 +172,7 @@ public class Monitor {
     }
     return compactingByTable;
   }
-
+
   public static void add(TableInfo total, TableInfo more) {
     if (total.minors == null)
       total.minors = new Compacting();
@@ -195,7 +202,7 @@ public class Monitor {
     total.queryByteRate += more.queryByteRate;
     total.scanRate += more.scanRate;
   }
-
+
   public static TableInfo summarizeTableStats(TabletServerStatus status) {
     TableInfo summary = new TableInfo();
     summary.majors = new Compacting();
@@ -206,21 +213,21 @@ public class Monitor {
     }
     return summary;
   }
-
+
   private static class EventCounter {
-
+
     Map> prevSamples = new HashMap>();
     Map> samples = new HashMap>();
     Set serversUpdated = new HashSet();
-
+
     void startingUpdates() {
       serversUpdated.clear();
     }
-
+
     void updateTabletServer(String name, long sampleTime, long numEvents) {
       Pair newSample = new Pair(sampleTime, numEvents);
       Pair lastSample = samples.get(name);
-
+
       if (lastSample == null || !lastSample.equals(newSample)) {
         samples.put(name, newSample);
         if (lastSample != null) {
@@ -229,40 +236,40 @@ public class Monitor {
       }
       serversUpdated.add(name);
     }
-
+
     void finishedUpdating() {
       // remove any tablet servers not updated
       samples.keySet().retainAll(serversUpdated);
       prevSamples.keySet().retainAll(serversUpdated);
     }
-
+
     double calculateRate() {
       double totalRate = 0;
-
+
       for (Entry> entry : prevSamples.entrySet()) {
         Pair prevSample = entry.getValue();
         Pair sample = samples.get(entry.getKey());
-
+
         totalRate += (sample.getSecond() - prevSample.getSecond()) / ((sample.getFirst() - prevSample.getFirst()) / (double) 1000);
       }
-
+
       return totalRate;
     }
-
+
     long calculateCount() {
       long count = 0;
-
+
       for (Entry> entry : prevSamples.entrySet()) {
         Pair prevSample = entry.getValue();
         Pair sample = samples.get(entry.getKey());
-
+
         count += sample.getSecond() - prevSample.getSecond();
       }
-
+
       return count;
     }
   }
-
+
   public static void fetchData() {
     double totalIngestRate = 0.;
     double totalIngestByteRate = 0.;
@@ -275,18 +282,18 @@ public class Monitor {
     long totalHoldTime = 0;
     long totalLookups = 0;
     boolean retry = true;
-
+
     // only recalc every so often
     long currentTime = System.currentTimeMillis();
     if (currentTime - lastRecalc < REFRESH_TIME * 1000)
       return;
-
+
     synchronized (Monitor.class) {
       if (fetching)
         return;
       fetching = true;
     }
-
+
     try {
       while (retry) {
         MasterClientService.Iface client = null;
@@ -313,13 +320,13 @@ public class Monitor {
       if (mmi != null) {
         int majorCompactions = 0;
         int minorCompactions = 0;
-
+
         lookupRateTracker.startingUpdates();
         indexCacheHitTracker.startingUpdates();
         indexCacheRequestTracker.startingUpdates();
         dataCacheHitTracker.startingUpdates();
         dataCacheRequestTracker.startingUpdates();
-
+
         for (TabletServerStatus server : mmi.tServerInfo) {
           TableInfo summary = Monitor.summarizeTableStats(server);
           totalIngestRate += summary.ingestRate;
@@ -338,13 +345,13 @@ public class Monitor {
           dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
           dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
         }
-
+
         lookupRateTracker.finishedUpdating();
         indexCacheHitTracker.finishedUpdating();
         indexCacheRequestTracker.finishedUpdating();
         dataCacheHitTracker.finishedUpdating();
         dataCacheRequestTracker.finishedUpdating();
-
+
         int totalTables = 0;
         for (TableInfo tInfo : mmi.tableMap.values()) {
           totalTabletCount += tInfo.tablets;
@@ -364,27 +371,27 @@ public class Monitor {
         Monitor.onlineTabletCount = onlineTabletCount;
         Monitor.totalHoldTime = totalHoldTime;
         Monitor.totalLookups = totalLookups;
-
+
         ingestRateOverTime.add(new Pair(currentTime, totalIngestRate));
         ingestByteRateOverTime.add(new Pair(currentTime, totalIngestByteRate));
-
+
         double totalLoad = 0.;
         for (TabletServerStatus status : mmi.tServerInfo) {
           if (status != null)
             totalLoad += status.osLoad;
         }
         loadOverTime.add(new Pair(currentTime, totalLoad));
-
+
         minorCompactionsOverTime.add(new Pair(currentTime, minorCompactions));
         majorCompactionsOverTime.add(new Pair(currentTime, majorCompactions));
-
+
         lookupsOverTime.add(new Pair(currentTime, lookupRateTracker.calculateRate()));
-
+
         queryRateOverTime.add(new Pair(currentTime, (int) totalQueryRate));
         queryByteRateOverTime.add(new Pair(currentTime, totalQueryByteRate));
-
+
         scanRateOverTime.add(new Pair(currentTime, (int) totalScanRate));
-
+
         calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
         calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
       }
@@ -396,7 +403,7 @@ public class Monitor {
         Monitor.problemSummary = Collections.emptyMap();
         Monitor.problemException = e;
       }
-
+
     } finally {
       synchronized (Monitor.class) {
         fetching = false;
@@ -404,7 +411,7 @@ public class Monitor {
       }
     }
   }
-
+
   private static void calcCacheHitRate(List> hitRate, long currentTime, EventCounter cacheHits, EventCounter cacheReq) {
     long req = cacheReq.calculateCount();
     if (req > 0)
@@ -412,7 +419,7 @@ public class Monitor {
     else
       hitRate.add(new Pair(currentTime, null));
   }
-
+
   private static GCStatus fetchGcStatus() {
     GCStatus result = null;
     InetSocketAddress address = null;
@@ -437,10 +444,10 @@ public class Monitor {
     }
     return result;
   }
-
+
   public static void main(String[] args) throws Exception {
     SecurityUtil.serverLogin();
-
+
     FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
     String hostname = Accumulo.getLocalAddress(args);
     instance = HdfsZooInstance.getInstance();
@@ -450,10 +457,17 @@ public class Monitor {
     Accumulo.enableTracing(hostname, "monitor");
     monitor.run(hostname);
   }
-
+
   private static long START_TIME;
-
+
   public void run(String hostname) {
+    try {
+      getMonitorLock();
+    } catch (Exception e) {
+      log.error("Failed to get Monitor ZooKeeper lock");
+      throw new RuntimeException(e);
+    }
+
     Monitor.START_TIME = System.currentTimeMillis();
     int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
     try {
@@ -463,7 +477,7 @@ public class Monitor {
       log.error("Unable to start embedded web server", ex);
throw new RuntimeException(ex); } - + server.addServlet(DefaultServlet.class, "/"); server.addServlet(OperationServlet.class, "/op"); server.addServlet(MasterServlet.class, "/master"); @@ -481,27 +495,34 @@ public class Monitor { if (server.isUsingSsl()) server.addServlet(ShellServlet.class, "/shell"); server.start(); - - + + InetSocketAddress monitorAddress = null; try { hostname = InetAddress.getLocalHost().getHostName(); - + log.debug("Using " + hostname + " to advertise monitor location in ZooKeeper"); - - String monitorAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, server.getPort())); - ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, monitorAddress.getBytes(Constants.UTF8), + + monitorAddress = new InetSocketAddress(hostname, server.getPort()); + String monitorAddressAndPort = org.apache.accumulo.core.util.AddressUtil.toString(monitorAddress); + ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR_HTTP_ADDR, monitorAddressAndPort.getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE); log.info("Set monitor address in zookeeper to " + monitorAddress); } catch (Exception ex) { - log.error("Unable to set monitor address in zookeeper"); + log.error("Unable to set monitor HTTP address in zookeeper", ex); + } + + if (null != monitorAddress) { + LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID(), + org.apache.accumulo.core.util.AddressUtil.getHostAddress(monitorAddress)); + } else { + log.warn("Not starting log4j listener as we could not determine address to use"); } - LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID()); - + new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start(); - + // need to regularly fetch data so plot data is updated new Daemon(new LoggingRunnable(log, new Runnable() { - + @Override public void run() { while 
(true) { @@ -510,162 +531,271 @@ public class Monitor { } catch (Exception e) { log.warn(e.getMessage(), e); } - + UtilWaitThread.sleep(333); } - + } }), "Data fetcher").start(); } - + + /** + * Get the monitor lock in ZooKeeper + * + * @throws KeeperException + * @throws InterruptedException + */ + private void getMonitorLock() throws KeeperException, InterruptedException { + final String zRoot = ZooUtil.getRoot(instance); + final String monitorPath = zRoot + Constants.ZMONITOR; + final String monitorLockPath = zRoot + Constants.ZMONITOR_LOCK; + + // Ensure that everything is kosher with ZK as this has changed. + ZooReaderWriter zoo = ZooReaderWriter.getInstance(); + if (zoo.exists(monitorPath)) { + byte[] data = zoo.getData(monitorPath, null); + // If the node isn't empty, it's from a previous install (has hostname:port for HTTP server) + if (0 != data.length) { + // Recursively delete from that parent node + zoo.recursiveDelete(monitorPath, NodeMissingPolicy.SKIP); + + // And then make the nodes that we expect for the incoming ephemeral nodes + zoo.putPersistentData(monitorPath, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL); + } + } else { + // 1.5.0 and earlier + zoo.putPersistentData(zRoot + Constants.ZMONITOR, new byte[0], NodeExistsPolicy.FAIL); + zoo.putPersistentData(zRoot + Constants.ZMONITOR_LOCK, new byte[0], NodeExistsPolicy.FAIL); + } + + // Get a ZooLock for the monitor + while (true) { + MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); + monitorLock = new ZooLock(monitorLockPath); + monitorLock.lockAsync(monitorLockWatcher, new byte[0]); + + monitorLockWatcher.waitForChange(); + + if (monitorLockWatcher.acquiredLock) { + break; + } + + if (!monitorLockWatcher.failedToAcquireLock) { + throw new IllegalStateException("monitor lock in unknown state"); + } + + monitorLock.tryToCancelAsyncLockOrUnlock(); + + 
UtilWaitThread.sleep(instance.getConfiguration().getTimeInMillis(Property.MONITOR_LOCK_CHECK_INTERVAL)); + } + + log.info("Got Monitor lock."); + } + + /** + * Async Watcher for monitor lock + */ + private static class MoniterLockWatcher implements ZooLock.AsyncLockWatcher { + + boolean acquiredLock = false; + boolean failedToAcquireLock = false; + + @Override + public void lostLock(LockLossReason reason) { + Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1); + } + + @Override + public void unableToMonitorLockNode(final Throwable e) { + Halt.halt(-1, new Runnable() { + @Override + public void run() { + log.fatal("No longer able to monitor Monitor lock node", e); + } + }); + + } + + @Override + public synchronized void acquiredLock() { + if (acquiredLock || failedToAcquireLock) { + Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + acquiredLock = true; + notifyAll(); + } + + @Override + public synchronized void failedToAcquireLock(Exception e) { + log.warn("Failed to get monitor lock " + e); + + if (acquiredLock) { + Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1); + } + + failedToAcquireLock = true; + notifyAll(); + } + + public synchronized void waitForChange() { + while (!acquiredLock && !failedToAcquireLock) { + try { + wait(); + } catch (InterruptedException e) {} + } + } + } + public static MasterMonitorInfo getMmi() { return mmi; } - + public static int getTotalTables() { return totalTables; } - + public static int getTotalTabletCount() { return totalTabletCount; } - + public static int getOnlineTabletCount() { return onlineTabletCount; } - + public static long getTotalEntries() { return totalEntries; } - + public static double getTotalIngestRate() { return totalIngestRate; } - + public static double getTotalIngestByteRate() { return totalIngestByteRate; } - + public static double getTotalQueryRate() { return totalQueryRate; } - + 
public static double getTotalScanRate() { return totalScanRate; } - + public static double getTotalQueryByteRate() { return totalQueryByteRate; } - + public static long getTotalHoldTime() { return totalHoldTime; } - + public static Exception getProblemException() { return problemException; } - + public static Map> getProblemSummary() { return problemSummary; } - + public static GCStatus getGcStatus() { return gcStatus; } - + public static long getTotalLookups() { return totalLookups; } - + public static long getStartTime() { return START_TIME; } - + public static List> getLoadOverTime() { synchronized (loadOverTime) { return new ArrayList>(loadOverTime); } } - + public static List> getIngestRateOverTime() { synchronized (ingestRateOverTime) { return new ArrayList>(ingestRateOverTime); } } - + public static List> getIngestByteRateOverTime() { synchronized (ingestByteRateOverTime) { return new ArrayList>(ingestByteRateOverTime); } } - + public static List> getRecoveriesOverTime() { synchronized (recoveriesOverTime) { return new ArrayList>(recoveriesOverTime); } } - + public static List> getMinorCompactionsOverTime() { synchronized (minorCompactionsOverTime) { return new ArrayList>(minorCompactionsOverTime); } } - + public static List> getMajorCompactionsOverTime() { synchronized (majorCompactionsOverTime) { return new ArrayList>(majorCompactionsOverTime); } } - + public static List> getLookupsOverTime() { synchronized (lookupsOverTime) { return new ArrayList>(lookupsOverTime); } } - + public static double getLookupRate() { return lookupRateTracker.calculateRate(); } - + public static List> getQueryRateOverTime() { synchronized (queryRateOverTime) { return new ArrayList>(queryRateOverTime); } } - + public static List> getScanRateOverTime() { synchronized (scanRateOverTime) { return new ArrayList>(scanRateOverTime); } } - + public static List> getQueryByteRateOverTime() { synchronized (queryByteRateOverTime) { return new ArrayList>(queryByteRateOverTime); } } - + 
public static List> getIndexCacheHitRateOverTime() { synchronized (indexCacheHitRateOverTime) { return new ArrayList>(indexCacheHitRateOverTime); } } - + public static List> getDataCacheHitRateOverTime() { synchronized (dataCacheHitRateOverTime) { return new ArrayList>(dataCacheHitRateOverTime); } } - + public static AccumuloConfiguration getSystemConfiguration() { return config.getConfiguration(); } - + public static Instance getInstance() { return instance; } - + public static boolean isUsingSsl() { return server.isUsingSsl(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/util/Info.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/util/Info.java b/server/src/main/java/org/apache/accumulo/server/util/Info.java index 7a07107..c819ac1 100644 --- a/server/src/main/java/org/apache/accumulo/server/util/Info.java +++ b/server/src/main/java/org/apache/accumulo/server/util/Info.java @@ -26,7 +26,7 @@ public class Info { public static void main(String[] args) throws Exception { ZooReaderWriter zrw = ZooReaderWriter.getInstance(); Instance instance = HdfsZooInstance.getInstance(); - System.out.println("monitor: " + new String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, null), Constants.UTF8)); + System.out.println("monitor: " + new String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR_HTTP_ADDR, null), Constants.UTF8)); System.out.println("masters: " + instance.getMasterLocations()); System.out.println("zookeepers: " + instance.getZooKeepers()); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/util/Initialize.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/util/Initialize.java b/server/src/main/java/org/apache/accumulo/server/util/Initialize.java 
index baa5400..ce26ff5 100644 --- a/server/src/main/java/org/apache/accumulo/server/util/Initialize.java +++ b/server/src/main/java/org/apache/accumulo/server/util/Initialize.java @@ -417,6 +417,8 @@ public class Initialize { zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); } private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java b/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java new file mode 100644 index 0000000..c0cef08 --- /dev/null +++ b/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.watcher; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.log4j.Appender; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.helpers.FileWatchdog; +import org.apache.log4j.xml.DOMConfigurator; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +import com.google.common.net.HostAndPort; + +/** + * Watcher that updates the monitor's log4j host and port from ZooKeeper in system properties + */ +public class MonitorLog4jWatcher extends FileWatchdog implements Watcher { + private static final Logger log = Logger.getLogger(MonitorLog4jWatcher.class); + + private static final String HOST_PROPERTY_NAME = "org.apache.accumulo.core.host.log"; + private static final String PORT_PROPERTY_NAME = "org.apache.accumulo.core.host.log.port"; + + private final Object lock; + private boolean loggingDisabled = false; + protected String path; + + /** + * @param instance + * instance ID, used to locate the monitor's log4j advertisement node in ZooKeeper + * @param filename + * log4j configuration file to watch + * @param delay + * milliseconds between checks of the configuration file + */ + public MonitorLog4jWatcher(String instance, String filename, int delay) { + super(filename); + setDelay(delay); + this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR; 
+ this.lock = new Object(); + } + + @Override + public void run() { + try { + // Initially set the logger if the Monitor's log4j advertisement node exists + if
(ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null) + updateMonitorLog4jLocation(); + log.info("Set watch for Monitor Log4j watcher"); + } catch (Exception e) { + log.error("Unable to set watch for Monitor Log4j watcher on " + path); + } + + super.run(); + } + + @Override + protected void doOnChange() { + // This method is called from the parent class's constructor, before lock is assigned. + // I'm not sure of a better way to get around this; final helps, though. + if (null == lock) { + resetLogger(); + return; + } + + synchronized (lock) { + // We might be triggered by file-reloading or by a ZK update. + // Either way, log-forwarding will be restarted + loggingDisabled = false; + log.info("Enabled log-forwarding"); + resetLogger(); + } + } + + private void resetLogger() { + // Force a reset on the logger's configuration + LogManager.resetConfiguration(); + new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository()); + } + + @Override + public void process(WatchedEvent event) { + // We got an update; process the data in the node + updateMonitorLog4jLocation(); + + if (event.getPath() != null) { + try { + ZooReaderWriter.getInstance().exists(event.getPath(), this); + } catch (Exception ex) { + log.error("Unable to reset watch for Monitor Log4j watcher", ex); + } + } + } + + /** + * Read the host and port information for the Monitor's log4j socket and update the system properties so that, on logger refresh, it sees the new information.
+ */ + protected void updateMonitorLog4jLocation() { + try { + String hostPortString = new String(ZooReaderWriter.getInstance().getData(path, null), Constants.UTF8); + HostAndPort hostAndPort = HostAndPort.fromString(hostPortString); + + System.setProperty(HOST_PROPERTY_NAME, hostAndPort.getHostText()); + System.setProperty(PORT_PROPERTY_NAME, Integer.toString(hostAndPort.getPort())); + + log.info("Changing monitor log4j address to " + hostAndPort.toString()); + + doOnChange(); + } catch (NoNodeException e) { + // Not sure about the synchronization guarantees for Loggers and Appenders + // on configuration reload + synchronized (lock) { + // Already disabled; no need to disable it again. + if (loggingDisabled) { + return; + } + + Logger logger = LogManager.getLogger("org.apache.accumulo"); + if (null != logger) { + // TODO ACCUMULO-2343 Create a specific appender for log-forwarding to the monitor + // that can replace the AsyncAppender+SocketAppender. + Appender appender = logger.getAppender("ASYNC"); + if (null != appender) { + log.info("Closing log-forwarding appender"); + appender.close(); + log.info("Removing log-forwarding appender"); + logger.removeAppender(appender); + loggingDisabled = true; + } + } + } + } catch (IllegalArgumentException e) { + log.error("Could not parse host and port information", e); + } catch (Exception e) { + log.error("Error reading zookeeper data for Monitor Log4j watcher", e); + } + } +}
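On each ZooKeeper event, MonitorLog4jWatcher reads the advertised host:port string from the log4j advertisement node, splits it, and publishes the two parts as system properties before forcing a log4j reconfiguration. The parsing step can be sketched with only the standard library, as below. This sketch swaps Guava's HostAndPort for a hypothetical parseHostPort helper (Log4jAddressSketch is likewise an illustrative name) and does not talk to ZooKeeper or reload log4j. The two property names are copied from MonitorLog4jWatcher; the example addresses are made up.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, stdlib-only stand-in for the HostAndPort parsing in updateMonitorLog4jLocation(). */
class Log4jAddressSketch {
  // Property names as declared in MonitorLog4jWatcher
  static final String HOST_PROPERTY_NAME = "org.apache.accumulo.core.host.log";
  static final String PORT_PROPERTY_NAME = "org.apache.accumulo.core.host.log.port";

  /** Splits "host:port" or "[ipv6]:port" into {host, port}; throws IllegalArgumentException on bad input. */
  static String[] parseHostPort(String s) {
    String host;
    String portText;
    if (s.startsWith("[")) {
      // Bracketed IPv6 literal, e.g. "[::1]:4560"
      int close = s.indexOf(']');
      if (close < 0 || close + 1 >= s.length() || s.charAt(close + 1) != ':') {
        throw new IllegalArgumentException("Bad bracketed address: " + s);
      }
      host = s.substring(1, close);
      portText = s.substring(close + 2);
    } else {
      // Exactly one colon, with a non-empty host before it
      int colon = s.lastIndexOf(':');
      if (colon <= 0 || s.indexOf(':') != colon) {
        throw new IllegalArgumentException("Expected host:port but got: " + s);
      }
      host = s.substring(0, colon);
      portText = s.substring(colon + 1);
    }
    int port;
    try {
      port = Integer.parseInt(portText);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Bad port in: " + s, e);
    }
    if (port < 0 || port > 65535) {
      throw new IllegalArgumentException("Port out of range in: " + s);
    }
    return new String[] {host, Integer.toString(port)};
  }

  /** Publishes the advertised address so that a later log4j reconfiguration can pick it up. */
  static void applyAdvertisement(byte[] zkData) {
    String[] hostPort = parseHostPort(new String(zkData, StandardCharsets.UTF_8));
    System.setProperty(HOST_PROPERTY_NAME, hostPort[0]);
    System.setProperty(PORT_PROPERTY_NAME, hostPort[1]);
  }
}
```

Rejecting a string with no port keeps a half-configured SocketAppender from being produced; the committed code gets a similar effect because its IllegalArgumentException and generic Exception handlers log the problem and skip the reload.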
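getMonitorLock() and MoniterLockWatcher implement an acquire-or-retry loop. The lock attempt is asynchronous; the watcher records the outcome under its own monitor and wakes the waiting thread with notifyAll(); on failure, the caller sleeps and tries again. The self-contained sketch below shows that pattern. ZooLock.lockAsync is replaced by a hypothetical tryLockAsync callback, and AsyncLockCallback, LockWatcherSketch and MonitorLockLoopSketch are illustrative names, not Accumulo classes. Unlike the committed waitForChange(), which swallows InterruptedException, this version propagates it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical callback interface standing in for ZooLock.AsyncLockWatcher. */
interface AsyncLockCallback {
  void acquiredLock();

  void failedToAcquireLock(Exception e);
}

/** Records the outcome of one lock attempt and wakes the thread waiting on it. */
class LockWatcherSketch implements AsyncLockCallback {
  private boolean acquired = false;
  private boolean failed = false;

  @Override
  public synchronized void acquiredLock() {
    acquired = true;
    notifyAll();
  }

  @Override
  public synchronized void failedToAcquireLock(Exception e) {
    failed = true;
    notifyAll();
  }

  /** Blocks until either callback has fired. */
  synchronized void waitForChange() throws InterruptedException {
    while (!acquired && !failed) {
      wait(); // releases the monitor while waiting
    }
  }

  synchronized boolean isAcquired() { return acquired; }

  synchronized boolean isFailed() { return failed; }
}

class MonitorLockLoopSketch {
  /** Retries until an attempt succeeds; returns the number of attempts made. */
  static int acquireWithRetry(Consumer<AsyncLockCallback> tryLockAsync, long retryMillis)
      throws InterruptedException {
    int attempts = 0;
    while (true) {
      attempts++;
      LockWatcherSketch watcher = new LockWatcherSketch(); // fresh watcher per attempt
      tryLockAsync.accept(watcher);
      watcher.waitForChange();
      if (watcher.isAcquired()) {
        return attempts;
      }
      if (!watcher.isFailed()) {
        throw new IllegalStateException("lock in unknown state");
      }
      // The committed code also cancels the pending ZooLock attempt before sleeping.
      Thread.sleep(retryMillis);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger calls = new AtomicInteger();
    // Simulated lock: the first two attempts fail (another monitor holds it), the third succeeds.
    int attempts = acquireWithRetry(watcher -> {
      int n = calls.incrementAndGet();
      new Thread(() -> {
        if (n < 3) watcher.failedToAcquireLock(new Exception("held elsewhere"));
        else watcher.acquiredLock();
      }).start();
    }, 10);
    System.out.println("acquired after " + attempts + " attempts"); // acquired after 3 attempts
  }
}
```

Because the outcome flags are checked inside a while loop under the watcher's monitor, a callback that fires before waitForChange() is entered is not lost.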
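This plain-text rendering of the Monitor.java hunks lost the generic type arguments of EventCounter (for example `Map> prevSamples`). Below is a compilable sketch of the same bookkeeping. A small Sample class stands in for Pair<Long,Long>. The statement that records the previous sample (assumed here to be `prevSamples.put(name, lastSample)`) is elided from the diff context, so that line is an assumption. EventRateSketch and Sample are illustrative names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative per-server event-rate tracker modeled on Monitor.EventCounter. */
class EventRateSketch {
  /** A (sampleTimeMillis, cumulativeEventCount) pair. */
  static final class Sample {
    final long timeMillis;
    final long count;

    Sample(long timeMillis, long count) { this.timeMillis = timeMillis; this.count = count; }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Sample)) return false;
      Sample s = (Sample) o;
      return s.timeMillis == timeMillis && s.count == count;
    }

    @Override
    public int hashCode() { return Long.hashCode(timeMillis) * 31 + Long.hashCode(count); }
  }

  private final Map<String, Sample> prevSamples = new HashMap<>();
  private final Map<String, Sample> samples = new HashMap<>();
  private final Set<String> serversUpdated = new HashSet<>();

  void startingUpdates() { serversUpdated.clear(); }

  void updateTabletServer(String name, long sampleTime, long numEvents) {
    Sample newSample = new Sample(sampleTime, numEvents);
    Sample lastSample = samples.get(name);
    if (lastSample == null || !lastSample.equals(newSample)) {
      samples.put(name, newSample);
      if (lastSample != null) {
        prevSamples.put(name, lastSample); // assumption: elided in the diff context
      }
    }
    serversUpdated.add(name);
  }

  void finishedUpdating() {
    // Forget servers that did not report in this round
    samples.keySet().retainAll(serversUpdated);
    prevSamples.keySet().retainAll(serversUpdated);
  }

  /** Sum over servers of (events since previous sample) / (seconds since previous sample). */
  double calculateRate() {
    double totalRate = 0;
    for (Map.Entry<String, Sample> e : prevSamples.entrySet()) {
      Sample prev = e.getValue();
      Sample cur = samples.get(e.getKey());
      totalRate += (cur.count - prev.count) / ((cur.timeMillis - prev.timeMillis) / 1000.0);
    }
    return totalRate;
  }

  /** Sum over servers of events seen since the previous sample. */
  long calculateCount() {
    long count = 0;
    for (Map.Entry<String, Sample> e : prevSamples.entrySet()) {
      count += samples.get(e.getKey()).count - e.getValue().count;
    }
    return count;
  }

  public static void main(String[] args) {
    EventRateSketch lookups = new EventRateSketch();
    // Round 1: two servers report cumulative lookup counts at t = 0 s
    lookups.startingUpdates();
    lookups.updateTabletServer("ts1", 0, 100);
    lookups.updateTabletServer("ts2", 0, 50);
    lookups.finishedUpdating();
    // Round 2: ten seconds later
    lookups.startingUpdates();
    lookups.updateTabletServer("ts1", 10_000, 600); // +500 in 10 s -> 50/s
    lookups.updateTabletServer("ts2", 10_000, 250); // +200 in 10 s -> 20/s
    lookups.finishedUpdating();
    System.out.println(lookups.calculateRate()); // 70.0
    System.out.println(lookups.calculateCount()); // 700
  }
}
```

Only servers with two distinct samples contribute, so the first round after a server appears reports no rate for it; a server that stops reporting drops out at the next finishedUpdating().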