accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject [22/23] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Date Thu, 23 Jan 2014 07:36:40 GMT
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	core/src/main/java/org/apache/accumulo/core/client/ZooKeeperInstance.java
	core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
	fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
	server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
	server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
	server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e7e5c009
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e7e5c009
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e7e5c009

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: e7e5c009737b3249cee667f75b48de3be4adb2fc
Parents: b353a24 c4cd3b1
Author: Sean Busbey <busbey@cloudera.com>
Authored: Wed Jan 22 23:50:23 2014 -0600
Committer: Sean Busbey <busbey@cloudera.com>
Committed: Wed Jan 22 23:50:23 2014 -0600

----------------------------------------------------------------------
 .../apache/accumulo/core/util/AddressUtil.java  |  2 +-
 .../apache/accumulo/core/zookeeper/ZooUtil.java | 10 ++-
 .../apache/accumulo/fate/util/AddressUtil.java  | 60 +++++++++++++
 .../accumulo/fate/zookeeper/ZooSession.java     | 14 +--
 .../accumulo/fate/util/AddressUtilTest.java     | 95 ++++++++++++++++++++
 .../org/apache/accumulo/server/Accumulo.java    | 22 ++++-
 .../accumulo/server/fs/VolumeManagerImpl.java   |  3 +
 .../accumulo/server/util/TabletOperations.java  | 10 ++-
 .../accumulo/master/tableOps/DeleteTable.java   |  8 ++
 .../org/apache/accumulo/tracer/TraceServer.java | 67 ++++++++++----
 .../org/apache/accumulo/tserver/Compactor.java  |  2 +
 11 files changed, 264 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
index 46510ee,af9a1a6..bd9a5ca
--- a/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
@@@ -16,25 -16,36 +16,25 @@@
   */
  package org.apache.accumulo.core.util;
  
 -import java.net.InetSocketAddress;
 -
 -import org.apache.hadoop.io.Text;
 -import org.apache.thrift.transport.TSocket;
 +import com.google.common.net.HostAndPort;
  
