accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bil...@apache.org
Subject git commit: ACCUMULO-1664 made server processes able to use random ports by configuring the ports as 0
Date Wed, 11 Sep 2013 16:01:39 GMT
Updated Branches:
  refs/heads/master 1edccf6b3 -> 6e7269ddf


ACCUMULO-1664 made server processes able to use random ports by configuring the ports as 0


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6e7269dd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6e7269dd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6e7269dd

Branch: refs/heads/master
Commit: 6e7269ddfffb3e35824f5de87ac93304e39d52cf
Parents: 1edccf6
Author: Billie Rinaldi <billie.rinaldi@gmail.com>
Authored: Wed Sep 11 08:59:44 2013 -0700
Committer: Billie Rinaldi <billie.rinaldi@gmail.com>
Committed: Wed Sep 11 08:59:58 2013 -0700

----------------------------------------------------------------------
 .../1GB/native-standalone/generic_logger.xml    |   2 +-
 conf/examples/1GB/standalone/generic_logger.xml |   2 +-
 .../2GB/native-standalone/generic_logger.xml    |   2 +-
 conf/examples/2GB/standalone/generic_logger.xml |   2 +-
 .../3GB/native-standalone/generic_logger.xml    |   2 +-
 conf/examples/3GB/standalone/generic_logger.xml |   2 +-
 .../512MB/native-standalone/generic_logger.xml  |   2 +-
 .../512MB/standalone/generic_logger.xml         |   2 +-
 .../org/apache/accumulo/core/Constants.java     |   3 +
 .../accumulo/core/client/impl/MasterClient.java |   3 +
 .../apache/accumulo/fate/zookeeper/ZooLock.java |   5 +
 .../minicluster/MiniAccumuloConfig.java         |   2 +-
 .../org/apache/accumulo/server/Accumulo.java    |  64 +++++++-
 .../server/gc/SimpleGarbageCollector.java       |   2 +-
 .../apache/accumulo/server/master/Master.java   |  10 +-
 .../accumulo/server/monitor/LogService.java     |  15 +-
 .../apache/accumulo/server/monitor/Monitor.java |  13 +-
 .../org/apache/accumulo/server/util/Info.java   |  33 ++++
 .../server/util/TNonblockingServerSocket.java   | 157 +++++++++++++++++++
 .../accumulo/server/util/TServerUtils.java      |  14 +-
 .../java/org/apache/accumulo/start/Main.java    |   6 +-
 .../accumulo/fate/zookeeper/ZooLockTest.java    |  18 +++
 22 files changed, 338 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/1GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/native-standalone/generic_logger.xml b/conf/examples/1GB/native-standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/1GB/native-standalone/generic_logger.xml
+++ b/conf/examples/1GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/1GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/1GB/standalone/generic_logger.xml b/conf/examples/1GB/standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/1GB/standalone/generic_logger.xml
+++ b/conf/examples/1GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/2GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/native-standalone/generic_logger.xml b/conf/examples/2GB/native-standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/2GB/native-standalone/generic_logger.xml
+++ b/conf/examples/2GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/2GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/2GB/standalone/generic_logger.xml b/conf/examples/2GB/standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/2GB/standalone/generic_logger.xml
+++ b/conf/examples/2GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/3GB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/native-standalone/generic_logger.xml b/conf/examples/3GB/native-standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/3GB/native-standalone/generic_logger.xml
+++ b/conf/examples/3GB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/3GB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/3GB/standalone/generic_logger.xml b/conf/examples/3GB/standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/3GB/standalone/generic_logger.xml
+++ b/conf/examples/3GB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/512MB/native-standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/native-standalone/generic_logger.xml b/conf/examples/512MB/native-standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/512MB/native-standalone/generic_logger.xml
+++ b/conf/examples/512MB/native-standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/conf/examples/512MB/standalone/generic_logger.xml
----------------------------------------------------------------------
diff --git a/conf/examples/512MB/standalone/generic_logger.xml b/conf/examples/512MB/standalone/generic_logger.xml
index 2353135..af5bcab 100644
--- a/conf/examples/512MB/standalone/generic_logger.xml
+++ b/conf/examples/512MB/standalone/generic_logger.xml
@@ -43,7 +43,7 @@
   <!-- Send all logging data to a centralized logger -->
   <appender name="N1" class="org.apache.log4j.net.SocketAppender">
      <param name="remoteHost"     value="${org.apache.accumulo.core.host.log}"/>
