accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [01/16] git commit: ACCUMULO-2334 Remove ACCUMULO_LOG_HOST in favor of pulling the log-forwarding host and port from ZooKeeper
Date Tue, 11 Feb 2014 18:41:42 GMT
Updated Branches:
  refs/heads/1.5.1-SNAPSHOT 24c044505 -> fafa42961
  refs/heads/1.6.0-SNAPSHOT 2d42417d7 -> f9a196f10
  refs/heads/master a504b16ad -> 0a0be0f6b


ACCUMULO-2334 Remove ACCUMULO_LOG_HOST in favor of pulling the log-forwarding host and port from ZooKeeper

Advertising both the host and port for log4j gives us several benefits.
We can do away with ACCUMULO_LOG_HOST, simplify the code to always
do the same thing (pull from ZooKeeper), and gain robust failover
if the monitor is moved to a different host or is restarted with a random
port (no other service needs to be restarted to become aware of the change).

The monitor will now acquire a ZooLock before starting, which ensures that
all tservers will perform log-forwarding to the correct monitor (in case
multiple monitors were started for some reason). This change also creates a
better hierarchy in ZooKeeper for all monitor data (HTTP and Log4j
advertisements). It also updates `accumulo init` and `accumulo info`, and
ensures that the ZooKeeper layout can handle an "upgrade" from 1.5.0.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0351d0d4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0351d0d4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0351d0d4

Branch: refs/heads/1.5.1-SNAPSHOT
Commit: 0351d0d416df51f0b10d91acdc074dced36e40d4
Parents: 24c0445
Author: Josh Elser <elserj@apache.org>
Authored: Mon Feb 10 23:28:31 2014 -0500
Committer: Josh Elser <elserj@apache.org>
Committed: Mon Feb 10 23:43:21 2014 -0500

----------------------------------------------------------------------
 .../1GB/native-standalone/accumulo-env.sh       |   1 -
 conf/examples/1GB/standalone/accumulo-env.sh    |   1 -
 .../2GB/native-standalone/accumulo-env.sh       |   1 -
 conf/examples/2GB/standalone/accumulo-env.sh    |   1 -
 .../3GB/native-standalone/accumulo-env.sh       |   1 -
 conf/examples/3GB/standalone/accumulo-env.sh    |   1 -
 .../512MB/native-standalone/accumulo-env.sh     |   1 -
 conf/examples/512MB/standalone/accumulo-env.sh  |   1 -
 .../org/apache/accumulo/core/Constants.java     |   4 +-
 .../org/apache/accumulo/core/conf/Property.java |   1 +
 .../apache/accumulo/core/util/AddressUtil.java  |   4 +
 .../org/apache/accumulo/server/Accumulo.java    |  76 +----
 .../accumulo/server/monitor/LogService.java     |  81 +++--
 .../apache/accumulo/server/monitor/Monitor.java | 326 +++++++++++++------
 .../org/apache/accumulo/server/util/Info.java   |   2 +-
 .../apache/accumulo/server/util/Initialize.java |   2 +
 .../server/watcher/MonitorLog4jWatcher.java     | 154 +++++++++
 17 files changed, 454 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/1GB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/native-standalone/accumulo-env.sh b/conf/examples/1GB/native-standalone/accumulo-env.sh
index aa4a1d0..868a665 100755
--- a/conf/examples/1GB/native-standalone/accumulo-env.sh
+++ b/conf/examples/1GB/native-standalone/accumulo-env.sh
@@ -52,7 +52,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/1GB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/standalone/accumulo-env.sh b/conf/examples/1GB/standalone/accumulo-env.sh
index 1707f3d..e25b028 100755
--- a/conf/examples/1GB/standalone/accumulo-env.sh
+++ b/conf/examples/1GB/standalone/accumulo-env.sh
@@ -52,7 +52,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/2GB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/native-standalone/accumulo-env.sh b/conf/examples/2GB/native-standalone/accumulo-env.sh
index ef74ca7..848fb91 100755
--- a/conf/examples/2GB/native-standalone/accumulo-env.sh
+++ b/conf/examples/2GB/native-standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx128m -Xms128m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx256m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/2GB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/standalone/accumulo-env.sh b/conf/examples/2GB/standalone/accumulo-env.sh
index 75014c2..74ddee5 100755
--- a/conf/examples/2GB/standalone/accumulo-env.sh
+++ b/conf/examples/2GB/standalone/accumulo-env.sh
@@ -52,7 +52,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx128m -Xms128m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx256m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/3GB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/native-standalone/accumulo-env.sh b/conf/examples/3GB/native-standalone/accumulo-env.sh
index ae0da11..9fe07e7 100755
--- a/conf/examples/3GB/native-standalone/accumulo-env.sh
+++ b/conf/examples/3GB/native-standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx256m -Xms256m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx1g -Xms256m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/3GB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/standalone/accumulo-env.sh b/conf/examples/3GB/standalone/accumulo-env.sh
index 7edd938..7a0992c 100755
--- a/conf/examples/3GB/standalone/accumulo-env.sh
+++ b/conf/examples/3GB/standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx256m -Xms256m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx1g -Xms256m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/512MB/native-standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/native-standalone/accumulo-env.sh b/conf/examples/512MB/native-standalone/accumulo-env.sh
index 749a678..2e15473 100755
--- a/conf/examples/512MB/native-standalone/accumulo-env.sh
+++ b/conf/examples/512MB/native-standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/conf/examples/512MB/standalone/accumulo-env.sh
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/standalone/accumulo-env.sh b/conf/examples/512MB/standalone/accumulo-env.sh
index 9beb059..45d64ef 100755
--- a/conf/examples/512MB/standalone/accumulo-env.sh
+++ b/conf/examples/512MB/standalone/accumulo-env.sh
@@ -51,7 +51,6 @@ test -z "$ACCUMULO_MONITOR_OPTS" && export ACCUMULO_MONITOR_OPTS="${POLICY} -Xmx
 test -z "$ACCUMULO_GC_OPTS"      && export ACCUMULO_GC_OPTS="-Xmx64m -Xms64m"
 test -z "$ACCUMULO_GENERAL_OPTS" && export ACCUMULO_GENERAL_OPTS="-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -Djava.net.preferIPv4Stack=true"
 test -z "$ACCUMULO_OTHER_OPTS"   && export ACCUMULO_OTHER_OPTS="-Xmx128m -Xms64m"
-export ACCUMULO_LOG_HOST=`(grep -v '^#' $ACCUMULO_HOME/conf/monitor | egrep -v '^[[:space:]]*$' ; echo localhost ) 2>/dev/null | head -1`
 # what do when the JVM runs out of heap memory
 export ACCUMULO_KILL_CMD='kill -9 %p'
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 9bb3419..095319e 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -64,7 +64,9 @@ public class Constants {
   public static final String ZGC_LOCK = ZGC + "/lock";
   
   public static final String ZMONITOR = "/monitor";
-  public static final String ZMONITOR_LOG4J_PORT = ZMONITOR + "/log4j_port";
+  public static final String ZMONITOR_LOCK = ZMONITOR + "/lock";
+  public static final String ZMONITOR_HTTP_ADDR = ZMONITOR + "/http_addr";
+  public static final String ZMONITOR_LOG4J_ADDR = ZMONITOR + "/log4j_addr";
   
   public static final String ZCONFIG = "/config";
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 424b737..1c2dfdb 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -232,6 +232,7 @@ public enum Property {
   MONITOR_SSL_KEYSTOREPASS("monitor.ssl.keyStorePassword", "", PropertyType.STRING, "The keystore password for enabling monitor SSL.", true, false),
   MONITOR_SSL_TRUSTSTORE("monitor.ssl.trustStore", "", PropertyType.PATH, "The truststore for enabling monitor SSL.", true, false),
   MONITOR_SSL_TRUSTSTOREPASS("monitor.ssl.trustStorePassword", "", PropertyType.STRING, "The truststore password for enabling monitor SSL.", true, false),
+  MONITOR_LOCK_CHECK_INTERVAL("monitor.lock.check.interval", "5s", PropertyType.TIMEDURATION, "The amount of time to sleep between checking for the Monitor ZooKeeper lock"),
   
   TRACE_PREFIX("trace.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of distributed tracing."),
   TRACE_PORT("trace.port.client", "12234", PropertyType.PORT, "The listening port for the trace server"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
index af9a1a6..5fcf618 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
@@ -44,6 +44,10 @@ public class AddressUtil extends org.apache.accumulo.fate.util.AddressUtil {
     return new TSocket(addr.getHostName(), addr.getPort());
   }
   
+  static public String getHostAddress(InetSocketAddress addr) {
+    return addr.getAddress().getHostAddress();
+  }
+
   static public String toString(InetSocketAddress addr) {
     return addr.getAddress().getHostAddress() + ":" + addr.getPort();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index f4da33b..99ec7e4 100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -32,23 +32,18 @@ import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.Version;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.watcher.MonitorLog4jWatcher;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.log4j.helpers.FileWatchdog;
 import org.apache.log4j.helpers.LogLog;
-import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 
 public class Accumulo {
   
@@ -87,58 +82,7 @@ public class Accumulo {
       log.error("creating remote sink for trace spans", ex);
     }
   }
-  
-  private static class LogMonitor extends FileWatchdog implements Watcher {
-    String path;
-    
-    protected LogMonitor(String instance, String filename, int delay) {
-      super(filename);
-      setDelay(delay);
-      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
-    }
-    
-    private void setMonitorPort() {
-      try {
-        String port = new String(ZooReaderWriter.getInstance().getData(path, null), Constants.UTF8);
-        System.setProperty("org.apache.accumulo.core.host.log.port", port);
-        log.info("Changing monitor log4j port to "+port);
-        doOnChange();
-      } catch (Exception e) {
-        log.error("Error reading zookeeper data for monitor log4j port", e);
-      }
-    }
-    
-    @Override
-    public void run() {
-      try {
-        if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
-          setMonitorPort();
-        log.info("Set watch for monitor log4j port");
-      } catch (Exception e) {
-        log.error("Unable to set watch for monitor log4j port " + path);
-      }
-      super.run();
-    }
-    
-    @Override
-    protected void doOnChange() {
-      LogManager.resetConfiguration();
-      new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
-    }
-    
-    @Override
-    public void process(WatchedEvent event) {
-      setMonitorPort();
-      if (event.getPath() != null) {
-        try {
-          ZooReaderWriter.getInstance().exists(event.getPath(), this);
-        } catch (Exception ex) {
-          log.error("Unable to reset watch for monitor log4j port", ex);
-        }
-      }
-    }
-  }
-  
+
   public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
     
     System.setProperty("org.apache.accumulo.core.application", application);
@@ -151,11 +95,6 @@ public class Accumulo {
     String localhost = InetAddress.getLocalHost().getHostName();
     System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
     
-    if (System.getenv("ACCUMULO_LOG_HOST") != null)
-      System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
-    else
-      System.setProperty("org.apache.accumulo.core.host.log", localhost);
-    
     int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
     System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
     
@@ -167,13 +106,10 @@ public class Accumulo {
     }
     // Turn off messages about not being able to reach the remote logger... we protect against that.
     LogLog.setQuietMode(true);
-    
-    // Configure logging
-    if (logPort==0)
-      new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
-    else
-      DOMConfigurator.configureAndWatch(logConfig, 5000);
-    
+
+    // Configure logging using information advertised in zookeeper by the monitor
+    new MonitorLog4jWatcher(config.getInstance().getInstanceID(), logConfig, 5000).start();
+
     log.info(application + " starting");
     log.info("Instance " + config.getInstance().getInstanceID());
     int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
index 10ef9e4..34e88b6 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@ -29,28 +29,28 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.net.SocketNode;
 import org.apache.log4j.spi.LoggingEvent;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Hijack log4j and capture log events for display.
  * 
  */
 public class LogService extends org.apache.log4j.AppenderSkeleton {
-  
+
   private static final Logger log = Logger.getLogger(LogService.class);
-  
+
   /**
    * Read logging events forward to us over the net.
    * 
    */
   static class SocketServer implements Runnable {
     private ServerSocket server = null;
-    
+
     public SocketServer(int port) {
       try {
         server = new ServerSocket(port);
@@ -58,11 +58,11 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
         throw new RuntimeException(io);
       }
     }
-    
+
     public int getLocalPort() {
       return server.getLocalPort();
     }
-    
+
     public void run() {
       try {
         while (true) {
@@ -77,53 +77,82 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
       }
     }
   }
-  
-  static void startLogListener(AccumuloConfiguration conf, String instanceId) {
+
+  /**
+   * Place the host:port advertisement for the Monitor's Log4j listener in ZooKeeper
+   * 
+   * @param conf
+   *          configuration for the instance
+   * @param instanceId
+   *          instanceId for the instance
+   * @param hostAddress
+   *          Address that monitor process is bound to
+   */
+  static void startLogListener(AccumuloConfiguration conf, String instanceId, String hostAddress) {
     try {
       SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
-      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT,
-          Integer.toString(server.getLocalPort()).getBytes(Constants.UTF8), NodeExistsPolicy.OVERWRITE);
+
+      // getLocalPort will return the actual ephemeral port used when '0' was provided.
+      String logForwardingAddr = hostAddress + ":" + server.getLocalPort();
+
+      log.debug("Setting monitor log4j log-forwarding address to: " + logForwardingAddr);
+
+      final String path = ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_ADDR;
+      final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+
+      // Delete before we try to re-create in case the previous session hasn't yet expired
+      try {
+        zoo.delete(path, -1);
+      } catch (KeeperException e) {
+        // We don't care if the node is already gone
+        if (!KeeperException.Code.NONODE.equals(e.code())) {
+          throw e;
+        }
+      }
+
+      zoo.putEphemeralData(path, logForwardingAddr.getBytes(Constants.UTF8));
+
       new Daemon(server).start();
     } catch (Throwable t) {
-      log.info("Unable to listen to cluster-wide ports", t);
+      log.info("Unable to start/advertise Log4j listener for log-forwarding to monitor", t);
     }
   }
-  
+
   static private LogService instance = null;
-  
+
   synchronized public static LogService getInstance() {
     if (instance == null)
       return new LogService();
     return instance;
   }
-  
+
   private static final int MAX_LOGS = 50;
-  
+
   private LinkedHashMap<String,DedupedLogEvent> events = new LinkedHashMap<String,DedupedLogEvent>(MAX_LOGS + 1, (float) .75, true) {
-    
+
     private static final long serialVersionUID = 1L;
-    
+
     @Override
     @SuppressWarnings("rawtypes")
     protected boolean removeEldestEntry(Map.Entry eldest) {
       return size() > MAX_LOGS;
     }
   };
-  
+
   public LogService() {
     synchronized (LogService.class) {
       instance = this;
     }
   }
-  
+
   @Override
   synchronized protected void append(LoggingEvent ev) {
     Object application = ev.getMDC("application");
     if (application == null || application.toString().isEmpty())
       return;
-    
+
     DedupedLogEvent dev = new DedupedLogEvent(ev);
-    
+
     // if event is present, increase the count
     if (events.containsKey(dev.toString())) {
       DedupedLogEvent oldDev = events.remove(dev.toString());
@@ -131,26 +160,26 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
     }
     events.put(dev.toString(), dev);
   }
-  
+
   @Override
   public void close() {
     events = null;
   }
-  
+
   @Override
   public synchronized void doAppend(LoggingEvent event) {
     super.doAppend(event);
   }
-  
+
   @Override
   public boolean requiresLayout() {
     return false;
   }
-  
+
   synchronized public List<DedupedLogEvent> getEvents() {
     return new ArrayList<DedupedLogEvent>(events.values());
   }
-  
+
   synchronized public void clear() {
     events.clear();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
index dcb80fd..b33d9e3 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
@@ -51,7 +51,9 @@ import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -74,17 +76,20 @@ import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.EmbeddedWebServer;
+import org.apache.accumulo.server.util.Halt;
+import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Serve master statistics with an embedded web server.
  */
 public class Monitor {
   private static final Logger log = Logger.getLogger(Monitor.class);
-  
+
   public static final int REFRESH_TIME = 5;
   private static long lastRecalc = 0L;
   private static double totalIngestRate = 0.0;
@@ -98,28 +103,28 @@ public class Monitor {
   private static long totalHoldTime = 0;
   private static long totalLookups = 0;
   private static int totalTables = 0;
-  
+
   private static class MaxList<T> extends LinkedList<Pair<Long,T>> {
     private static final long serialVersionUID = 1L;
-    
+
     private long maxDelta;
-    
+
     public MaxList(long maxDelta) {
       this.maxDelta = maxDelta;
     }
-    
+
     @Override
     public boolean add(Pair<Long,T> obj) {
       boolean result = super.add(obj);
-      
+
       if (obj.getFirst() - get(0).getFirst() > maxDelta)
         remove(0);
-      
+
       return result;
     }
-    
+
   }
-  
+
   private static final int MAX_TIME_PERIOD = 60 * 60 * 1000;
   private static final List<Pair<Long,Double>> loadOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
   private static final List<Pair<Long,Double>> ingestRateOverTime = Collections.synchronizedList(new MaxList<Double>(MAX_TIME_PERIOD));
@@ -138,19 +143,21 @@ public class Monitor {
   private static EventCounter indexCacheRequestTracker = new EventCounter();
   private static EventCounter dataCacheHitTracker = new EventCounter();
   private static EventCounter dataCacheRequestTracker = new EventCounter();
-  
+
   private static volatile boolean fetching = false;
   private static MasterMonitorInfo mmi;
   private static Map<String,Map<ProblemType,Integer>> problemSummary = Collections.emptyMap();
   private static Exception problemException;
   private static GCStatus gcStatus;
-  
+
   private static Instance instance;
-  
+
   private static ServerConfiguration config;
-  
+
   private static EmbeddedWebServer server;
-  
+
+  private ZooLock monitorLock;
+
   public static Map<String,Double> summarizeTableStats(MasterMonitorInfo mmi) {
     Map<String,Double> compactingByTable = new HashMap<String,Double>();
     if (mmi != null && mmi.tServerInfo != null) {
@@ -165,7 +172,7 @@ public class Monitor {
     }
     return compactingByTable;
   }
-  
+
   public static void add(TableInfo total, TableInfo more) {
     if (total.minors == null)
       total.minors = new Compacting();
@@ -195,7 +202,7 @@ public class Monitor {
     total.queryByteRate += more.queryByteRate;
     total.scanRate += more.scanRate;
   }
-  
+
   public static TableInfo summarizeTableStats(TabletServerStatus status) {
     TableInfo summary = new TableInfo();
     summary.majors = new Compacting();
@@ -206,21 +213,21 @@ public class Monitor {
     }
     return summary;
   }
-  
+
   private static class EventCounter {
-    
+
     Map<String,Pair<Long,Long>> prevSamples = new HashMap<String,Pair<Long,Long>>();
     Map<String,Pair<Long,Long>> samples = new HashMap<String,Pair<Long,Long>>();
     Set<String> serversUpdated = new HashSet<String>();
-    
+
     void startingUpdates() {
       serversUpdated.clear();
     }
-    
+
     void updateTabletServer(String name, long sampleTime, long numEvents) {
       Pair<Long,Long> newSample = new Pair<Long,Long>(sampleTime, numEvents);
       Pair<Long,Long> lastSample = samples.get(name);
-      
+
       if (lastSample == null || !lastSample.equals(newSample)) {
         samples.put(name, newSample);
         if (lastSample != null) {
@@ -229,40 +236,40 @@ public class Monitor {
       }
       serversUpdated.add(name);
     }
-    
+
     void finishedUpdating() {
       // remove any tablet servers not updated
       samples.keySet().retainAll(serversUpdated);
       prevSamples.keySet().retainAll(serversUpdated);
     }
-    
+
     double calculateRate() {
       double totalRate = 0;
-      
+
       for (Entry<String,Pair<Long,Long>> entry : prevSamples.entrySet()) {
         Pair<Long,Long> prevSample = entry.getValue();
         Pair<Long,Long> sample = samples.get(entry.getKey());
-        
+
         totalRate += (sample.getSecond() - prevSample.getSecond()) / ((sample.getFirst() - prevSample.getFirst()) / (double) 1000);
       }
-      
+
       return totalRate;
     }
-    
+
     long calculateCount() {
       long count = 0;
-      
+
       for (Entry<String,Pair<Long,Long>> entry : prevSamples.entrySet()) {
         Pair<Long,Long> prevSample = entry.getValue();
         Pair<Long,Long> sample = samples.get(entry.getKey());
-        
+
         count += sample.getSecond() - prevSample.getSecond();
       }
-      
+
       return count;
     }
   }
-  
+
   public static void fetchData() {
     double totalIngestRate = 0.;
     double totalIngestByteRate = 0.;
@@ -275,18 +282,18 @@ public class Monitor {
     long totalHoldTime = 0;
     long totalLookups = 0;
     boolean retry = true;
-    
+
     // only recalc every so often
     long currentTime = System.currentTimeMillis();
     if (currentTime - lastRecalc < REFRESH_TIME * 1000)
       return;
-    
+
     synchronized (Monitor.class) {
       if (fetching)
         return;
       fetching = true;
     }
-    
+
     try {
       while (retry) {
         MasterClientService.Iface client = null;
@@ -313,13 +320,13 @@ public class Monitor {
       if (mmi != null) {
         int majorCompactions = 0;
         int minorCompactions = 0;
-        
+
         lookupRateTracker.startingUpdates();
         indexCacheHitTracker.startingUpdates();
         indexCacheRequestTracker.startingUpdates();
         dataCacheHitTracker.startingUpdates();
         dataCacheRequestTracker.startingUpdates();
-        
+
         for (TabletServerStatus server : mmi.tServerInfo) {
           TableInfo summary = Monitor.summarizeTableStats(server);
           totalIngestRate += summary.ingestRate;
@@ -338,13 +345,13 @@ public class Monitor {
           dataCacheHitTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheHits);
           dataCacheRequestTracker.updateTabletServer(server.name, server.lastContact, server.dataCacheRequest);
         }
-        
+
         lookupRateTracker.finishedUpdating();
         indexCacheHitTracker.finishedUpdating();
         indexCacheRequestTracker.finishedUpdating();
         dataCacheHitTracker.finishedUpdating();
         dataCacheRequestTracker.finishedUpdating();
-        
+
         int totalTables = 0;
         for (TableInfo tInfo : mmi.tableMap.values()) {
           totalTabletCount += tInfo.tablets;
@@ -364,27 +371,27 @@ public class Monitor {
         Monitor.onlineTabletCount = onlineTabletCount;
         Monitor.totalHoldTime = totalHoldTime;
         Monitor.totalLookups = totalLookups;
-        
+
         ingestRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestRate));
         ingestByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalIngestByteRate));
-        
+
         double totalLoad = 0.;
         for (TabletServerStatus status : mmi.tServerInfo) {
           if (status != null)
             totalLoad += status.osLoad;
         }
         loadOverTime.add(new Pair<Long,Double>(currentTime, totalLoad));
-        
+
         minorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, minorCompactions));
         majorCompactionsOverTime.add(new Pair<Long,Integer>(currentTime, majorCompactions));
-        
+
         lookupsOverTime.add(new Pair<Long,Double>(currentTime, lookupRateTracker.calculateRate()));
-        
+
         queryRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalQueryRate));
         queryByteRateOverTime.add(new Pair<Long,Double>(currentTime, totalQueryByteRate));
-        
+
         scanRateOverTime.add(new Pair<Long,Integer>(currentTime, (int) totalScanRate));
-        
+
         calcCacheHitRate(indexCacheHitRateOverTime, currentTime, indexCacheHitTracker, indexCacheRequestTracker);
         calcCacheHitRate(dataCacheHitRateOverTime, currentTime, dataCacheHitTracker, dataCacheRequestTracker);
       }
@@ -396,7 +403,7 @@ public class Monitor {
         Monitor.problemSummary = Collections.emptyMap();
         Monitor.problemException = e;
       }
-      
+
     } finally {
       synchronized (Monitor.class) {
         fetching = false;
@@ -404,7 +411,7 @@ public class Monitor {
       }
     }
   }
-  
+
   private static void calcCacheHitRate(List<Pair<Long,Double>> hitRate, long currentTime, EventCounter cacheHits, EventCounter cacheReq) {
     long req = cacheReq.calculateCount();
     if (req > 0)
@@ -412,7 +419,7 @@ public class Monitor {
     else
       hitRate.add(new Pair<Long,Double>(currentTime, null));
   }
-  
+
   private static GCStatus fetchGcStatus() {
     GCStatus result = null;
     InetSocketAddress address = null;
@@ -437,10 +444,10 @@ public class Monitor {
     }
     return result;
   }
-  
+
   public static void main(String[] args) throws Exception {
     SecurityUtil.serverLogin();
-    
+
     FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
     String hostname = Accumulo.getLocalAddress(args);
     instance = HdfsZooInstance.getInstance();
@@ -450,10 +457,17 @@ public class Monitor {
     Accumulo.enableTracing(hostname, "monitor");
     monitor.run(hostname);
   }
-  
+
   private static long START_TIME;
-  
+
   public void run(String hostname) {
+    try {
+      getMonitorLock();
+    } catch (Exception e) {
+      log.error("Failed to get Monitor ZooKeeper lock");
+      throw new RuntimeException(e);
+    }
+
     Monitor.START_TIME = System.currentTimeMillis();
     int port = config.getConfiguration().getPort(Property.MONITOR_PORT);
     try {
@@ -463,7 +477,7 @@ public class Monitor {
       log.error("Unable to start embedded web server", ex);
       throw new RuntimeException(ex);
     }
-    
+
     server.addServlet(DefaultServlet.class, "/");
     server.addServlet(OperationServlet.class, "/op");
     server.addServlet(MasterServlet.class, "/master");
@@ -481,27 +495,34 @@ public class Monitor {
     if (server.isUsingSsl())
       server.addServlet(ShellServlet.class, "/shell");
     server.start();
-    
-    
+
+    InetSocketAddress monitorAddress = null;
     try {
       hostname = InetAddress.getLocalHost().getHostName();
-      
+
       log.debug("Using " + hostname + " to advertise monitor location in ZooKeeper");
-      
-      String monitorAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, server.getPort()));
-      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, monitorAddress.getBytes(Constants.UTF8),
+
+      monitorAddress = new InetSocketAddress(hostname, server.getPort());
+      String monitorAddressAndPort = org.apache.accumulo.core.util.AddressUtil.toString(monitorAddress);
+      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR_HTTP_ADDR, monitorAddressAndPort.getBytes(Constants.UTF8),
           NodeExistsPolicy.OVERWRITE);
       log.info("Set monitor address in zookeeper to " + monitorAddress);
     } catch (Exception ex) {
-      log.error("Unable to set monitor address in zookeeper");
+      log.error("Unable to set monitor HTTP address in zookeeper", ex);
+    }
+
+    if (null != monitorAddress) {
+      LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID(),
+          org.apache.accumulo.core.util.AddressUtil.getHostAddress(monitorAddress));
+    } else {
+      log.warn("Not starting log4j listener as we could not determine address to use");
     }
-    LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID());
-    
+
     new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start();
-    
+
     // need to regularly fetch data so plot data is updated
     new Daemon(new LoggingRunnable(log, new Runnable() {
-      
+
       @Override
       public void run() {
         while (true) {
@@ -510,162 +531,271 @@ public class Monitor {
           } catch (Exception e) {
             log.warn(e.getMessage(), e);
           }
-          
+
           UtilWaitThread.sleep(333);
         }
-        
+
       }
     }), "Data fetcher").start();
   }
-  
+
+  /**
+   * Get the monitor lock in ZooKeeper
+   * 
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private void getMonitorLock() throws KeeperException, InterruptedException {
+    final String zRoot = ZooUtil.getRoot(instance);
+    final String monitorPath = zRoot + Constants.ZMONITOR;
+    final String monitorLockPath = zRoot + Constants.ZMONITOR_LOCK;
+
+    // Ensure the monitor nodes in ZK have the expected layout, as it changed from previous installs.
+    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    if (zoo.exists(monitorPath)) {
+      byte[] data = zoo.getData(monitorPath, null);
+      // If the node isn't empty, it's from a previous install (has hostname:port for HTTP server)
+      if (0 != data.length) {
+        // Recursively delete from that parent node
+        zoo.recursiveDelete(monitorPath, NodeMissingPolicy.SKIP);
+
+        // And then make the nodes that we expect for the incoming ephemeral nodes
+        zoo.putPersistentData(monitorPath, new byte[0], NodeExistsPolicy.FAIL);
+        zoo.putPersistentData(monitorLockPath, new byte[0], NodeExistsPolicy.FAIL);
+      }
+    } else {
+      // 1.5.0 and earlier
+      zoo.putPersistentData(zRoot + Constants.ZMONITOR, new byte[0], NodeExistsPolicy.FAIL);
+      zoo.putPersistentData(zRoot + Constants.ZMONITOR_LOCK, new byte[0], NodeExistsPolicy.FAIL);
+    }
+
+    // Get a ZooLock for the monitor
+    while (true) {
+      MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher();
+      monitorLock = new ZooLock(monitorLockPath);
+      monitorLock.lockAsync(monitorLockWatcher, new byte[0]);
+
+      monitorLockWatcher.waitForChange();
+
+      if (monitorLockWatcher.acquiredLock) {
+        break;
+      }
+
+      if (!monitorLockWatcher.failedToAcquireLock) {
+        throw new IllegalStateException("monitor lock in unknown state");
+      }
+
+      monitorLock.tryToCancelAsyncLockOrUnlock();
+
+      UtilWaitThread.sleep(instance.getConfiguration().getTimeInMillis(Property.MONITOR_LOCK_CHECK_INTERVAL));
+    }
+
+    log.info("Got Monitor lock.");
+  }
+
+  /**
+   * Async Watcher for monitor lock
+   */
+  private static class MoniterLockWatcher implements ZooLock.AsyncLockWatcher {
+
+    boolean acquiredLock = false;
+    boolean failedToAcquireLock = false;
+
+    @Override
+    public void lostLock(LockLossReason reason) {
+      Halt.halt("Monitor lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
+    }
+
+    @Override
+    public void unableToMonitorLockNode(final Throwable e) {
+      Halt.halt(-1, new Runnable() {
+        @Override
+        public void run() {
+          log.fatal("No longer able to monitor Monitor lock node", e);
+        }
+      });
+
+    }
+
+    @Override
+    public synchronized void acquiredLock() {
+      if (acquiredLock || failedToAcquireLock) {
+        Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
+      }
+
+      acquiredLock = true;
+      notifyAll();
+    }
+
+    @Override
+    public synchronized void failedToAcquireLock(Exception e) {
+      log.warn("Failed to get monitor lock " + e);
+
+      if (acquiredLock) {
+        Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock, -1);
+      }
+
+      failedToAcquireLock = true;
+      notifyAll();
+    }
+
+    public synchronized void waitForChange() {
+      while (!acquiredLock && !failedToAcquireLock) {
+        try {
+          wait();
+        } catch (InterruptedException e) {}
+      }
+    }
+  }
+
   public static MasterMonitorInfo getMmi() {
     return mmi;
   }
-  
+
   public static int getTotalTables() {
     return totalTables;
   }
-  
+
   public static int getTotalTabletCount() {
     return totalTabletCount;
   }
-  
+
   public static int getOnlineTabletCount() {
     return onlineTabletCount;
   }
-  
+
   public static long getTotalEntries() {
     return totalEntries;
   }
-  
+
   public static double getTotalIngestRate() {
     return totalIngestRate;
   }
-  
+
   public static double getTotalIngestByteRate() {
     return totalIngestByteRate;
   }
-  
+
   public static double getTotalQueryRate() {
     return totalQueryRate;
   }
-  
+
   public static double getTotalScanRate() {
     return totalScanRate;
   }
-  
+
   public static double getTotalQueryByteRate() {
     return totalQueryByteRate;
   }
-  
+
   public static long getTotalHoldTime() {
     return totalHoldTime;
   }
-  
+
   public static Exception getProblemException() {
     return problemException;
   }
-  
+
   public static Map<String,Map<ProblemType,Integer>> getProblemSummary() {
     return problemSummary;
   }
-  
+
   public static GCStatus getGcStatus() {
     return gcStatus;
   }
-  
+
   public static long getTotalLookups() {
     return totalLookups;
   }
-  
+
   public static long getStartTime() {
     return START_TIME;
   }
-  
+
   public static List<Pair<Long,Double>> getLoadOverTime() {
     synchronized (loadOverTime) {
       return new ArrayList<Pair<Long,Double>>(loadOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Double>> getIngestRateOverTime() {
     synchronized (ingestRateOverTime) {
       return new ArrayList<Pair<Long,Double>>(ingestRateOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Double>> getIngestByteRateOverTime() {
     synchronized (ingestByteRateOverTime) {
       return new ArrayList<Pair<Long,Double>>(ingestByteRateOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Integer>> getRecoveriesOverTime() {
     synchronized (recoveriesOverTime) {
       return new ArrayList<Pair<Long,Integer>>(recoveriesOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Integer>> getMinorCompactionsOverTime() {
     synchronized (minorCompactionsOverTime) {
       return new ArrayList<Pair<Long,Integer>>(minorCompactionsOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Integer>> getMajorCompactionsOverTime() {
     synchronized (majorCompactionsOverTime) {
       return new ArrayList<Pair<Long,Integer>>(majorCompactionsOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Double>> getLookupsOverTime() {
     synchronized (lookupsOverTime) {
       return new ArrayList<Pair<Long,Double>>(lookupsOverTime);
     }
   }
-  
+
   public static double getLookupRate() {
     return lookupRateTracker.calculateRate();
   }
-  
+
   public static List<Pair<Long,Integer>> getQueryRateOverTime() {
     synchronized (queryRateOverTime) {
       return new ArrayList<Pair<Long,Integer>>(queryRateOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Integer>> getScanRateOverTime() {
     synchronized (scanRateOverTime) {
       return new ArrayList<Pair<Long,Integer>>(scanRateOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Double>> getQueryByteRateOverTime() {
     synchronized (queryByteRateOverTime) {
       return new ArrayList<Pair<Long,Double>>(queryByteRateOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Double>> getIndexCacheHitRateOverTime() {
     synchronized (indexCacheHitRateOverTime) {
       return new ArrayList<Pair<Long,Double>>(indexCacheHitRateOverTime);
     }
   }
-  
+
   public static List<Pair<Long,Double>> getDataCacheHitRateOverTime() {
     synchronized (dataCacheHitRateOverTime) {
       return new ArrayList<Pair<Long,Double>>(dataCacheHitRateOverTime);
     }
   }
-  
+
   public static AccumuloConfiguration getSystemConfiguration() {
     return config.getConfiguration();
   }
-  
+
   public static Instance getInstance() {
     return instance;
   }
-  
+
   public static boolean isUsingSsl() {
     return server.isUsingSsl();
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/util/Info.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/Info.java b/server/src/main/java/org/apache/accumulo/server/util/Info.java
index 7a07107..c819ac1 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/Info.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/Info.java
@@ -26,7 +26,7 @@ public class Info {
   public static void main(String[] args) throws Exception {
     ZooReaderWriter zrw = ZooReaderWriter.getInstance();
     Instance instance = HdfsZooInstance.getInstance();
-    System.out.println("monitor: " + new String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, null), Constants.UTF8));
+    System.out.println("monitor: " + new String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR_HTTP_ADDR, null), Constants.UTF8));
     System.out.println("masters: " + instance.getMasterLocations());
     System.out.println("zookeepers: " + instance.getZooKeepers());
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/Initialize.java b/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
index baa5400..ce26ff5 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
@@ -417,6 +417,8 @@ public class Initialize {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZHDFS_RESERVATIONS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZNEXT_FILE, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + Constants.ZRECOVERY, ZERO_CHAR_ARRAY, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
   }
 
   private static String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0351d0d4/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java b/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java
new file mode 100644
index 0000000..c0cef08
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/watcher/MonitorLog4jWatcher.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.watcher;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.log4j.Appender;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
+import org.apache.log4j.xml.DOMConfigurator;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Watcher that reads the monitor's log4j host and port from ZooKeeper and publishes them as system properties
+ */
+public class MonitorLog4jWatcher extends FileWatchdog implements Watcher {
+  private static final Logger log = Logger.getLogger(MonitorLog4jWatcher.class);
+
+  private static final String HOST_PROPERTY_NAME = "org.apache.accumulo.core.host.log";
+  private static final String PORT_PROPERTY_NAME = "org.apache.accumulo.core.host.log.port";
+
+  private final Object lock;
+  private boolean loggingDisabled = false;
+  protected String path;
+
+  /**
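+   * Creates a watcher for the monitor's log4j advertisement node of the given instance.
+   * @param instance instance identifier handed to ZooUtil.getRoot to build the ZooKeeper path
+   * @param filename log4j configuration file to watch and reload
+   * @param delay delay between file checks, handed to FileWatchdog#setDelay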
+   */
+  public MonitorLog4jWatcher(String instance, String filename, int delay) {
+    super(filename);
+    setDelay(delay);
+    this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR;
+    this.lock = new Object();
+  }
+
+  @Override
+  public void run() {
+    try {
+      // Initially set the logger if the Monitor's log4j advertisement node exists
+      if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+        updateMonitorLog4jLocation();
+      log.info("Set watch for Monitor Log4j watcher");
+    } catch (Exception e) {
+      log.error("Unable to set watch for Monitor Log4j watcher on " + path);
+    }
+
+    super.run();
+  }
+
+  @Override
+  protected void doOnChange() {
+    // This method gets called from the parent class' constructor, before lock has been assigned.
+    // Since lock is final, seeing null here can only mean that case, so reset without synchronizing.
+    if (null == lock) {
+      resetLogger();
+      return;
+    }
+    
+    synchronized (lock) {
+      // We might be triggered by a file reload or by a ZK update.
+      // Either way will result in log-forwarding being restarted
+      loggingDisabled = false;
+      log.info("Enabled log-forwarding");
+      resetLogger();
+    }
+  }
+  
+  private void resetLogger() {
+    // Force a reset on the logger's configuration
+    LogManager.resetConfiguration();
+    new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+  }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // We got an update, process the data in the node
+    updateMonitorLog4jLocation();
+
+    if (event.getPath() != null) {
+      try {
+        ZooReaderWriter.getInstance().exists(event.getPath(), this);
+      } catch (Exception ex) {
+        log.error("Unable to reset watch for Monitor Log4j watcher", ex);
+      }
+    }
+  }
+
+  /**
+   * Read the host and port information for the Monitor's log4j socket and update the system properties so that, on logger refresh, it sees the new information.
+   */
+  protected void updateMonitorLog4jLocation() {
+    try {
+      String hostPortString = new String(ZooReaderWriter.getInstance().getData(path, null), Constants.UTF8);
+      HostAndPort hostAndPort = HostAndPort.fromString(hostPortString);
+
+      System.setProperty(HOST_PROPERTY_NAME, hostAndPort.getHostText());
+      System.setProperty(PORT_PROPERTY_NAME, Integer.toString(hostAndPort.getPort()));
+
+      log.info("Changing monitor log4j address to " + hostAndPort.toString());
+
+      doOnChange();
+    } catch (NoNodeException e) {
+      // Not sure on the synchronization guarantees for Loggers and Appenders
+      // on configuration reload
+      synchronized (lock) {
+        // No need to disable log-forwarding again if it is already disabled.
+        if (loggingDisabled) {
+          return;
+        }
+
+        Logger logger = LogManager.getLogger("org.apache.accumulo");
+        if (null != logger) {
+          // TODO ACCUMULO-2343 Create a specific appender for log-forwarding to the monitor
+          // that can replace the AsyncAppender+SocketAppender.
+          Appender appender = logger.getAppender("ASYNC");
+          if (null != appender) {
+            log.info("Closing log-forwarding appender");
+            appender.close();
+            log.info("Removing log-forwarding appender");
+            logger.removeAppender(appender);
+            loggingDisabled = true;
+          }
+        }
+      }
+    } catch (IllegalArgumentException e) {
+      log.error("Could not parse host and port information", e);
+    } catch (Exception e) {
+      log.error("Error reading zookeeper data for Monitor Log4j watcher", e);
+    }
+  }
+}


Mime
View raw message