- public class AddressUtil {
+ public class AddressUtil extends org.apache.accumulo.fate.util.AddressUtil {
  
 -  static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException {
 -    String[] parts = address.split(":", 2);
 -    if (address.contains("+"))
 -      parts = address.split("\\+", 2);
 -    if (parts.length == 2) {
 -      if (parts[1].isEmpty())
 -        return new InetSocketAddress(parts[0], defaultPort);
 -      return new InetSocketAddress(parts[0], Integer.parseInt(parts[1]));
 -    }
 -    return new InetSocketAddress(address, defaultPort);
 -  }
 -  
 -  static public InetSocketAddress parseAddress(Text address, int defaultPort) {
 -    return parseAddress(address.toString(), defaultPort);
 +
 +  static public HostAndPort parseAddress(String address) throws NumberFormatException {
 +    return parseAddress(address, false);
    }
 -  
 -  static public TSocket createTSocket(String address, int defaultPort) {
 -    InetSocketAddress addr = parseAddress(address, defaultPort);
 -    return new TSocket(addr.getHostName(), addr.getPort());
 +
 +  static public HostAndPort parseAddress(String address, boolean ignoreMissingPort) throws NumberFormatException {
 +    address = address.replace('+', ':');
 +    HostAndPort hap = HostAndPort.fromString(address);
 +    if (!ignoreMissingPort && !hap.hasPort())
 +      throw new IllegalArgumentException("Address was expected to contain port. address=" + address);
 +    
 +    return hap;
    }
 -  
 -  static public String toString(InetSocketAddress addr) {
 -    return addr.getAddress().getHostAddress() + ":" + addr.getPort();
 +
 +  public static HostAndPort parseAddress(String address, int defaultPort) {
 +    return parseAddress(address, true).withDefaultPort(defaultPort);
    }
 -  
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
index 9206beb,86dc4d2..fa0bdf6
--- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
@@@ -16,23 -16,10 +16,24 @@@
   */
  package org.apache.accumulo.core.zookeeper;
  
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
++import java.net.UnknownHostException;
 +
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
  
  public class ZooUtil extends org.apache.accumulo.fate.zookeeper.ZooUtil {
 +  
 +  private static final Logger log = Logger.getLogger(ZooUtil.class);
 +  
    public static String getRoot(final Instance instance) {
      return getRoot(instance.getInstanceID());
    }
@@@ -40,35 -27,4 +41,42 @@@
    public static String getRoot(final String instanceId) {
      return Constants.ZROOT + "/" + instanceId;
    }
 +  
 +  /**
 +   * Utility to support certain client side utilities to minimize command-line options.
 +   */
 +
 +  public static String getInstanceIDFromHdfs(Path instanceDirectory) {
 +    try {
 +
 +      @SuppressWarnings("deprecation")
 +      FileSystem fs = FileUtil.getFileSystem(instanceDirectory.toString(), CachedConfiguration.getInstance(), AccumuloConfiguration.getSiteConfiguration());
 +      FileStatus[] files = null;
 +      try {
 +        files = fs.listStatus(instanceDirectory);
 +      } catch (FileNotFoundException ex) {
 +        // ignored
 +      }
 +      log.debug("Trying to read instance id from " + instanceDirectory);
 +      if (files == null || files.length == 0) {
 +        log.error("unable obtain instance id at " + instanceDirectory);
 +        throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory);
 +      } else if (files.length != 1) {
 +        log.error("multiple potential instances in " + instanceDirectory);
 +        throw new RuntimeException("Accumulo found multiple possible instance ids in " + instanceDirectory);
 +      } else {
 +        String result = files[0].getPath().getName();
 +        return result;
 +      }
 +    } catch (IOException e) {
-       throw new RuntimeException("Accumulo not initialized, there is no instance id at " + instanceDirectory, e);
++      log.error("Problem reading instance id out of hdfs at " + instanceDirectory, e);
++      throw new RuntimeException("Can't tell if Accumulo is initialized; can't read instance id at " + instanceDirectory, e);
++    } catch (IllegalArgumentException exception) {
++      /* HDFS throws this when there's a UnknownHostException due to DNS troubles. */
++      if (exception.getCause() instanceof UnknownHostException) {
++        log.error("Problem reading instance id out of hdfs at " + instanceDirectory, exception);
++      }
++      throw exception;
 +    }
 +  }
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
index 83f54b0,0000000..15e157d
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,264 -1,0 +1,282 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.net.InetAddress;
 +import java.net.UnknownHostException;
 +import java.util.Map.Entry;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.trace.DistributedTrace;
++import org.apache.accumulo.core.util.AddressUtil;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.Version;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.LogManager;
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.helpers.FileWatchdog;
 +import org.apache.log4j.helpers.LogLog;
 +import org.apache.log4j.xml.DOMConfigurator;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +
 +public class Accumulo {
 +  
 +  private static final Logger log = Logger.getLogger(Accumulo.class);
 +  
 +  public static synchronized void updateAccumuloVersion(VolumeManager fs) {
 +    try {
 +      if (getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
 +        fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.DATA_VERSION));
 +        fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.PREV_DATA_VERSION));
 +      }
 +    } catch (IOException e) {
 +      throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
 +    }
 +  }
 +  
 +  public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
 +    int dataVersion;
 +    try {
 +      FileStatus[] files = fs.getDefaultVolume().listStatus(ServerConstants.getDataVersionLocation());
 +      if (files == null || files.length == 0) {
 +        dataVersion = -1; // assume it is 0.5 or earlier
 +      } else {
 +        dataVersion = Integer.parseInt(files[0].getPath().getName());
 +      }
 +      return dataVersion;
 +    } catch (IOException e) {
 +      throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
 +    }
 +  }
 +  
 +  public static void enableTracing(String address, String application) {
 +    try {
 +      DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
 +    } catch (Exception ex) {
 +      log.error("creating remote sink for trace spans", ex);
 +    }
 +  }
 +  
 +  private static class LogMonitor extends FileWatchdog implements Watcher {
 +    String path;
 +    
 +    protected LogMonitor(String instance, String filename, int delay) {
 +      super(filename);
 +      setDelay(delay);
 +      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
 +    }
 +    
 +    private void setMonitorPort() {
 +      try {
 +        String port = new String(ZooReaderWriter.getInstance().getData(path, null));
 +        System.setProperty("org.apache.accumulo.core.host.log.port", port);
 +        log.info("Changing monitor log4j port to "+port);
 +        doOnChange();
 +      } catch (Exception e) {
 +        log.error("Error reading zookeeper data for monitor log4j port", e);
 +      }
 +    }
 +    
 +    @Override
 +    public void run() {
 +      try {
 +        if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
 +          setMonitorPort();
 +        log.info("Set watch for monitor log4j port");
 +      } catch (Exception e) {
 +        log.error("Unable to set watch for monitor log4j port " + path);
 +      }
 +      super.run();
 +    }
 +    
 +    @Override
 +    protected void doOnChange() {
 +      LogManager.resetConfiguration();
 +      new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
 +    }
 +    
 +    @Override
 +    public void process(WatchedEvent event) {
 +      setMonitorPort();
 +      if (event.getPath() != null) {
 +        try {
 +          ZooReaderWriter.getInstance().exists(event.getPath(), this);
 +        } catch (Exception ex) {
 +          log.error("Unable to reset watch for monitor log4j port", ex);
 +        }
 +      }
 +    }
 +  }
 +  
 +  public static void init(VolumeManager fs, ServerConfiguration config, String application) throws UnknownHostException {
 +    
 +    System.setProperty("org.apache.accumulo.core.application", application);
 +    
 +    if (System.getenv("ACCUMULO_LOG_DIR") != null)
 +      System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
 +    else
 +      System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
 +    
 +    String localhost = InetAddress.getLocalHost().getHostName();
 +    System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
 +    
 +    if (System.getenv("ACCUMULO_LOG_HOST") != null)
 +      System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
 +    else
 +      System.setProperty("org.apache.accumulo.core.host.log", localhost);
 +    
 +    int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
 +    System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
 +    
 +    // Use a specific log config, if it exists
 +    String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
 +    if (!new File(logConfig).exists()) {
 +      // otherwise, use the generic config
 +      logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
 +    }
 +    // Turn off messages about not being able to reach the remote logger... we protect against that.
 +    LogLog.setQuietMode(true);
 +    
 +    // Configure logging
 +    if (logPort==0)
 +      new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
 +    else
 +      DOMConfigurator.configureAndWatch(logConfig, 5000);
 +    
 +    // Read the auditing config
 +    String auditConfig = String.format("%s/auditLog.xml", System.getenv("ACCUMULO_CONF_DIR"));
 +    
 +    DOMConfigurator.configureAndWatch(auditConfig, 5000);
 +    
 +    log.info(application + " starting");
 +    log.info("Instance " + config.getInstance().getInstanceID());
 +    int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
 +    log.info("Data Version " + dataVersion);
 +    Accumulo.waitForZookeeperAndHdfs(fs);
 +    
 +    Version codeVersion = new Version(Constants.VERSION);
 +    if (dataVersion != ServerConstants.DATA_VERSION && dataVersion != ServerConstants.PREV_DATA_VERSION) {
 +      throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
 +    }
 +    
 +    TreeMap<String,String> sortedProps = new TreeMap<String,String>();
 +    for (Entry<String,String> entry : config.getConfiguration())
 +      sortedProps.put(entry.getKey(), entry.getValue());
 +    
 +    for (Entry<String,String> entry : sortedProps.entrySet()) {
 +      String key = entry.getKey();
 +      log.info(key + " = " + (Property.isSensitive(key) ? "<hidden>" : entry.getValue()));
 +    }
 +    
 +    monitorSwappiness();
 +  }
 +  
 +  /**
 +   * 
 +   */
 +  public static void monitorSwappiness() {
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        try {
 +          String procFile = "/proc/sys/vm/swappiness";
 +          File swappiness = new File(procFile);
 +          if (swappiness.exists() && swappiness.canRead()) {
 +            InputStream is = new FileInputStream(procFile);
 +            try {
 +              byte[] buffer = new byte[10];
 +              int bytes = is.read(buffer);
 +              String setting = new String(buffer, 0, bytes);
 +              setting = setting.trim();
 +              if (bytes > 0 && Integer.parseInt(setting) > 10) {
 +                log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
 +                    + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
 +              }
 +            } finally {
 +              is.close();
 +            }
 +          }
 +        } catch (Throwable t) {
 +          log.error(t, t);
 +        }
 +      }
 +    }, 1000, 10 * 60 * 1000);
 +  }
 +  
 +  public static void waitForZookeeperAndHdfs(VolumeManager fs) {
 +    log.info("Attempting to talk to zookeeper");
 +    while (true) {
 +      try {
 +        ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
 +        break;
 +      } catch (InterruptedException e) {
 +        // ignored
 +      } catch (KeeperException ex) {
 +        log.info("Waiting for accumulo to be initialized");
 +        UtilWaitThread.sleep(1000);
 +      }
 +    }
 +    log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
 +    long sleep = 1000;
++    int unknownHostTries = 3;
 +    while (true) {
 +      try {
 +        if (fs.isReady())
 +          break;
 +        log.warn("Waiting for the NameNode to leave safemode");
 +      } catch (IOException ex) {
-         log.warn("Unable to connect to HDFS");
++        log.warn("Unable to connect to HDFS", ex);
++      } catch (IllegalArgumentException exception) {
++        /* Unwrap the UnknownHostException so we can deal with it directly */
++        if (exception.getCause() instanceof UnknownHostException) {
++          if (unknownHostTries > 0) {
++            log.warn("Unable to connect to HDFS, will retry. cause: " + exception.getCause());
++            /* We need to make sure our sleep period is long enough to avoid getting a cached failure of the host lookup. */
++            sleep = Math.max(sleep, (AddressUtil.getAddressCacheNegativeTtl((UnknownHostException)(exception.getCause()))+1)*1000);
++          } else {
++            log.error("Unable to connect to HDFS and have exceeded max number of retries.", exception);
++            throw exception;
++          }
++          unknownHostTries--;
++        } else {
++          throw exception;
++        }
 +      }
-       log.info("Sleeping " + sleep / 1000. + " seconds");
++      log.info("Backing off due to failure; current sleep period is " + sleep / 1000. + " seconds");
 +      UtilWaitThread.sleep(sleep);
++      /* Back off to give transient failures more time to clear. */
 +      sleep = Math.min(60 * 1000, sleep * 2);
 +    }
 +    log.info("Connected to HDFS");
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index eb7a330,0000000..034bc92
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@@ -1,503 -1,0 +1,506 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.fs;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.InvocationTargetException;
 +import java.lang.reflect.Method;
 +import java.net.URI;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.file.FileUtil;
 +import org.apache.accumulo.core.util.CachedConfiguration;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.commons.lang.NotImplementedException;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.ContentSummary;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.LocalFileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.fs.Trash;
 +import org.apache.hadoop.fs.permission.FsPermission;
 +import org.apache.hadoop.hdfs.DFSConfigKeys;
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.hadoop.util.Progressable;
 +import org.apache.log4j.Logger;
 +
 +public class VolumeManagerImpl implements VolumeManager {
 +
 +  private static final Logger log = Logger.getLogger(VolumeManagerImpl.class);
 +
 +  Map<String,? extends FileSystem> volumes;
 +  String defaultVolume;
 +  AccumuloConfiguration conf;
 +  VolumeChooser chooser;
 +
 +  protected VolumeManagerImpl(Map<String,? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
 +    this.volumes = volumes;
 +    this.defaultVolume = defaultVolume;
 +    this.conf = conf;
 +    ensureSyncIsEnabled();
 +    chooser = Property.createInstanceFromPropertyName(conf, Property.GENERAL_VOLUME_CHOOSER, VolumeChooser.class, new RandomVolumeChooser());
 +  }
 +
 +  public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException {
 +    return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "",
 +        DefaultConfiguration.getDefaultConfiguration());
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
 +    IOException ex = null;
 +    for (FileSystem fs : volumes.values()) {
 +      try {
 +        fs.close();
 +      } catch (IOException e) {
 +        ex = e;
 +      }
 +    }
 +    if (ex != null) {
 +      throw ex;
 +    }
 +  }
 +
 +  @Override
 +  public boolean closePossiblyOpenFile(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    if (fs instanceof DistributedFileSystem) {
 +      DistributedFileSystem dfs = (DistributedFileSystem) fs;
 +      try {
 +        return dfs.recoverLease(path);
 +      } catch (FileNotFoundException ex) {
 +        throw ex;
 +      }
 +    } else if (fs instanceof LocalFileSystem) {
 +      // ignore
 +    } else {
 +      throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
 +    }
 +    fs.append(path).close();
 +    log.info("Recovered lease on " + path.toString() + " using append");
 +    return true;
 +  }
 +
 +  @Override
 +  public FSDataOutputStream create(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    return fs.create(path);
 +  }
 +
 +  @Override
 +  public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    return fs.create(path, overwrite);
 +  }
 +
 +  private static long correctBlockSize(Configuration conf, long blockSize) {
 +    if (blockSize <= 0)
 +      blockSize = conf.getLong("dfs.block.size", 67108864);
 +
 +    int checkSum = conf.getInt("io.bytes.per.checksum", 512);
 +    blockSize -= blockSize % checkSum;
 +    blockSize = Math.max(blockSize, checkSum);
 +    return blockSize;
 +  }
 +
 +  private static int correctBufferSize(Configuration conf, int bufferSize) {
 +    if (bufferSize <= 0)
 +      bufferSize = conf.getInt("io.file.buffer.size", 4096);
 +    return bufferSize;
 +  }
 +
 +  @Override
 +  public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    if (bufferSize == 0) {
 +      fs.getConf().getInt("io.file.buffer.size", 4096);
 +    }
 +    return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
 +  }
 +
 +  @Override
 +  public boolean createNewFile(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    return fs.createNewFile(path);
 +  }
 +
 +  @Override
 +  public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
 +    FileSystem fs = getFileSystemByPath(logPath);
 +    blockSize = correctBlockSize(fs.getConf(), blockSize);
 +    bufferSize = correctBufferSize(fs.getConf(), bufferSize);
 +    try {
 +      // This...
 +      // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
 +      // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
 +      // Becomes this:
 +      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
 +      List<Enum<?>> flags = new ArrayList<Enum<?>>();
 +      if (createFlags.isEnum()) {
 +        for (Object constant : createFlags.getEnumConstants()) {
 +          if (constant.toString().equals("SYNC_BLOCK")) {
 +            flags.add((Enum<?>) constant);
 +            log.debug("Found synch enum " + constant);
 +          }
 +          if (constant.toString().equals("CREATE")) {
 +            flags.add((Enum<?>) constant);
 +            log.debug("Found CREATE enum " + constant);
 +          }
 +        }
 +      }
 +      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
 +      log.debug("CreateFlag set: " + set);
 +      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
 +      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
 +      return (FSDataOutputStream) create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
 +    } catch (ClassNotFoundException ex) {
 +      // Expected in hadoop 1.0
 +      return fs.create(logPath, true, bufferSize, replication, blockSize);
 +    } catch (Exception ex) {
 +      log.debug(ex, ex);
 +      return fs.create(logPath, true, bufferSize, replication, blockSize);
 +    }
 +  }
 +
 +  @Override
 +  public boolean delete(Path path) throws IOException {
 +    return getFileSystemByPath(path).delete(path, false);
 +  }
 +
 +  @Override
 +  public boolean deleteRecursively(Path path) throws IOException {
 +    return getFileSystemByPath(path).delete(path, true);
 +  }
 +
 +  protected void ensureSyncIsEnabled() {
 +    for (Entry<String,? extends FileSystem> entry : getFileSystems().entrySet()) {
 +      final String volumeName = entry.getKey();
 +      final FileSystem fs = entry.getValue();
 +      
 +      if (fs instanceof DistributedFileSystem) {
 +        final String DFS_DURABLE_SYNC = "dfs.durable.sync", DFS_SUPPORT_APPEND = "dfs.support.append";
 +        final String ticketMessage = "See ACCUMULO-623 and ACCUMULO-1637 for more details.";
 +        // Check to make sure that we have proper defaults configured
 +        try {
 +          // If the default is off (0.20.205.x or 1.0.x)
 +          DFSConfigKeys configKeys = new DFSConfigKeys();
 +          
 +          // Can't use the final constant itself as Java will inline it at compile time
 +          Field dfsSupportAppendDefaultField = configKeys.getClass().getField("DFS_SUPPORT_APPEND_DEFAULT");
 +          boolean dfsSupportAppendDefaultValue = dfsSupportAppendDefaultField.getBoolean(configKeys);
 +          
 +          if (!dfsSupportAppendDefaultValue) {
 +            // See if the user did the correct override
 +            if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, false)) {
 +              String msg = "Accumulo requires that dfs.support.append to true. " + ticketMessage;
 +              log.fatal(msg);
 +              throw new RuntimeException(msg);
 +            }
 +          }
 +        } catch (NoSuchFieldException e) {
 +          // If we can't find DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT, the user is running
 +          // 1.1.x or 1.2.x. This is ok, though, as, by default, these versions have append/sync enabled.
 +        } catch (Exception e) {
 +          log.warn("Error while checking for " + DFS_SUPPORT_APPEND + " on volume " + volumeName + ". The user should ensure that Hadoop is configured to properly supports append and sync. " + ticketMessage, e);
 +        }
 +        
 +        // If either of these parameters are configured to be false, fail.
 +        // This is a sign that someone is writing bad configuration.
 +        if (!fs.getConf().getBoolean(DFS_SUPPORT_APPEND, true) || !fs.getConf().getBoolean(DFS_DURABLE_SYNC, true)) {
 +          String msg = "Accumulo requires that " + DFS_SUPPORT_APPEND + " and " + DFS_DURABLE_SYNC + " not be configured as false. " + ticketMessage;
 +          log.fatal(msg);
 +          throw new RuntimeException(msg);
 +        }
 +        
 +        try {
 +          // Check DFSConfigKeys to see if DFS_DATANODE_SYNCONCLOSE_KEY exists (should be everything >=1.1.1 and the 0.23 line)
 +          Class<?> dfsConfigKeysClz = Class.forName("org.apache.hadoop.hdfs.DFSConfigKeys");
 +          dfsConfigKeysClz.getDeclaredField("DFS_DATANODE_SYNCONCLOSE_KEY");
 +        
 +          // Everything else
 +          if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
 +            log.warn("dfs.datanode.synconclose set to false in hdfs-site.xml: data loss is possible on system reset or power loss");
 +          }
 +        } catch (ClassNotFoundException ex) {
 +          // hadoop 1.0.X or hadoop 1.1.0
 +        } catch (SecurityException e) {
 +          // hadoop 1.0.X or hadoop 1.1.0
 +        } catch (NoSuchFieldException e) {
 +          // hadoop 1.0.X or hadoop 1.1.0
 +        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public boolean exists(Path path) throws IOException {
 +    return getFileSystemByPath(path).exists(path);
 +  }
 +
 +  @Override
 +  public FileStatus getFileStatus(Path path) throws IOException {
 +    return getFileSystemByPath(path).getFileStatus(path);
 +  }
 +
 +  @Override
 +  public FileSystem getFileSystemByPath(Path path) {
 +    if (path.toString().contains(":")) {
 +      try {
 +        return path.getFileSystem(CachedConfiguration.getInstance());
 +      } catch (IOException ex) {
 +        throw new RuntimeException(ex);
 +      }
 +    }
 +
 +    return volumes.get(defaultVolume);
 +  }
 +
 +  @Override
 +  public Map<String,? extends FileSystem> getFileSystems() {
 +    return volumes;
 +  }
 +
 +  @Override
 +  public FileStatus[] listStatus(Path path) throws IOException {
 +    return getFileSystemByPath(path).listStatus(path);
 +  }
 +
 +  @Override
 +  public boolean mkdirs(Path path) throws IOException {
 +    return getFileSystemByPath(path).mkdirs(path);
 +  }
 +
 +  @Override
 +  public FSDataInputStream open(Path path) throws IOException {
 +    return getFileSystemByPath(path).open(path);
 +  }
 +
 +  @Override
 +  public boolean rename(Path path, Path newPath) throws IOException {
 +    FileSystem source = getFileSystemByPath(path);
 +    FileSystem dest = getFileSystemByPath(newPath);
 +    if (source != dest) {
 +      throw new NotImplementedException("Cannot rename files across volumes: " + path + " -> " + newPath);
 +    }
 +    return source.rename(path, newPath);
 +  }
 +
 +  @Override
 +  public boolean moveToTrash(Path path) throws IOException {
 +    FileSystem fs = getFileSystemByPath(path);
 +    Trash trash = new Trash(fs, fs.getConf());
 +    return trash.moveToTrash(path);
 +  }
 +
 +  @Override
 +  public short getDefaultReplication(Path path) {
 +    FileSystem fs = getFileSystemByPath(path);
 +    try {
 +      // try calling hadoop 2 method
 +      Method method = fs.getClass().getMethod("getDefaultReplication", Path.class);
 +      return ((Short) method.invoke(fs, path)).shortValue();
 +    } catch (NoSuchMethodException e) {
 +      // ignore
 +    } catch (IllegalArgumentException e) {
 +      throw new RuntimeException(e);
 +    } catch (IllegalAccessException e) {
 +      throw new RuntimeException(e);
 +    } catch (InvocationTargetException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    @SuppressWarnings("deprecation")
 +    short rep = fs.getDefaultReplication();
 +    return rep;
 +  }
 +
 +  @Override
 +  public boolean isFile(Path path) throws IOException {
 +    return getFileSystemByPath(path).isFile(path);
 +  }
 +
 +  public static VolumeManager get() throws IOException {
 +    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
 +    return get(conf);
 +  }
 +
 +  static private final String DEFAULT = "";
 +
 +  public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
 +    Map<String,FileSystem> fileSystems = new HashMap<String,FileSystem>();
 +    Configuration hadoopConf = CachedConfiguration.getInstance();
 +    fileSystems.put(DEFAULT, FileUtil.getFileSystem(hadoopConf, conf));
 +    String ns = conf.get(Property.INSTANCE_VOLUMES);
 +    if (ns != null && !ns.isEmpty()) {
 +      for (String space : ns.split(",")) {
 +        if (space.equals(DEFAULT))
 +          throw new IllegalArgumentException();
 +
 +        if (space.contains(":")) {
 +          fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
 +        } else {
 +          fileSystems.put(space, FileSystem.get(hadoopConf));
 +        }
 +      }
 +    }
 +    return new VolumeManagerImpl(fileSystems, DEFAULT, conf);
 +  }
 +
 +  @Override
 +  public boolean isReady() throws IOException {
 +    for (FileSystem fs : getFileSystems().values()) {
 +      if (!(fs instanceof DistributedFileSystem))
 +        continue;
 +      DistributedFileSystem dfs = (DistributedFileSystem) fs;
 +      // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
 +      // Becomes this:
 +      Class<?> safeModeAction;
 +      try {
 +        // hadoop 2.0
 +        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
 +      } catch (ClassNotFoundException ex) {
 +        // hadoop 1.0
 +        try {
 +          safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
 +        } catch (ClassNotFoundException e) {
 +          throw new RuntimeException("Cannot figure out the right class for Constants");
 +        }
 +      }
 +      Object get = null;
 +      for (Object obj : safeModeAction.getEnumConstants()) {
 +        if (obj.toString().equals("SAFEMODE_GET"))
 +          get = obj;
 +      }
 +      if (get == null) {
 +        throw new RuntimeException("cannot find SAFEMODE_GET");
 +      }
 +      try {
 +        Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
 +        boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
 +        if (inSafeMode) {
 +          return false;
 +        }
++      } catch (IllegalArgumentException exception) {
++        /* Send IAEs back as-is, so that those that wrap UnknownHostException can be handled in the same place as similar sources of failure. */
++        throw exception;
 +      } catch (Exception ex) {
 +        throw new RuntimeException("cannot find method setSafeMode");
 +      }
 +    }
 +    return true;
 +  }
 +
 +  @Override
 +  public FileSystem getDefaultVolume() {
 +    return volumes.get(defaultVolume);
 +  }
 +
 +  @Override
 +  public FileStatus[] globStatus(Path pathPattern) throws IOException {
 +    return getFileSystemByPath(pathPattern).globStatus(pathPattern);
 +  }
 +
 +  @Override
 +  public Path getFullPath(Key key) {
 +    // TODO sanity check col fam
 +    String relPath = key.getColumnQualifierData().toString();
 +    byte[] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
 +    return getFullPath(new String(tableId), relPath);
 +  }
 +
 +  @Override
 +  public Path matchingFileSystem(Path source, String[] options) {
 +    try {
 +      if (ViewFSUtils.isViewFS(source, CachedConfiguration.getInstance())) {
 +        return ViewFSUtils.matchingFileSystem(source, options, CachedConfiguration.getInstance());
 +      }
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    URI uri1 = source.toUri();
 +    for (String option : options) {
 +      URI uri3 = URI.create(option);
 +      if (uri1.getScheme().equals(uri3.getScheme())) {
 +        String a1 = uri1.getAuthority();
 +        String a2 = uri3.getAuthority();
 +        if ((a1 == null && a2 == null) || (a1 != null && a1.equals(a2)))
 +          return new Path(option);
 +      }
 +    }
 +    return null;
 +  }
 +
 +  @Override
 +  public Path getFullPath(String tableId, String path) {
 +    if (path.contains(":"))
 +      return new Path(path);
 +    
 +    if (path.startsWith("../"))
 +      path = path.substring(2);
 +    else if (path.startsWith("/"))
 +      path = "/" + tableId + path;
 +    else
 +      throw new IllegalArgumentException("Unexpected path prefix " + path);
 +    
 +    return getFullPath(FileType.TABLE, path);
 +  }
 +  
 +  @Override
 +  public Path getFullPath(FileType fileType, String path) {
 +    if (path.contains(":"))
 +      return new Path(path);
 +    
 +    // normalize the path
 +    Path fullPath = new Path(ServerConstants.getDefaultBaseDir(), fileType.getDirectory());
 +    if (path.startsWith("/"))
 +      path = path.substring(1);
 +    fullPath = new Path(fullPath, path);
 +    
 +    FileSystem fs = getFileSystemByPath(fullPath);
 +    return fs.makeQualified(fullPath);
 +  }
 +
 +  @Override
 +  public ContentSummary getContentSummary(Path dir) throws IOException {
 +    return getFileSystemByPath(dir).getContentSummary(dir);
 +  }
 +
 +  @Override
 +  public String choose(String[] options) {
 +    return chooser.choose(options);
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
index 14cf37b,0000000..b237cd0
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
@@@ -1,83 -1,0 +1,91 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server.util;
 +
 +import java.io.IOException;
++import java.net.UnknownHostException;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +public class TabletOperations {
 +  
 +  private static final Logger log = Logger.getLogger(TabletOperations.class);
 +  
 +  public static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
 +    String lowDirectory;
 +    
 +    UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
 +    String volume = fs.choose(ServerConstants.getTablesDirs());
 +    
 +    while (true) {
 +      try {
 +        if (endRow == null) {
 +          lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
 +          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
 +          if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath)) {
 +            FileSystem pathFs = fs.getFileSystemByPath(lowDirectoryPath);
 +            return lowDirectoryPath.makeQualified(pathFs.getUri(), pathFs.getWorkingDirectory()).toString();
 +          }
 +          log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
 +        } else {
 +          lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
 +          Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" +  lowDirectory);
 +          if (fs.exists(lowDirectoryPath))
 +            throw new IllegalStateException("Dir exist when it should not " + lowDirectoryPath);
 +          if (fs.mkdirs(lowDirectoryPath)) {
 +            FileSystem lowDirectoryFs = fs.getFileSystemByPath(lowDirectoryPath);
 +            return lowDirectoryPath.makeQualified(lowDirectoryFs.getUri(), lowDirectoryFs.getWorkingDirectory()).toString();
 +          }
 +        }
 +      } catch (IOException e) {
 +        log.warn(e);
 +      }
 +      
 +      log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume + " + will retry ...");
 +      UtilWaitThread.sleep(3000);
 +      
 +    }
 +  }
 +  
 +  public static String createTabletDirectory(String tableDir, Text endRow) {
 +    while (true) {
 +      try {
 +        VolumeManager fs = VolumeManagerImpl.get();
 +        return createTabletDirectory(fs, tableDir, endRow);
 +      } catch (IOException e) {
-         log.warn(e);
++        log.warn("problem creating tablet directory", e);
++      } catch (IllegalArgumentException exception) {
++        /* thrown in some edge cases of DNS failure by Hadoop instead of UnknownHostException */
++        if (exception.getCause() instanceof UnknownHostException) {
++          log.warn("problem creating tablet directory", exception);
++        } else {
++          throw exception;
++        }
 +      }
 +      UtilWaitThread.sleep(3000);
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
index aaeaac5,0000000..166ec89
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/DeleteTable.java
@@@ -1,251 -1,0 +1,259 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
++import java.net.UnknownHostException;
 +import java.util.Map.Entry;
 +
 +import org.apache.accumulo.core.client.BatchScanner;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.GrepIterator;
 +import org.apache.accumulo.core.master.state.tables.TableState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.state.MetaDataTableScanner;
 +import org.apache.accumulo.server.master.state.TabletLocationState;
 +import org.apache.accumulo.server.master.state.TabletState;
 +import org.apache.accumulo.server.problems.ProblemReports;
 +import org.apache.accumulo.server.security.AuditedSecurityOperation;
 +import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.tables.TableManager;
 +import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +
 +class CleanUp extends MasterRepo {
 +  
 +  final private static Logger log = Logger.getLogger(CleanUp.class);
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  private String tableId, namespaceId;
 +  
 +  private long creationTime;
 +  
 +  private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
 +    in.defaultReadObject();
 +    
 +    /*
 +     * handle the case where we start executing on a new machine where the current time is in the past relative to the previous machine
 +     * 
 +     * if the new machine has time in the future, that will work ok w/ hasCycled
 +     */
 +    if (System.currentTimeMillis() < creationTime) {
 +      creationTime = System.currentTimeMillis();
 +    }
 +    
 +  }
 +  
 +  public CleanUp(String tableId, String namespaceId) {
 +    this.tableId = tableId;
 +    this.namespaceId = namespaceId;
 +    creationTime = System.currentTimeMillis();
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (!master.hasCycled(creationTime)) {
 +      return 50;
 +    }
 +    
 +    boolean done = true;
 +    Range tableRange = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
 +    Scanner scanner = master.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    MetaDataTableScanner.configureScanner(scanner, master);
 +    scanner.setRange(tableRange);
 +    
 +    KeyExtent prevExtent = null;
 +    for (Entry<Key,Value> entry : scanner) {
 +      TabletLocationState locationState = MetaDataTableScanner.createTabletLocationState(entry.getKey(), entry.getValue());
 +      if (!locationState.extent.isPreviousExtent(prevExtent)) {
 +        log.debug("Still waiting for table to be deleted: " + tableId + " saw inconsistency" + prevExtent + " " + locationState.extent);
 +        done = false;
 +        break;
 +      }
 +      prevExtent = locationState.extent;
 +      
 +      TabletState state = locationState.getState(master.onlineTabletServers());
 +      if (state.equals(TabletState.ASSIGNED) || state.equals(TabletState.HOSTED)) {
 +        log.debug("Still waiting for table to be deleted: " + tableId + " locationState: " + locationState);
 +        done = false;
 +        break;
 +      }
 +    }
 +    
 +    if (!done)
 +      return 50;
 +    
 +    return 0;
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master master) throws Exception {
 +    
 +    master.clearMigrations(tableId);
 +    
 +    int refCount = 0;
 +    
 +    try {
 +      // look for other tables that references this table's files
 +      Connector conn = master.getConnector();
 +      BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 8);
 +      try {
 +        Range allTables = MetadataSchema.TabletsSection.getRange();
 +        Range tableRange = MetadataSchema.TabletsSection.getRange(tableId);
 +        Range beforeTable = new Range(allTables.getStartKey(), true, tableRange.getStartKey(), false);
 +        Range afterTable = new Range(tableRange.getEndKey(), false, allTables.getEndKey(), true);
 +        bs.setRanges(Arrays.asList(beforeTable, afterTable));
 +        bs.fetchColumnFamily(DataFileColumnFamily.NAME);
 +        IteratorSetting cfg = new IteratorSetting(40, "grep", GrepIterator.class);
 +        GrepIterator.setTerm(cfg, "/" + tableId + "/");
 +        bs.addScanIterator(cfg);
 +        
 +        for (Entry<Key,Value> entry : bs) {
 +          if (entry.getKey().getColumnQualifier().toString().contains("/" + tableId + "/")) {
 +            refCount++;
 +          }
 +        }
 +      } finally {
 +        bs.close();
 +      }
 +      
 +    } catch (Exception e) {
 +      refCount = -1;
 +      log.error("Failed to scan " + MetadataTable.NAME + " looking for references to deleted table " + tableId, e);
 +    }
 +    
 +    // remove metadata table entries
 +    try {
 +      // Intentionally do not pass master lock. If master loses lock, this operation may complete before master can kill itself.
 +      // If the master lock passed to deleteTable, it is possible that the delete mutations will be dropped. If the delete operations
 +      // are dropped and the operation completes, then the deletes will not be repeated.
 +      MetadataTableUtil.deleteTable(tableId, refCount != 0, SystemCredentials.get(), null);
 +    } catch (Exception e) {
 +      log.error("error deleting " + tableId + " from metadata table", e);
 +    }
 +    
 +    // remove any problem reports the table may have
 +    try {
 +      ProblemReports.getInstance().deleteProblemReports(tableId);
 +    } catch (Exception e) {
 +      log.error("Failed to delete problem reports for table " + tableId, e);
 +    }
 +    
 +    if (refCount == 0) {
 +      // delete the map files
 +      try {
 +        VolumeManager fs = master.getFileSystem();
 +        for (String dir : ServerConstants.getTablesDirs()) {
 +          fs.deleteRecursively(new Path(dir, tableId));
 +        }
 +      } catch (IOException e) {
 +        log.error("Unable to remove deleted table directory", e);
++      } catch (IllegalArgumentException exception) {
++        if (exception.getCause() instanceof UnknownHostException) {
++          /* Thrown if HDFS encounters a DNS problem in some edge cases */
++          log.error("Unable to remove deleted table directory", exception);
++        } else {
++          throw exception;
++        }
 +      }
 +    }
 +    
 +    // remove table from zookeeper
 +    try {
 +      TableManager.getInstance().removeTable(tableId);
 +      Tables.clearCache(master.getInstance());
 +    } catch (Exception e) {
 +      log.error("Failed to find table id in zookeeper", e);
 +    }
 +    
 +    // remove any permissions associated with this table
 +    try {
 +      AuditedSecurityOperation.getInstance().deleteTable(SystemCredentials.get().toThrift(master.getInstance()), tableId);
 +    } catch (ThriftSecurityException e) {
 +      log.error(e.getMessage(), e);
 +    }
 +    
 +    Utils.unreserveTable(tableId, tid, true);
 +    Utils.unreserveNamespace(namespaceId, tid, false);
 +    
 +    Logger.getLogger(CleanUp.class).debug("Deleted table " + tableId);
 +    
 +    return null;
 +  }
 +  
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    // nothing to do
 +  }
 +  
 +}
 +
 +public class DeleteTable extends MasterRepo {
 +  
 +  private static final long serialVersionUID = 1L;
 +  
 +  private String tableId, namespaceId;
 +  
 +  public DeleteTable(String tableId) {
 +    this.tableId = tableId;
 +    Instance inst = HdfsZooInstance.getInstance();
 +    this.namespaceId = Tables.getNamespace(inst, tableId);
 +  }
 +  
 +  @Override
 +  public long isReady(long tid, Master environment) throws Exception {
 +    
 +    return Utils.reserveNamespace(namespaceId, tid, false, false, TableOperation.DELETE)
 +        + Utils.reserveTable(tableId, tid, true, true, TableOperation.DELETE);
 +  }
 +  
 +  @Override
 +  public Repo<Master> call(long tid, Master environment) throws Exception {
 +    TableManager.getInstance().transitionTableState(tableId, TableState.DELETING);
 +    environment.getEventCoordinator().event("deleting table %s ", tableId);
 +    return new CleanUp(tableId, namespaceId);
 +  }
 +  
 +  @Override
 +  public void undo(long tid, Master environment) throws Exception {
 +    Utils.unreserveNamespace(namespaceId, tid, false);
 +    Utils.unreserveTable(tableId, tid, true);
 +  }
 +  
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e7e5c009/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
----------------------------------------------------------------------
diff --cc server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index d513ebc,0000000..32898f4
mode 100644,000000..100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@@ -1,291 -1,0 +1,322 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tracer;
 +
 +import java.net.InetSocketAddress;
 +import java.net.ServerSocket;
 +import java.nio.channels.ServerSocketChannel;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.atomic.AtomicReference;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken.Properties;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.user.AgeOffFilter;
 +import org.apache.accumulo.core.security.SecurityUtil;
 +import org.apache.accumulo.core.trace.TraceFormatter;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.server.Accumulo;
 +import org.apache.accumulo.server.ServerOpts;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.start.classloader.AccumuloClassLoader;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.thrift.RemoteSpan;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Iface;
 +import org.apache.accumulo.trace.thrift.SpanReceiver.Processor;
 +import org.apache.hadoop.io.Text;
 +import org.apache.log4j.Logger;
 +import org.apache.thrift.TByteArrayOutputStream;
 +import org.apache.thrift.TException;
 +import org.apache.thrift.protocol.TCompactProtocol;
 +import org.apache.thrift.server.TServer;
 +import org.apache.thrift.server.TThreadPoolServer;
 +import org.apache.thrift.transport.TServerSocket;
 +import org.apache.thrift.transport.TServerTransport;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.Watcher.Event.EventType;
 +import org.apache.zookeeper.Watcher.Event.KeeperState;
 +
 +public class TraceServer implements Watcher {
 +  
 +  final private static Logger log = Logger.getLogger(TraceServer.class);
 +  final private ServerConfiguration serverConfiguration;
 +  final private TServer server;
-   private BatchWriter writer = null;
-   private Connector connector;
++  final private AtomicReference<BatchWriter> writer;
++  final private Connector connector;
 +  final String table;
 +  
 +  private static void put(Mutation m, String cf, String cq, byte[] bytes, int len) {
 +    m.put(new Text(cf), new Text(cq), new Value(bytes, 0, len));
 +  }
 +  
 +  static class ByteArrayTransport extends TTransport {
 +    TByteArrayOutputStream out = new TByteArrayOutputStream();
 +    
 +    @Override
 +    public boolean isOpen() {
 +      return true;
 +    }
 +    
 +    @Override
 +    public void open() throws TTransportException {}
 +    
 +    @Override
 +    public void close() {}
 +    
 +    @Override
 +    public int read(byte[] buf, int off, int len) {
 +      return 0;
 +    }
 +    
 +    @Override
 +    public void write(byte[] buf, int off, int len) throws TTransportException {
 +      out.write(buf, off, len);
 +    }
 +    
 +    public byte[] get() {
 +      return out.get();
 +    }
 +    
 +    public int len() {
 +      return out.len();
 +    }
 +  }
 +  
 +  class Receiver implements Iface {
 +    @Override
 +    public void span(RemoteSpan s) throws TException {
 +      String idString = Long.toHexString(s.traceId);
 +      String startString = Long.toHexString(s.start);
 +      Mutation spanMutation = new Mutation(new Text(idString));
 +      Mutation indexMutation = new Mutation(new Text("idx:" + s.svc + ":" + startString));
 +      long diff = s.stop - s.start;
 +      indexMutation.put(new Text(s.description), new Text(s.sender), new Value((idString + ":" + Long.toHexString(diff)).getBytes()));
 +      ByteArrayTransport transport = new ByteArrayTransport();
 +      TCompactProtocol protocol = new TCompactProtocol(transport);
 +      s.write(protocol);
 +      String parentString = Long.toHexString(s.parentId);
 +      if (s.parentId == Span.ROOT_SPAN_ID)
 +        parentString = "";
 +      put(spanMutation, "span", parentString + ":" + Long.toHexString(s.spanId), transport.get(), transport.len());
 +      // Map the root span to time so we can look up traces by time
 +      Mutation timeMutation = null;
 +      if (s.parentId == Span.ROOT_SPAN_ID) {
 +        timeMutation = new Mutation(new Text("start:" + startString));
 +        put(timeMutation, "id", idString, transport.get(), transport.len());
 +      }
 +      try {
-         if (writer == null)
-           resetWriter();
-         if (writer == null)
++        final BatchWriter writer = TraceServer.this.writer.get();
++        /* Check for null, because we expect spans to come in much faster than flush calls.
++           In the case of failure, we'd rather avoid logging tons of NPEs.
++         */
++        if (null == writer) {
++          log.warn("writer is not ready; discarding span.");
 +          return;
++        }
 +        writer.addMutation(spanMutation);
 +        writer.addMutation(indexMutation);
 +        if (timeMutation != null)
 +          writer.addMutation(timeMutation);
-       } catch (Exception ex) {
-         log.error("Unable to write mutation to table: " + spanMutation, ex);
++      } catch (MutationsRejectedException exception) {
++        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + exception);
++        if (log.isDebugEnabled()) {
++          log.debug("discarded span due to rejection of mutation: " + spanMutation, exception);
++        }
++      /* XXX this could be e.g. an IllegalArgumentExceptoion if we're trying to write this mutation to a writer that has been closed since we retrieved it */
++      } catch (RuntimeException exception) {
++        log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + exception);
++        log.debug("unable to write mutation to table due to exception.", exception);
 +      }
 +    }
 +    
 +  }
 +  
 +  public TraceServer(ServerConfiguration serverConfiguration, String hostname) throws Exception {
 +    this.serverConfiguration = serverConfiguration;
 +    AccumuloConfiguration conf = serverConfiguration.getConfiguration();
 +    table = conf.get(Property.TRACE_TABLE);
++    Connector connector = null;
 +    while (true) {
 +      try {
 +        String principal = conf.get(Property.TRACE_USER);
 +        AuthenticationToken at;
 +        Map<String,String> loginMap = conf.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
 +        if (loginMap.isEmpty()) {
 +          Property p = Property.TRACE_PASSWORD;
 +          at = new PasswordToken(conf.get(p).getBytes());
 +        } else {
 +          Properties props = new Properties();
 +          AuthenticationToken token = AccumuloClassLoader.getClassLoader().loadClass(conf.get(Property.TRACE_TOKEN_TYPE)).asSubclass(AuthenticationToken.class)
 +              .newInstance();
 +
 +          int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length() + 1;
 +          for (Entry<String,String> entry : loginMap.entrySet()) {
 +            props.put(entry.getKey().substring(prefixLength), entry.getValue());
 +          }
 +
 +          token.init(props);
 +          
 +          at = token;
 +        }
 +        
 +        connector = serverConfiguration.getInstance().getConnector(principal, at);
 +        if (!connector.tableOperations().exists(table)) {
 +          connector.tableOperations().create(table);
 +          IteratorSetting setting = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
 +          AgeOffFilter.setTTL(setting, 7 * 24 * 60 * 60 * 1000l);
 +          connector.tableOperations().attachIterator(table, setting);
 +        }
 +        connector.tableOperations().setProperty(table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
 +        break;
 +      } catch (Exception ex) {
 +        log.info("Waiting to checking/create the trace table.", ex);
 +        UtilWaitThread.sleep(1000);
 +      }
 +    }
++    this.connector = connector;
++    // make sure we refer to the final variable from now on.
++    connector = null;
 +    
 +    int port = conf.getPort(Property.TRACE_PORT);
 +    final ServerSocket sock = ServerSocketChannel.open().socket();
 +    sock.setReuseAddress(true);
 +    sock.bind(new InetSocketAddress(hostname, port));
 +    final TServerTransport transport = new TServerSocket(sock);
 +    TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
 +    options.processor(new Processor<Iface>(new Receiver()));
 +    server = new TThreadPoolServer(options);
 +    registerInZooKeeper(sock.getInetAddress().getHostAddress() + ":" + sock.getLocalPort());
-     writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
++    writer = new AtomicReference<BatchWriter>(this.connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS)));
 +  }
 +  
 +  public void run() throws Exception {
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        flush();
 +      }
 +    }, 1000, 1000);
 +    server.serve();
 +  }
 +  
 +  private void flush() {
 +    try {
-       writer.flush();
-     } catch (Exception e) {
-       log.error("Error flushing traces", e);
++      final BatchWriter writer = this.writer.get();
++      if (null != writer) {
++        writer.flush();
++      }
++    } catch (MutationsRejectedException exception) {
++      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
++      log.debug("flushing traces failed due to exception", exception);
++      resetWriter();
++    /* XXX e.g. if the writer was closed between when we grabbed it and when we called flush. */
++    } catch (RuntimeException exception) {
++      log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + exception);
++      log.debug("flushing traces failed due to exception", exception);
 +      resetWriter();
 +    }
 +  }
 +  