-     <param name="port"           value="4560"/>
+     <param name="port"           value="${org.apache.accumulo.core.host.log.port}"/>
      <param name="application"    value="${org.apache.accumulo.core.application}:${org.apache.accumulo.core.ip.localhost.hostname}"/>
      <param name="Threshold"      value="WARN"/>
   </appender>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 7ac665c..ebcc47b 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -44,6 +44,9 @@ public class Constants {
   public static final String ZGC = "/gc";
   public static final String ZGC_LOCK = ZGC + "/lock";
   
+  public static final String ZMONITOR = "/monitor";
+  public static final String ZMONITOR_LOG4J_PORT = ZMONITOR + "/log4j_port";
+  
   public static final String ZCONFIG = "/config";
   
   public static final String ZTSERVERS = "/tservers";

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
index 63ebdf7..32c80f9 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MasterClient.java
@@ -56,6 +56,9 @@ public class MasterClient {
     }
     
     String master = locations.get(0);
+    if (master.endsWith(":0"))
+      return null;
+    
     try {
       // Master requests can take a long time: don't ever time out
       MasterClientService.Client client = ThriftUtil.getClientNoTimeout(new MasterClientService.Client.Factory(), master);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
index fb2f3d8..8772a83 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
@@ -334,6 +334,11 @@ public class ZooLock implements Watcher {
     return lock != null;
   }
   
+  public synchronized void replaceLockData(byte[] b) throws KeeperException, InterruptedException {
+    if (getLockPath()!=null)
+      zooKeeper.getZooKeeper().setData(getLockPath(), b, -1);
+  }
+  
   @Override
   public synchronized void process(WatchedEvent event) {
     log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java
index ff18a6d..f8ad3af 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java
@@ -142,7 +142,7 @@ public class MiniAccumuloConfig {
    */
   private void mergePropWithRandomPort(String key) {
     if (!siteConfig.containsKey(key)) {
-      siteConfig.put(key, PortUtils.getRandomFreePort() + "");
+      siteConfig.put(key, "0");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index dc90a7b..57c4e2a 100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.trace.DistributedTrace;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.Version;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -37,10 +38,14 @@ import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.FileWatchdog;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.xml.DOMConfigurator;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 
 public class Accumulo {
   
@@ -80,6 +85,57 @@ public class Accumulo {
     }
   }
   
+  private static class LogMonitor extends FileWatchdog implements Watcher {
+    String path;
+    
+    protected LogMonitor(String instance, String filename, int delay) {
+      super(filename);
+      setDelay(delay);
+      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
+    }
+    
+    private void setMonitorPort() {
+      try {
+        String port = new String(ZooReaderWriter.getInstance().getData(path, null));
+        System.setProperty("org.apache.accumulo.core.host.log.port", port);
+        log.info("Changing monitor log4j port to "+port);
+        doOnChange();
+      } catch (Exception e) {
+        log.error("Error reading zookeeper data for monitor log4j port", e);
+      }
+    }
+    
+    @Override
+    public void run() {
+      try {
+        if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
+          setMonitorPort();
+        log.info("Set watch for monitor log4j port");
+      } catch (Exception e) {
+        log.error("Unable to set watch for monitor log4j port " + path);
+      }
+      super.run();
+    }
+    
+    @Override
+    protected void doOnChange() {
+      LogManager.resetConfiguration();
+      new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
+    }
+    
+    @Override
+    public void process(WatchedEvent event) {
+      setMonitorPort();
+      if (event.getPath() != null) {
+        try {
+          ZooReaderWriter.getInstance().exists(event.getPath(), this);
+        } catch (Exception ex) {
+          log.error("Unable to reset watch for monitor log4j port", ex);
+        }
+      }
+    }
+  }
+  
   public static void init(VolumeManager fs, ServerConfiguration config, String application) throws UnknownHostException {
     
     System.setProperty("org.apache.accumulo.core.application", application);
@@ -97,6 +153,9 @@ public class Accumulo {
     else
       System.setProperty("org.apache.accumulo.core.host.log", localhost);
     
+    int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
+    System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
+    
     // Use a specific log config, if it exists
     String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
     if (!new File(logConfig).exists()) {
@@ -107,7 +166,10 @@ public class Accumulo {
     LogLog.setQuietMode(true);
     
     // Configure logging
-    DOMConfigurator.configureAndWatch(logConfig, 5000);
+    if (logPort==0)
+      new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
+    else
+      DOMConfigurator.configureAndWatch(logConfig, 5000);
     
     // Read the auditing config
     String auditConfig = String.format("%s/conf/auditLog.xml", System.getenv("ACCUMULO_HOME"), application);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
index 118536f..92100ec 100644
--- a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
+++ b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
@@ -406,7 +406,7 @@ public class SimpleGarbageCollector implements Iface {
     long maxMessageSize = instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
     InetSocketAddress result = new InetSocketAddress(opts.getAddress(), port);
     try {
-      TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize);
+      port = TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize).address.getPort();
     } catch (Exception ex) {
       log.fatal(ex, ex);
       throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/master/Master.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java
index feb944a..0f6a097 100644
--- a/server/src/main/java/org/apache/accumulo/server/master/Master.java
+++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java
@@ -77,6 +77,7 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ByteBufferUtil;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -138,6 +139,7 @@ import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.util.SystemPropUtil;
 import org.apache.accumulo.server.util.TServerUtils;
+import org.apache.accumulo.server.util.TServerUtils.ServerAddress;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;
 import org.apache.accumulo.server.util.time.SimpleTimer;
@@ -1511,8 +1513,12 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
     }
     
     Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler()));
-    clientService = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
-        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE).server;
+    ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
+        "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+    clientService = sa.server;
+    String address = AddressUtil.toString(sa.address);
+    log.info("Setting master lock data to " + address);
+    masterLock.replaceLockData(address.getBytes());
     
     while (!clientService.isServing()) {
       UtilWaitThread.sleep(100);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
index a123e9f..ce5dab8 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/LogService.java
@@ -24,9 +24,13 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.net.SocketNode;
@@ -55,6 +59,10 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
       }
     }
     
+    public int getLocalPort() {
+      return server.getLocalPort();
+    }
+    
     public void run() {
       try {
         while (true) {
@@ -70,9 +78,12 @@ public class LogService extends org.apache.log4j.AppenderSkeleton {
     }
   }
   
-  static void startLogListener(AccumuloConfiguration conf) {
+  static void startLogListener(AccumuloConfiguration conf, String instanceId) {
     try {
-      new Daemon(new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT))).start();
+      SocketServer server = new SocketServer(conf.getPort(Property.MONITOR_LOG4J_PORT));
+      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instanceId) + Constants.ZMONITOR_LOG4J_PORT,
+          Integer.toString(server.getLocalPort()).getBytes(), NodeExistsPolicy.OVERWRITE);
+      new Daemon(server).start();
     } catch (Throwable t) {
       log.info("Unable to listen to cluster-wide ports", t);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
index c972566..e0a1e9d 100644
--- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
+++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
@@ -48,6 +48,7 @@ import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.ServerOpts;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -73,6 +74,7 @@ import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.util.EmbeddedWebServer;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
@@ -491,9 +493,18 @@ public class Monitor {
     server.addServlet(ShowTrace.class, "/trace/show");
     if (server.isUsingSsl())
       server.addServlet(ShellServlet.class, "/shell");
-    LogService.startLogListener(Monitor.getSystemConfiguration());
     server.start();
     
+    try {
+      String monitorAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, server.getPort()));
+      ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, monitorAddress.getBytes(),
+          NodeExistsPolicy.OVERWRITE);
+      log.info("Set monitor address in zookeeper to " + monitorAddress);
+    } catch (Exception ex) {
+      log.error("Unable to set monitor address in zookeeper");
+    }
+    LogService.startLogListener(Monitor.getSystemConfiguration(), instance.getInstanceID());
+    
     new Daemon(new LoggingRunnable(log, new ZooKeeperStatus()), "ZooKeeperStatus").start();
     
     // need to regularly fetch data so plot data is updated

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/util/Info.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/Info.java b/server/src/main/java/org/apache/accumulo/server/util/Info.java
new file mode 100644
index 0000000..4f03d82
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/util/Info.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+
+public class Info {
+  public static void main(String[] args) throws Exception {
+    ZooReaderWriter zrw = ZooReaderWriter.getInstance();
+    Instance instance = HdfsZooInstance.getInstance();
+    System.out.println("monitor: " + new String(zrw.getData(ZooUtil.getRoot(instance) + Constants.ZMONITOR, null)));
+    System.out.println("masters: " + instance.getMasterLocations());
+    System.out.println("zookeepers: " + instance.getZooKeepers());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
new file mode 100644
index 0000000..4154d9d
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/util/TNonblockingServerSocket.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.server.util;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Wrapper around ServerSocketChannel.
+ * 
+ * This class is copied from org.apache.thrift.transport.TNonblockingServerSocket version 0.9. Apart from logging and
+ * error-reporting details, the only change is the addition of the {@link #getPort()} method to retrieve the port used
+ * by the ServerSocket.
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+  private static final Logger log = Logger.getLogger(TNonblockingServerSocket.class.getName());
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying ServerSocket object; null once the transport has been closed.
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates a server socket listening on the given port, with no client timeout.
+   * 
+   * @param port
+   *          port to bind to
+   * @throws TTransportException
+   *           if the socket cannot be created or bound
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates a server socket listening on the given port.
+   * 
+   * @param port
+   *          port to bind to
+   * @param clientTimeout
+   *          timeout applied to each accepted client socket
+   * @throws TTransportException
+   *           if the socket cannot be created or bound
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    this(new InetSocketAddress(port), clientTimeout);
+  }
+
+  /**
+   * Creates a server socket bound to the given address, with no client timeout.
+   * 
+   * @param bindAddr
+   *          address to bind to
+   * @throws TTransportException
+   *           if the socket cannot be created or bound
+   */
+  public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
+    this(bindAddr, 0);
+  }
+
+  /**
+   * Creates a non-blocking server socket bound to the given address.
+   * 
+   * @param bindAddr
+   *          address to bind to
+   * @param clientTimeout
+   *          timeout applied to each accepted client socket
+   * @throws TTransportException
+   *           if the channel cannot be opened, configured or bound; the underlying IOException is kept as the cause
+   */
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(bindAddr);
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      // Release the channel if it was opened before the failure, so it is not leaked
+      if (serverSocketChannel != null) {
+        try {
+          serverSocketChannel.close();
+        } catch (IOException closeEx) {
+          log.debug("Could not close server socket channel after failed bind: " + closeEx.getMessage());
+        }
+        serverSocketChannel = null;
+      }
+      // Keep the original exception as the cause so the reason for the failure is not lost
+      throw new TTransportException("Could not create ServerSocket on address " + bindAddr + ".", ioe);
+    }
+  }
+
+  /**
+   * Prepares the socket for accepting connections by clearing its accept timeout. Failure to do so is logged and
+   * otherwise ignored.
+   */
+  @Override
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        log.warn("Could not set socket timeout.", sx);
+      }
+    }
+  }
+
+  /**
+   * Accepts a pending connection, if any.
+   * 
+   * @return the accepted client socket with the client timeout applied, or null if no connection is pending
+   * @throws TTransportException
+   *           if the transport is closed or the accept fails
+   */
+  @Override
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  /**
+   * Registers the server socket channel with the selector for accept events.
+   * 
+   * @param selector
+   *          selector that should be notified of incoming connections
+   */
+  @Override
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally; report it rather than silently never accepting connections
+      log.warn("Could not register server socket channel with selector: channel is closed.", e);
+    }
+  }
+
+  /**
+   * Closes the underlying server socket. Calling this more than once is harmless.
+   */
+  @Override
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        log.warn("WARNING: Could not close server socket: " + iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  @Override
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+  /**
+   * Returns the local port the server socket is listening on.
+   * 
+   * @return the bound port, or -1 if the socket has been closed
+   */
+  public int getPort() {
+    // Read the field once; close() may null it out
+    ServerSocket socket = serverSocket_;
+    return socket == null ? -1 : socket.getLocalPort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
index 0c56476..fe49cfd 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java
@@ -45,7 +45,6 @@ import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
@@ -114,7 +113,7 @@ public class TServerUtils {
       
       for (int i = 0; i < portsToSearch; i++) {
         int port = portHint + i;
-        if (portHint == 0)
+        if (portHint != 0 && i > 0)
           port = 1024 + random.nextInt(65535 - 1024);
         if (port > 65535)
           port = 1024 + port % (65535 - 1024);
@@ -218,14 +217,19 @@ public class TServerUtils {
       long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
     TNonblockingServerSocket transport = new TNonblockingServerSocket(address);
     // check for the special "bind to everything address"
-    if (address.getAddress().getHostAddress().equals("0.0.0.0")) {
+    String hostname = address.getAddress().getHostAddress();
+    if (hostname.equals("0.0.0.0")) {
       // can't get the address from the bind, so we'll do our best to invent our hostname
       try {
-        address = new InetSocketAddress(InetAddress.getLocalHost().getHostName(), address.getPort());
+        hostname = InetAddress.getLocalHost().getHostName();
       } catch (UnknownHostException e) {
         throw new TTransportException(e);
       }
     }
+    int port = address.getPort();
+    if (port == 0) {
+      port = transport.getPort();
+    }
     THsHaServer.Args options = new THsHaServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
@@ -260,7 +264,7 @@ public class TServerUtils {
     }, timeBetweenThreadChecks, timeBetweenThreadChecks);
     options.executorService(pool);
     options.processorFactory(new TProcessorFactory(processor));
-    return new ServerAddress(new THsHaServer(options), address);
+    return new ServerAddress(new THsHaServer(options), new InetSocketAddress(hostname, port));
   }
   
   public static ServerAddress startThreadPoolServer(InetSocketAddress address, TProcessor
processor, String serverName, String threadName, int numThreads)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/start/src/main/java/org/apache/accumulo/start/Main.java
----------------------------------------------------------------------
diff --git a/start/src/main/java/org/apache/accumulo/start/Main.java b/start/src/main/java/org/apache/accumulo/start/Main.java
index 66a3e71..464326d 100644
--- a/start/src/main/java/org/apache/accumulo/start/Main.java
+++ b/start/src/main/java/org/apache/accumulo/start/Main.java
@@ -74,11 +74,13 @@ public class Main {
       } else if (args[0].equals("rfile-info")) {
         runTMP = cl.loadClass("org.apache.accumulo.core.file.rfile.PrintInfo");
       } else if (args[0].equals("login-info")) {
-        runTMP = cl.loadClass("org.apache.accumulo.core.util.LoginProperties");
+        runTMP = cl.loadClass("org.apache.accumulo.server.util.LoginProperties");
       } else if (args[0].equals("zookeeper")) {
         runTMP = cl.loadClass("org.apache.accumulo.server.util.ZooKeeperMain");
       } else if (args[0].equals("create-token")) {
         runTMP = cl.loadClass("org.apache.accumulo.core.util.CreateToken");
+      } else if (args[0].equals("info")) {
+        runTMP = cl.loadClass("org.apache.accumulo.server.util.Info");
       } else {
         try {
           runTMP = cl.loadClass(args[0]);
@@ -121,6 +123,6 @@ public class Main {
   }
   
   private static void printUsage() {
-    System.out.println("accumulo init | master | tserver | monitor | shell | admin | gc | classpath | rfile-info | login-info | tracer | minicluster | proxy | zookeeper | create-token | <accumulo class> args");
+    // Alternatives are separated by " | "; the last one is an arbitrary class name followed by its arguments.
+    System.out.println("accumulo init | master | tserver | monitor | shell | admin | gc | classpath | rfile-info | login-info | tracer | minicluster | proxy | zookeeper | create-token | info | version | <accumulo class> args");
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/6e7269dd/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
index d29ad6d..fb99f42 100644
--- a/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
+++ b/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
@@ -328,6 +328,24 @@ public class ZooLockTest {
     zl.unlock();
   }
   
+  /**
+   * Verifies that replaceLockData() changes the data stored at the lock path: the node first holds the data passed to
+   * lockAsync() and afterwards the replacement data.
+   */
+  @Test(timeout = 10000)
+  public void testChangeData() throws Exception {
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    ZooKeeper zk = new ZooKeeper(accumulo.getZooKeepers(), 1000, null);
+    try {
+      zk.addAuthInfo("digest", "secret".getBytes());
+      zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      
+      ZooLock zl = new ZooLock(accumulo.getZooKeepers(), 1000, "digest", "secret".getBytes(), parent);
+      
+      TestALW lw = new TestALW();
+      
+      zl.lockAsync(lw, "test1".getBytes());
+      Assert.assertEquals("test1", new String(zk.getData(zl.getLockPath(), null, null)));
+      
+      zl.replaceLockData("test2".getBytes());
+      Assert.assertEquals("test2", new String(zk.getData(zl.getLockPath(), null, null)));
+    } finally {
+      // Close the verification client even when an assertion fails, so its session is not left open
+      zk.close();
+    }
+  }
+  
   @AfterClass
   public static void tearDownMiniCluster() throws Exception {
     accumulo.stop();


Mime
View raw message