accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [2/6] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Date Fri, 10 Jan 2014 21:37:46 GMT
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ee9035fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ee9035fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ee9035fa

Branch: refs/heads/master
Commit: ee9035fac4ed6eac1ad60c29ed0b6d5bd46248ec
Parents: cb50a74 6d49e1a
Author: Eric Newton <eric.newton@gmail.com>
Authored: Thu Jan 9 15:58:16 2014 -0500
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Thu Jan 9 15:58:16 2014 -0500

----------------------------------------------------------------------
 server/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee9035fa/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/Accumulo.java
index f56dfd8,0000000..33bb871
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,309 -1,0 +1,309 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.lang.reflect.Method;
 +import java.net.InetAddress;
 +import java.net.UnknownHostException;
 +import java.util.Map.Entry;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.trace.DistributedTrace;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.Version;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.log4j.LogManager;
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.helpers.FileWatchdog;
 +import org.apache.log4j.helpers.LogLog;
 +import org.apache.log4j.xml.DOMConfigurator;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +
 +public class Accumulo {
 +  
 +  private static final Logger log = Logger.getLogger(Accumulo.class);
 +  
 +  public static synchronized void updateAccumuloVersion(FileSystem fs) {
 +    try {
 +      if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
 +        fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.DATA_VERSION));
 +        fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION), false);
 +      }
 +    } catch (IOException e) {
 +      throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
 +    }
 +  }
 +  
 +  public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
 +    int dataVersion;
 +    try {
 +      FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation());
 +      if (files == null || files.length == 0) {
 +        dataVersion = -1; // assume it is 0.5 or earlier
 +      } else {
 +        dataVersion = Integer.parseInt(files[0].getPath().getName());
 +      }
 +      return dataVersion;
 +    } catch (IOException e) {
 +      throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
 +    }
 +  }
 +  
 +  public static void enableTracing(String address, String application) {
 +    try {
 +      DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
 +    } catch (Exception ex) {
 +      log.error("creating remote sink for trace spans", ex);
 +    }
 +  }
 +  
 +  private static class LogMonitor extends FileWatchdog implements Watcher {
 +    String path;
 +    
 +    protected LogMonitor(String instance, String filename, int delay) {
 +      super(filename);
 +      setDelay(delay);
 +      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
 +    }
 +    
 +    private void setMonitorPort() {
 +      try {
 +        String port = new String(ZooReaderWriter.getInstance().getData(path, null));
 +        System.setProperty("org.apache.accumulo.core.host.log.port", port);
 +        log.info("Changing monitor log4j port to "+port);
 +        doOnChange();
 +      } catch (Exception e) {
 +        log.error("Error reading zookeeper data for monitor log4j port", e);
 +      }
 +    }
 +    
 +    @Override
 +    public void run() {
 +      try {
 +        if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
 +          setMonitorPort();
 +        log.info("Set watch for monitor log4j port");
 +      } catch (Exception e) {
 +        log.error("Unable to set watch for monitor log4j port " + path);
 +      }
 +      super.run();
 +    }
 +    
 +    @Override
 +    protected void doOnChange() {
 +      LogManager.resetConfiguration();
 +      new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
 +    }
 +    
 +    @Override
 +    public void process(WatchedEvent event) {
 +      setMonitorPort();
 +      if (event.getPath() != null) {
 +        try {
 +          ZooReaderWriter.getInstance().exists(event.getPath(), this);
 +        } catch (Exception ex) {
 +          log.error("Unable to reset watch for monitor log4j port", ex);
 +        }
 +      }
 +    }
 +  }
 +  
 +  public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
 +    
 +    System.setProperty("org.apache.accumulo.core.application", application);
 +    
 +    if (System.getenv("ACCUMULO_LOG_DIR") != null)
 +      System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
 +    else
 +      System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
 +    
 +    String localhost = InetAddress.getLocalHost().getHostName();
 +    System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
 +    
 +    if (System.getenv("ACCUMULO_LOG_HOST") != null)
 +      System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
 +    else
 +      System.setProperty("org.apache.accumulo.core.host.log", localhost);
 +    
 +    int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
 +    System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
 +    
 +    // Use a specific log config, if it exists