-   synchronized private void resetWriter() {
++  private void resetWriter() {
++    BatchWriter writer = null;
 +    try {
-       if (writer != null)
-         writer.close();
++      writer = connector.createBatchWriter(table, new BatchWriterConfig().setMaxLatency(5, TimeUnit.SECONDS));
 +    } catch (Exception ex) {
-       log.error("Error closing batch writer", ex);
++      log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + ex);
++      log.debug("batch writer creation failed with exception.", ex);
 +    } finally {
-       writer = null;
++      /* Trade in the new writer (even if null) for the one we need to close. */
++      writer = this.writer.getAndSet(writer);
 +      try {
-         writer = connector.createBatchWriter(table, new BatchWriterConfig());
++        if (null != writer) {
++          writer.close();
++        }
 +      } catch (Exception ex) {
-         log.error("Unable to create a batch writer: " + ex);
++        log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + ex);
++        log.debug("batch writer close failed with exception", ex);
 +      }
 +    }
 +  }
 +  
 +  private void registerInZooKeeper(String name) throws Exception {
 +    String root = ZooUtil.getRoot(serverConfiguration.getInstance()) + Constants.ZTRACERS;
 +    IZooReaderWriter zoo = ZooReaderWriter.getInstance();
 +    String path = zoo.putEphemeralSequential(root + "/trace-", name.getBytes());
 +    zoo.exists(path, this);
 +  }
 +  
 +  public static void main(String[] args) throws Exception {
 +    SecurityUtil.serverLogin();
 +    ServerOpts opts = new ServerOpts();
 +    opts.parseArgs("tracer", args);
 +    Instance instance = HdfsZooInstance.getInstance();
 +    ServerConfiguration conf = new ServerConfiguration(instance);
 +    VolumeManager fs = VolumeManagerImpl.get();
 +    Accumulo.init(fs, conf, "tracer");
 +    String hostname = opts.getAddress();
 +    TraceServer server = new TraceServer(conf, hostname);
 +    Accumulo.enableTracing(hostname, "tserver");
 +    server.run();
 +    log.info("tracer stopping");
 +  }
 +  
 +  @Override
 +  public void process(WatchedEvent event) {
 +    log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
 +    if (event.getState() == KeeperState.Expired) {
 +      log.warn("Trace server lost zookeeper registration at " + event.getPath());
 +      server.stop();
 +    } else if (event.getType() == EventType.NodeDeleted) {
 +      log.warn("Trace server zookeeper entry lost " + event.getPath());
 +      server.stop();
 +    }
 +    if (event.getPath() != null) {
 +      try {
 +        if (ZooReaderWriter.getInstance().exists(event.getPath(), this))
 +          return;
 +      } catch (Exception ex) {
 +        log.error(ex, ex);
 +      }
 +      log.warn("Trace server unable to reset watch on zookeeper registration");
 +      server.stop();
 +    }
 +  }
 +  
 +}


Mime
View raw message