-     String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
++    String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
 +    if (!new File(logConfig).exists()) {
 +      // otherwise, use the generic config
 +      logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
 +    }
 +    // Turn off messages about not being able to reach the remote logger... we protect against that.
 +    LogLog.setQuietMode(true);
 +    
 +    // Configure logging
 +    if (logPort==0)
 +      new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
 +    else
 +      DOMConfigurator.configureAndWatch(logConfig, 5000);
 +    
 +    log.info(application + " starting");
 +    log.info("Instance " + config.getInstance().getInstanceID());
 +    int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
 +    log.info("Data Version " + dataVersion);
 +    Accumulo.waitForZookeeperAndHdfs(fs);
 +    
 +    Version codeVersion = new Version(Constants.VERSION);
 +    if (dataVersion != Constants.DATA_VERSION && dataVersion != Constants.PREV_DATA_VERSION) {
 +      throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
 +    }
 +    
 +    TreeMap<String,String> sortedProps = new TreeMap<String,String>();
 +    for (Entry<String,String> entry : config.getConfiguration())
 +      sortedProps.put(entry.getKey(), entry.getValue());
 +    
 +    for (Entry<String,String> entry : sortedProps.entrySet()) {
 +      if (entry.getKey().toLowerCase().contains("password") || entry.getKey().toLowerCase().contains("secret")
 +          || entry.getKey().startsWith(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey()))
 +        log.info(entry.getKey() + " = <hidden>");
 +      else
 +        log.info(entry.getKey() + " = " + entry.getValue());
 +    }
 +    
 +    monitorSwappiness();
 +  }
 +  
 +  /**
 +   * 
 +   */
 +  public static void monitorSwappiness() {
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        try {
 +          String procFile = "/proc/sys/vm/swappiness";
 +          File swappiness = new File(procFile);
 +          if (swappiness.exists() && swappiness.canRead()) {
 +            InputStream is = new FileInputStream(procFile);
 +            try {
 +              byte[] buffer = new byte[10];
 +              int bytes = is.read(buffer);
 +              String setting = new String(buffer, 0, bytes);
 +              setting = setting.trim();
 +              if (bytes > 0 && Integer.parseInt(setting) > 10) {
 +                log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
 +                    + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
 +              }
 +            } finally {
 +              is.close();
 +            }
 +          }
 +        } catch (Throwable t) {
 +          log.error(t, t);
 +        }
 +      }
 +    }, 1000, 10 * 60 * 1000);
 +  }
 +  
 +  public static String getLocalAddress(String[] args) throws UnknownHostException {
 +    InetAddress result = InetAddress.getLocalHost();
 +    for (int i = 0; i < args.length - 1; i++) {
 +      if (args[i].equals("-a") || args[i].equals("--address")) {
 +        result = InetAddress.getByName(args[i + 1]);
 +        log.debug("Local address is: " + args[i + 1] + " (" + result.toString() + ")");
 +        break;
 +      }
 +    }
 +    return result.getHostName();
 +  }
 +  
 +  public static void waitForZookeeperAndHdfs(FileSystem fs) {
 +    log.info("Attempting to talk to zookeeper");
 +    while (true) {
 +      try {
 +        ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
 +        break;
 +      } catch (InterruptedException e) {
 +        // ignored
 +      } catch (KeeperException ex) {
 +        log.info("Waiting for accumulo to be initialized");
 +        UtilWaitThread.sleep(1000);
 +      }
 +    }
 +    log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
 +    long sleep = 1000;
 +    while (true) {
 +      try {
 +        if (!isInSafeMode(fs))
 +          break;
 +        log.warn("Waiting for the NameNode to leave safemode");
 +      } catch (IOException ex) {
 +        log.warn("Unable to connect to HDFS");
 +      }
 +      log.info("Sleeping " + sleep / 1000. + " seconds");
 +      UtilWaitThread.sleep(sleep);
 +      sleep = Math.min(60 * 1000, sleep * 2);
 +    }
 +    log.info("Connected to HDFS");
 +  }
 +  
 +  private static boolean isInSafeMode(FileSystem fs) throws IOException {
 +    if (!(fs instanceof DistributedFileSystem))
 +      return false;
 +    DistributedFileSystem dfs = (DistributedFileSystem)fs;
 +    // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
 +    // Becomes this:
 +    Class<?> safeModeAction;
 +    try {
 +      // hadoop 2.0
 +      safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
 +    } catch (ClassNotFoundException ex) {
 +      // hadoop 1.0
 +      try {
 +        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
 +      } catch (ClassNotFoundException e) {
 +        throw new RuntimeException("Cannot figure out the right class for Constants");
 +      }
 +    }
 +    Object get = null;
 +    for (Object obj : safeModeAction.getEnumConstants()) {
 +      if (obj.toString().equals("SAFEMODE_GET"))
 +        get = obj;
 +    }
 +    if (get == null) {
 +      throw new RuntimeException("cannot find SAFEMODE_GET");
 +    }
 +    try {
 +      Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
 +      return (Boolean) setSafeMode.invoke(dfs, get);
 +    } catch (Exception ex) {
 +      throw new RuntimeException("cannot find method setSafeMode");
 +    }
 +  }
 +}


Mime
View raw message